
    Mh,                        d Z ddlZddlZddlZddlmZ ddlmZ ddlm	Z	 ddl
mZmZmZmZ ddlZddlmZ ddlmZmZ dd	lmZ dd
lmZ ddlmZ ddlmZ ddlmZ  G d d          Z G d de	          Z  G d de          Z!dS )zc Defines a KernelClient that provides thread-safe sockets with async callbacks on message
replies.
    N)Future)partial)Thread)AnyDictListOptional)IOLoop)InstanceType)
get_logger)	zmqstream   )	HBChannel)KernelClient)Sessionc                       e Zd ZdZdZdZdZdZdZde	e
j                 de	e         de	e         ddf fdZdZdefd	Zdd
ZddZddZdeeef         ddfdZdeddfdZdeeef         ddfdZddZddeddfdZddZ xZ S )ThreadedZMQSocketChannelz.A ZMQ socket invoking a callback in the ioloopNsocketsessionloopreturnc                     t                                                       | _        | _        | _        t                      d fd} j        J  j                            |                               d           dS )a)  Create a channel.

        Parameters
        ----------
        socket : :class:`zmq.Socket`
            The ZMQ socket to use.
        session : :class:`session.Session`
            The session to use.
        loop
            A tornado ioloop to connect the socket to using a ZMQStream
        r   Nc                  &   	 j         J t          j        j         j                  _        j                            j                                       d            d S # t          $ r } 	                    |            Y d } ~ d S d } ~ ww xY wN)
r   r   	ZMQStreamioloopstreamon_recv_handle_recv
set_result	Exceptionset_exceptionefselfs    W/var/www/html/test/jupyter/venv/lib/python3.11/site-packages/jupyter_client/threaded.pysetup_streamz7ThreadedZMQSocketChannel.__init__.<locals>.setup_stream<   s    #{...'1$+t{KK##D$5666 T"""""  # # #"""""""""#s   AA& &
B0BB
   timeoutr   N)super__init__r   r   r   r   add_callbackresult)r'   r   r   r   r)   r&   	__class__s   `    @r(   r/   z!ThreadedZMQSocketChannel.__init__$   s    " 	HH	# 	# 	# 	# 	# 	# 	# {&&&  ...	    Fc                     | j         S )zWhether the channel is alive.	_is_aliver'   s    r(   is_alivez!ThreadedZMQSocketChannel.is_aliveM   s
    ~r3   c                     d| _         dS )zStart the channel.TNr5   r7   s    r(   startzThreadedZMQSocketChannel.startQ   s    r3   c                     d| _         dS )zStop the channel.FNr5   r7   s    r(   stopzThreadedZMQSocketChannel.stopU   s    r3   c                      j          j        t                      d fd} j                            |           	                     d           nO# t
          $ rB}t                      }d j          d| }|                    |t          d	           Y d}~nd}~ww xY w j	        6	  j	        
                    d
           n# t
          $ r Y nw xY wd _	        dS dS )zClose the channel.Nr   c                      	 j         "j                             d           d _                             d            d S # t          $ r }                     |            Y d } ~ d S d } ~ ww xY w)Nr   linger)r   closer!   r"   r#   r$   s    r(   close_streamz4ThreadedZMQSocketChannel.close.<locals>.close_stream_   s    '{.)))333&* LL&&&&& ! ' ' 'OOA&&&&&&&&&'s   )A 
