
    Mh                         d dl Z d dlZd dlZd dlZd dlZd dlmZ d dlmZ d dl	m
Z
 d dlmZ d dlmZ d dlmZmZmZmZ d dlmZ  G d	 d
e          Ze G d dej                              ZdS )    N)gen)IOStream)app_log)	TCPServer)skipIfNonUnix)AsyncTestCase	ExpectLogbind_unused_portgen_test)Tuplec                   V    e Zd Zed             Zed             Zd Zed             ZdS )TCPServerTestc              #     K    G d dt                     }d x}}	 t                      \  }} |            }|                    |           t          t	          j                              }t          t          d          5  |                    d|f          V  |                    d          V  |	                                V  t          j        V  d d d            n# 1 swxY w Y   ||                                 ||                                 d S d S # ||                                 ||                                 w w xY w)Nc                   .    e Zd Zej        d             ZdS )FTCPServerTest.test_handle_stream_coroutine_logging.<locals>.TestServerc              3      K   |                     t          d                    V  |                                 ddz   d S )N   hello   r   )
read_byteslencloseselfstreamaddresss      [/var/www/html/test/jupyter/venv/lib/python3.11/site-packages/tornado/test/tcpserver_test.pyhandle_streamzTTCPServerTest.test_handle_stream_coroutine_logging.<locals>.TestServer.handle_stream   s@      ''H66666A    N__name__
__module____qualname__r   	coroutiner    r   r   
TestServerr      s/        ]  ]  r   r%   zException in callback	localhostr   )r   r
   
add_socketr   socketr	   r   connectwriteread_until_closer   momentstopr   )r   r%   serverclientsockports         r   $test_handle_stream_coroutine_loggingz2TCPServerTest.test_handle_stream_coroutine_logging   s     	 	 	 	 	 	 	 	 	)++JD$Z\\Fd###fmoo..F7$;<< ! !nnk4%899999ll8,,,,,--/////j   	! ! ! ! ! ! ! ! ! ! ! ! ! ! ! !! "! !! "s1   A%D  AC!D !C%%D (C%)D /Ec              #     K    G d dt                     }t                      \  }} |            }|                    |           t          t	          j                              }|                    d|f          V  |                                V }|                     |d           |                                 |	                                 d S )Nc                       e Zd Zd ZdS )ETCPServerTest.test_handle_stream_native_coroutine.<locals>.TestServerc                 \   K   |                     d           |                                 d S )N   data)r*   r   r   s      r   r   zSTCPServerTest.test_handle_stream_native_coroutine.<locals>.TestServer.handle_stream3   s)      W%%%r   N)r    r!   r"   r   r$   r   r   r%   r5   2   s#            r   r%   r&   r7   )
r   r
   r'   r   r(   r)   r+   assertEqualr-   r   )r   r%   r0   r1   r.   r/   results          r   #test_handle_stream_native_coroutinez1TCPServerTest.test_handle_stream_native_coroutine.   s      	 	 	 	 	 	 	 	
 &''
d$&-//**nnk4011111..0000)))r   c                     t                      \  }}t                      }|                    |           |                                 |                                 d S N)r
   r   r'   r-   )r   r0   r1   r.   s       r   test_stop_twicezTCPServerTest.test_stop_twiceA   sL    %''
d$r   c              #   6  	
K    G 	fddt                     }t                      \  }} |            		                    |           d|f
d}d t          |          D             }g t          j        
fd            fd|D             V  |                     t                    dd	           	 t                    |k    r|                     d
           D ]}|	                                 d S # D ]}|	                                 w xY w)Nc                   4    e Zd Zej         fd            ZdS )7TCPServerTest.test_stop_in_callback.<locals>.TestServerc              3   `   K                                     |                                V  d S r<   )r-   r+   )r   r   r   r.   s      r   r   zETCPServerTest.test_stop_in_callback.<locals>.TestServer.handle_streamO   s2      --///////r   Nr   )r.   s   r   r%   r@   N   s:        ]0 0 0 0 ]0 0 0r   r%   r&   (   c                 N    g | ]"}t          t          j                              #S r$   )r   r(   ).0is     r   
<listcomp>z7TCPServerTest.test_stop_in_callback.<locals>.<listcomp>Y   s&    ???8FMOO,,???r   c              3      K   	 |                                V                      |            d S # t          $ r Y d S w xY wr<   )r)   appendOSError)cconnected_clientsserver_addrs    r   r)   z4TCPServerTest.test_stop_in_callback.<locals>.connect\   sd      ,ii,,,,, "((+++++    s   3 
A Ac                 &    g | ]} |          S r$   r$   )rD   rJ   r)   s     r   rF   z7TCPServerTest.test_stop_in_callback.<locals>.<listcomp>e   s!    +++awwqzz+++r   r   zall clients failed connectingzHat least one client should fail connecting for the test to be meaningful)
r   r
   r'   ranger   r#   assertGreaterr   skipTestr   )r   r%   r0   r1   NclientsrJ   r)   rK   r.   rL   s          @@@@r   test_stop_in_callbackz#TCPServerTest.test_stop_in_callbackH   s     	0 	0 	0 	0 	0 	0 	0 	0 	0 	0 &''