A-A((A-   r+   zError closing stream z:    )
stacklevelr   r?   r-   )r   r   r   r0   r1   r"   r   warningRuntimeWarningr   rA   )r'   rB   r%   logmsgr&   s   `    @r(   rA   zThreadedZMQSocketChannel.closeY   s7   ;"t{'>A' ' ' ' ' ' ' K$$\222?#### ? ? ? ll@dk@@Q@@CA>>>>>>>>?
 ;"!!!++++   DKKK #"s*   A 
B$"8BB$/C 
CCrI   c                 ^     d fd} j         J  j                             |           dS )zQueue a message to be sent from the IOLoop's thread.

        Parameters
        ----------
        msg : message to send

        This is threadsafe, as it uses IOLoop.add_callback to give the loop's
        thread control of the action.
        r   Nc                  Z    j         J j                             j                    d S r   )r   sendr   )rI   r'   s   r(   thread_sendz2ThreadedZMQSocketChannel.send.<locals>.thread_send   s1    <+++Ldk3/////r3   r-   )r   r0   )r'   rI   rM   s   `` r(   rL   zThreadedZMQSocketChannel.sendy   sS    	0 	0 	0 	0 	0 	0 	0 {&&&  -----r3   msg_listc                     | j         J | j        J | j                            |          \  }}| j                            |          }| j        r|                     |           |                     |           dS )z[Callback for stream.on_recv.

        Unpacks message, and calls handlers with it.
        N)r   r   feed_identitiesdeserialize_inspectcall_handlers)r'   rN   identsmsgrI   s        r(   r    z%ThreadedZMQSocketChannel._handle_recv   s    
 {&&&|'''l228<<tl&&t,,= 	MM#3r3   c                     dS )ai  This method is called in the ioloop thread when a message arrives.

        Subclasses should override this method to handle incoming messages.
        It is important to remember that this method is called in the thread
        so that some logic must be done to ensure that the application level
        handlers are called in the application thread.
        N r'   rI   s     r(   rS   z&ThreadedZMQSocketChannel.call_handlers   s	     	r3   c                     dS )zaSubclasses should override this with a method
        processing any pending GUI events.
        NrW   r7   s    r(   process_eventsz'ThreadedZMQSocketChannel.process_events   s	     	r3         ?r,   c                 2    t          j                    |z   } j        J  j         j                                        rd}t          |          dt          ddf fd}t          d          D ]}t                      } j        	                    t          ||                     t          |t          j                    z
  d          }	 |                    t          |t          j                    z
  d                     # t          $ r Y  dS w xY wdS )a  Immediately processes all pending messages on this channel.

        This is only used for the IOPub channel.

        Callers should use this method to ensure that :meth:`call_handlers`
        has been called for all messages that have been received on the
        0MQ SUB socket of this channel.

        This method is thread safe.

        Parameters
        ----------
        timeout : float, optional
            The maximum amount of time to spend flushing, in seconds. The
            default is one second.
        NzAttempt to flush closed streamr&   r   c                     	                                   |                     d            d S # t          $ r }|                     |           Y d }~d S d }~ww xY wr   )_flushr!   r"   r#   )r&   r%   r'   s     r(   flushz-ThreadedZMQSocketChannel.flush.<locals>.flush   st    # T"""""  # # #"""""""""#s   . 
AAArD   r   )time	monotonicr   r   closedOSErrorr   ranger   r0   r   maxr1   TimeoutError)r'   r,   	stop_time_msgr_   _r&   s   `      r(   r_   zThreadedZMQSocketChannel.flush   s8   & N$$w.	{&&&;$+"4"4"6"63D$--	#S 	#T 	# 	# 	# 	# 	# 	# q 		 		AAK$$WUA%6%6777)dn&6&66::GY)9)991==>>>>   		 		s   7D
DDc                 X    | j         J | j                                          d| _        dS )z"Callback for :method:`self.flush`.NT)r   r_   _flushedr7   s    r(   r^   zThreadedZMQSocketChannel._flush   s/    {&&&r3   r-   )r[   )!__name__
__module____qualname____doc__r   r   r   r   rR   r	   zmqSocketr   r
   r/   r6   boolr8   r:   r<   rA   r   strr   rL   r   r    rS   rZ   floatr_   r^   __classcell__r2   s   @r(   r   r      s       88GFFFH%$% '"% v	%
 
% % % % % %N I$             @.S#X .4 . . . .$ T  d        c3h D       + +U +T + + + +Z       r3   r   c                        e Zd ZdZdZdZd fdZeej	        dd                        Z
ddZddZdd	Zdd
ZddZddZ xZS )IOLoopThreadz;Run a pyzmq ioloop in a thread to send and receive messagesFNr   c                 V    t                                                       d| _        dS )zInitialize an io loop thread.TN)r.   r/   daemonr'   r2   s    r(   r/   zIOLoopThread.__init__   s$    r3   c                  0    t           dt           _        d S d S )NT)rx   _exitingrW   r3   r(   _notice_exitzIOLoopThread._notice_exit   s     
 #$(L!!! $#r3   c                     t                      | _        t          j        |            | j                            d           dS )z{Start the IOLoop thread

        Don't return until self.ioloop is defined,
        which is created in the thread
        r*   r+   N)r   _start_futurer   r:   r1   r7   s    r(   r:   zIOLoopThread.start   s@     &,XXT!!"!-----r3   c                     	 t          j                    }t          j        |           d fd}|                     |                        j                            d           n1# t          $ r$} j                            |           Y d}~nd}~ww xY w|                                                                dS )z0Run my loop, ignoring EINTR events in the pollerr   Nc                  <   K   t          j                     _        d S r   )r
   currentr   r7   s   r(   assign_ioloopz'IOLoopThread.run.<locals>.assign_ioloop  s      $n..r3   r-   )	asyncionew_event_loopset_event_looprun_until_completer   r!   r"   r#   
_async_run)r'   r   r   r%   s   `   r(   runzIOLoopThread.run   s    	0)++D"4(((/ / / / / / ##MMOO444 ))$////  	0 	0 	0,,Q////////	0
 	 1 122222s   A
A( (
B2BBc                 ^   K   | j         s#t          j        d           d{V  | j         !dS dS )z(Run forever (until self._exiting is set)r   N)r}   r   sleepr7   s    r(   r   zIOLoopThread._async_run  sR      - 	#-""""""""" - 	# 	# 	# 	# 	#r3   c                 r    d| _         |                                  |                                  d| _        dS )zStop the channel's event loop and join its thread.

        This calls :meth:`~threading.Thread.join` and returns when the thread
        terminates. :class:`RuntimeError` will be raised if
        :meth:`~threading.Thread.start` is called again.
        TN)r}   joinrA   r   r7   s    r(   r<   zIOLoopThread.stop  s0     		

r3   c                 .    |                                   d S r   )rA   r7   s    r(   __del__zIOLoopThread.__del__  s    

r3   c                 r    | j         /	 | j                             d           dS # t          $ r Y dS w xY wdS )zClose the io loop thread.NT)all_fds)r   rA   r"   r7   s    r(   rA   zIOLoopThread.close   sX    ;"!!$!/////    #"s   & 
44r-   )rl   rm   rn   ro   r}   r   r/   staticmethodatexitregisterr~   r:   r   r   r<   r   rA   ru   rv   s   @r(   rx   rx      s        EEHF     
 _) ) ) _ \)	. 	. 	. 	.3 3 3 3"# # # #

 
 
 
          r3   rx   c                   0    e Zd ZdZedee         fd            Z ee	d          Z
	 	 	 	 	 ddededed	ed
eddf fdZdeeef         ddfdZd fdZ ee          Z ee          Z ee          Z ee          Z ee          ZdefdZ xZS )ThreadedKernelClientzYA KernelClient that provides thread-safe sockets with async callbacks on message replies.r   c                 ,    | j         r| j         j        S d S r   )ioloop_threadr   r7   s    r(   r   zThreadedKernelClient.ioloop,  s     	-%,,tr3   T)
allow_noneshelliopubstdinhbcontrolNc                     t                      | _        | j                                         |r| j        | j        _        t                                          |||||           dS )z!Start the channels on the client.N)rx   r   r:   _check_kernel_info_replyshell_channelrR   r.   start_channels)r'   r   r   r   r   r   r2   s         r(   r   z#ThreadedKernelClient.start_channels4  sc     *^^  """ 	H*.*GD'ueUB@@@@@r3   rI   c                 d    |d         dk    r#|                      |           d| j        _        dS dS )zGThis is run in the ioloop thread when the kernel info reply is receivedmsg_typekernel_info_replyN)_handle_kernel_info_replyr   rR   rX   s     r(   r   z-ThreadedKernelClient._check_kernel_info_replyE  s?    z?111**3///*.D''' 21r3   c                     t                                                       | j        r4| j                                        r| j                                         dS dS dS )z Stop the channels on the client.N)r.   stop_channelsr   r8   r<   r{   s    r(   r   z"ThreadedKernelClient.stop_channelsK  si     	&$"4"="="?"? 	&##%%%%%	& 	& 	& 	&r3   c                 F    | j         | j                                         S dS )z$Is the kernel process still running?NT)_hb_channel
is_beatingr7   s    r(   r8   zThreadedKernelClient.is_aliveW  s)    ' #..000 tr3   )TTTTTr-   )rl   rm   rn   ro   propertyr	   r
   r   r   rx   r   rr   r   r   rs   r   r   r   r   r   iopub_channel_classshell_channel_classstdin_channel_classr   hb_channel_classcontrol_channel_classr8   ru   rv   s   @r(   r   r   )  s       cc(    X
 H\d;;;M A AA A 	A
 A A 
A A A A A A"/DcN /t / / / /& & & & & & $788$788$788tI D!9::$        r3   r   )"ro   r   r   r`   concurrent.futuresr   	functoolsr   	threadingr   typingr   r   r   r	   rp   tornado.ioloopr
   	traitletsr   r   traitlets.logr   zmq.eventloopr   channelsr   clientr   r   r   r   rx   r   rW   r3   r(   <module>r      s       % % % % % %             , , , , , , , , , , , , 



 ! ! ! ! ! ! $ $ $ $ $ $ $ $ $ $ $ $ $ $ # # # # # #                               DI I I I I6 I I IX6 6 6 6 6< 6 6 6 6 6r3   