d$"D)??eAhh???		, 	, 	, 	, 	, 
	, ,+++7++++++301116UVVV
	$%%** 4  
 '  				 &  				s   :(C= =DN)r    r!   r"   r   r2   r:   r=   rS   r$   r   r   r   r      sr          X6   X$   ) ) X) ) )r   r   c                   F    e Zd Zdedeeef         fdZd Zd Zd Zd Z	dS )	TestMultiprocesscodereturnc           	          	 t          j        t          j        dgd|dd          }n?# t           j        $ r-}t          d|j         d|j         d|j                   |d }~ww xY w|j        |j        fS )Nz-Werror::DeprecationWarningTutf8)capture_outputinputencodingcheckzProcess returned z stdout=z stderr=)	
subprocessrunsys
executableCalledProcessErrorRuntimeError
returncodestdoutstderr)r   rV   r9   es       r   run_subproczTestMultiprocess.run_subproc~   s    	^!>?#  FF , 	 	 	VALVV!(VVAHVV 	 }fm++s   %( A$(AA$c                     t          j        d          }|                     |          \  }}|                     d                    t          |                    d           |                     |d           d S )Na  
            import asyncio
            from tornado.tcpserver import TCPServer

            async def main():
                server = TCPServer()
                server.listen(0, address='127.0.0.1')

            asyncio.run(main())
            print('012', end='')
         012textwrapdedentrh   r8   joinsortedr   rV   outerrs       r   test_listen_singlez#TestMultiprocess.test_listen_single   sw     

 
 ##D))S--u555b!!!!!r   c                     t          j        d          }|                     |          \  }}|                     d                    t          |                    d           |                     |d           d S )Na  
            import warnings

            from tornado.ioloop import IOLoop
            from tornado.process import task_id
            from tornado.tcpserver import TCPServer

            warnings.simplefilter("ignore", DeprecationWarning)

            server = TCPServer()
            server.bind(0, address='127.0.0.1')
            server.start(3)
            IOLoop.current().run_sync(lambda: None)
            print(task_id(), end='')
        rj   rk   rl   rq   s       r   test_bind_startz TestMultiprocess.test_bind_start   u    
 
" ##D))S--u555b!!!!!r   c                     t          j        d          }|                     |          \  }}|                     d                    t          |                    d           |                     |d           d S )Na  
            import asyncio
            from tornado.netutil import bind_sockets
            from tornado.process import fork_processes, task_id
            from tornado.ioloop import IOLoop
            from tornado.tcpserver import TCPServer

            sockets = bind_sockets(0, address='127.0.0.1')
            fork_processes(3)
            async def post_fork_main():
                server = TCPServer()
                server.add_sockets(sockets)
            asyncio.run(post_fork_main())
            print(task_id(), end='')
        rj   rk   rl   rq   s       r   test_add_socketsz!TestMultiprocess.test_add_sockets   rw   r   c                     t          j        d          }|                     |          \  }}|                     d                    t          |                    d           |                     |d           d S )Na  
            import asyncio
            import socket
            from tornado.netutil import bind_sockets
            from tornado.process import task_id, fork_processes
            from tornado.tcpserver import TCPServer

            # Pick an unused port which we will be able to bind to multiple times.
            (sock,) = bind_sockets(0, address='127.0.0.1',
                family=socket.AF_INET, reuse_port=True)
            port = sock.getsockname()[1]

            fork_processes(3)

            async def main():
                server = TCPServer()
                server.listen(port, address='127.0.0.1', reuse_port=True)
            asyncio.run(main())
            print(task_id(), end='')
            rj   rk   rl   rq   s       r   test_listen_multi_reuse_portz-TestMultiprocess.test_listen_multi_reuse_port   su    
 
, ##D))S--u555b!!!!!r   N)
r    r!   r"   strr   rh   rt   rv   ry   r{   r$   r   r   rU   rU   w   sz        , ,c3h , , , ," " "(" " "," " "," " " " "r   rU   )r(   r^   r`   rm   unittesttornador   tornado.iostreamr   tornado.logr   tornado.tcpserverr   tornado.test.utilr   tornado.testingr   r	   r
   r   typingr   r   TestCaserU   r$   r   r   <module>r      sE        



         % % % % % %       ' ' ' ' ' ' + + + + + + P P P P P P P P P P P P      a a a a aM a a aL n" n" n" n" n"x( n" n" n" n" n"r   