U
    ҥc|P                     @   s   d Z ddlZddlZddlZddlZddlZddlZddlZddlm	Z	 ddl
mZ ddlmZmZ ddlmZmZmZmZ ddlmZ ddlZddlmZ dd	lmZ dd
lmZ dZdZG dd dZG dd dZG dd deZ dS )z.Wrappers for forwarding stdout/stderr over zmq    N)b2a_hex)deque)StringIO
TextIOBase)AnyCallableDequeOptional)WeakSet)extract_header)IOLoop)	ZMQStream   c                   @   s   e Zd ZdZd&ddZdd Zdd Zed	d
 Zdd Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zdd Zedd Zdd  Zd!d" Zd#d$ Zd%S )'IOPubThreada   An object for sending IOPub messages in a background thread

    Prevents a blocking main thread from delaying output from threads.

    IOPubThread(pub_socket).background_socket is a Socket-API-providing object
    whose IO is always run in a thread.
    Fc                 C   s   || _ t| | _t | _|| _tdd| _|r8| 	  t
 | _t | _t | _|   t
j| jdd| _d| j_d| j_d| j_d| j_dS )a  Create IOPub thread

        Parameters
        ----------
        socket : zmq.PUB Socket
            the socket on which messages will be sent.
        pipe : bool
            Whether this process should listen for IOPub messages
            piped from subprocesses.
        F)Zmake_currentIOPub)targetnameTN)socketBackgroundSocketZbackground_socketosgetpid_master_pid
_pipe_flagr   io_loop_setup_pipe_in	threadinglocal_localr   _eventsr
   _event_pipes_setup_event_pipeThread_thread_mainthreaddaemonZpydev_do_not_traceZis_pydev_daemon_threadr   )selfr   pipe r'   1lib/python3.8/site-packages/ipykernel/iostream.py__init__-   s     


zIOPubThread.__init__c                 C   s   | j   | j jdd dS )z.The inner loop that's actually run in a threadT)Zall_fdsN)r   startcloser%   r'   r'   r(   r"   I   s    
zIOPubThread._thread_mainc                 C   sf   | j j}| tj}d|_ttdd}d|  }| _	|
| t|| j| _| j| j dS )zLCreate the PULL socket listening for events that should fire in this thread.r      asciizinproc://%sN)r   contextzmqPULLlingerr   r   urandomdecode_event_interfaceZbindr   r   Z_event_pulleron_recv_handle_event)r%   ctxpipe_inZ_uuidZifacer'   r'   r(   r    N   s    
zIOPubThread._setup_event_pipec                 C   s`   z| j j}W nN tk
rZ   | jj}|tj}d|_|| j	 || j _| j
| Y nX |S )zSthread-local event pipe for signaling events that should be processed in the threadr   )r   
event_pipeAttributeErrorr   r/   r0   PUSHr2   connectr5   r   add)r%   r:   r8   r'   r'   r(   _event_pipeZ   s    zIOPubThread._event_pipec                 C   s,   t | j}t|D ]}| j }|  qdS )zHandle an event on the event pipe

        Content of the message is ignored.

        Whenever *an* event arrives on the event stream,
        *all* waiting events are processed in order.
        N)lenr   rangepopleft)r%   msgZn_events_Zevent_fr'   r'   r(   r7   k   s    


zIOPubThread._handle_eventc              
   C   s   | j j}td| _| tj}d|_z|d| _	W nJ tj
k
r } z*td| d  d| _|  W Y dS d}~X Y nX t|| j| _| j| j dS )z7setup listening pipe for IOPub from forked subprocessesr-   r   ztcp://127.0.0.1z)Couldn't bind IOPub Pipe to 127.0.0.1: %sz'
subprocess output will be unavailable.FN)r   r/   r   r3   
_pipe_uuidr0   r1   r2   Zbind_to_random_port
_pipe_portZZMQErrorwarningswarnr   r+   r   r   Z_pipe_inr6   _handle_pipe_msg)r%   r8   r9   er'   r'   r(   r   z   s"    zIOPubThread._setup_pipe_inc                 C   sJ   | j r|  sdS |d | jkr4td|tjd dS | |dd  dS )z'handle a pipe message from a subprocessNr   zBad pipe message: %sfiler   )r   _is_master_processrE   printsys
__stderr__send_multipartr%   rC   r'   r'   r(   rI      s    zIOPubThread._handle_pipe_msgc                 C   s2   t  }|t j}d|_|d| j  ||fS )Ni  ztcp://127.0.0.1:%i)r0   ZContextr   r<   r2   r=   rF   )r%   r8   pipe_outr'   r'   r(   _setup_pipe_out   s
    zIOPubThread._setup_pipe_outc                 C   s   t  | jkS Nr   r   r   r,   r'   r'   r(   rM      s    zIOPubThread._is_master_processc                 C   s   | j r|  rtS tS dS )z8check for forks, and switch to zmq pipeline if necessaryN)r   rM   MASTERCHILDr,   r'   r'   r(   _check_mp_mode   s    zIOPubThread._check_mp_modec                 C   s"   d| j _| j   t| j dS )zStart the IOPub threadr   N)r#   r   r*   atexitregisterstopr,   r'   r'   r(   r*      s    
zIOPubThread.startc                 C   s@   | j  sdS | j| jj | j   | jD ]}|  q.dS )zStop the IOPub threadN)r#   is_aliver   Zadd_callbackr\   joinr   r+   )r%   r:   r'   r'   r(   r\      s    


zIOPubThread.stopc                 C   s   | j r
dS | j  d| _dS )zClose the IOPub thread.N)closedr   r+   r,   r'   r'   r(   r+      s    
zIOPubThread.closec                 C   s
   | j d kS rU   )r   r,   r'   r'   r(   r_      s    zIOPubThread.closedc                 C   s.   | j  r$| j| | jd n|  dS )ztSchedule a function to be called in our IO thread.

        If the thread is not running, call immediately.
            N)r#   r]   r   appendr?   send)r%   fr'   r'   r(   schedule   s    
zIOPubThread.schedulec                    s     fdd dS )zsend_multipart schedules actual zmq send in my thread.

        If my thread isn't running (e.g. forked process), send immediately.
        c                      s   j  S rU   )_really_sendr'   argskwargsr%   r'   r(   <lambda>   r`   z,IOPubThread.send_multipart.<locals>.<lambda>N)rd   r%   rg   rh   r'   rf   r(   rQ      s    zIOPubThread.send_multipartc                 O   sj   | j r
dS |  }|tkr0| jj|f|| n6|  \}}|j| jf|f|| |  |  dS )z)The callback that actually sends messagesN)	r_   rY   rX   r   rQ   rT   rE   r+   Zterm)r%   rC   rg   rh   Zmp_moder8   rS   r'   r'   r(   re      s    zIOPubThread._really_sendN)F)__name__
__module____qualname____doc__r)   r"   r    propertyr?   r7   r   rI   rT   rM   rY   r*   r\   r+   r_   rd   rQ   re   r'   r'   r'   r(   r   $   s(   

	
r   c                       sH   e Zd ZdZdZdd Z fddZ fddZd	d
 Zdd Z	  Z
S )r   z>Wrapper around IOPub thread that provides zmq send[_multipart]Nc                 C   s
   || _ dS )zInitialize the socket.N)	io_thread)r%   rp   r'   r'   r(   r)      s    zBackgroundSocket.__init__c                    sv   | dr |dr t | | jdk	s.tt| jj|rftj	d| d| t
dd t| jj|S t | dS )z2Wrap socket attr access for backward-compatibility__NzAccessing zmq Socket attribute O on BackgroundSocket is deprecated since ipykernel 4.3.0 use .io_thread.socket.   
stacklevel)
startswithendswithsuper__getattr__rp   AssertionErrorhasattrr   rG   rH   DeprecationWarninggetattr)r%   attr	__class__r'   r(   ry      s    zBackgroundSocket.__getattr__c                    sj   |dks| dr,|dr,t || n:tjd| d| tdd | jdk	sVtt	| jj
|| dS )zSet an attribute on the socket.rp   rq   zSetting zmq Socket attribute rr   rs   rt   N)rv   rw   rx   __setattr__rG   rH   r|   rp   rz   setattrr   )r%   r~   valuer   r'   r(   r     s    zBackgroundSocket.__setattr__c                 O   s   | j |gf||S )zSend a message to the socket.)rQ   )r%   rC   rg   rh   r'   r'   r(   rb     s    zBackgroundSocket.sendc                 O   s   | j dk	st| j j||S )zSchedule send in IO threadN)rp   rz   rQ   rj   r'   r'   r(   rQ     s    zBackgroundSocket.send_multipart)rk   rl   rm   rn   rp   r)   ry   r   rb   rQ   __classcell__r'   r'   r   r(   r      s   r   c                   @   s   e Zd ZU dZdZdZdZdZdZe	e
 ed< dd Zd	d
 Zd-dddddZdd Zdd Zdd Zdd Zdd Zedd Zdd Zdd Zd d! Zee	e d"d#d$Zd%d& Zd'd( Zd)d* Zd+d, ZdS ).	OutStreamzpA file like object that publishes the stream to a 0MQ PUB socket.

    Output is handed off to an IO Thread
    
   g?NzUTF-8_excc                 C   s(   t | dddk	r| jS d}t|dS )z^
        Things like subprocess will peak and write to the fileno() of stderr/stdout.
        _original_stdstream_copyNfileno)r}   r   ioUnsupportedOperationrR   r'   r'   r(   r   4  s    zOutStream.filenoc                 C   sl   zHt | jd}|rF| jrF| |  t | j| t | jd}qW n tk
rf   t	 | _
Y nX dS )ai  
        We've redirected standards steams 0 and 1 into a pipe.

        We need to watch in a thread and redirect them to the right places.

        1) the ZMQ channels to show in notebook interfaces,
        2) the original stdout/err, to capture errors in terminals.

        We cannot schedule this on the ioloop thread, as this might be blocking.

        i  N)r   read_fid_should_watchwriter4   r   	ExceptionrO   exc_infor   )r%   Zbtsr'   r'   r(   _watch_pipe_fd>  s    
zOutStream._watch_pipe_fdTF)watchfdisattyc          	      C   s  |dk	rt jdtdd || _t|tsLt jd| tdd t|}|  || _|| _d|	  | _
i | _t | _d| _d| _|j| _t | _t | _d| _t|| _d| _|rtjdstjd	rd
tjkrd| _|  | |rt!|drt!|dr|| _nd}t"|dS )a  
        Parameters
        ----------
        session : object
            the session object
        pub_thread : threading.Thread
            the publication thread
        name : str {'stderr', 'stdout'}
            the name of the standard stream to replace
        pipe : object
            the pip object
        echo : bool
            whether to echo output
        watchfd : bool (default, True)
            Watch the file descripttor corresponding to the replaced stream.
            This is useful if you know some underlying code will write directly
            the file descriptor by its number. It will spawn a watching thread,
            that will swap the give file descriptor for a pipe, read from the
            pipe, and insert this into the current Stream.
        isatty : bool (default, False)
            Indication of whether this stream has termimal capabilities (e.g. can handle colors)

        NzKpipe argument to OutStream is deprecated and ignored since ipykernel 4.2.3.rs   rt   zISince IPykernel 4.3, OutStream should be created with IOPubThread, not %rs   stream.FZlinuxdarwinZPYTEST_CURRENT_TESTTr   r   z(echo argument must be a file like object)#rG   rH   r|   session
isinstancer   r*   
pub_threadr   encodetopicparent_headerr   r   r   _flush_pending_subprocess_flush_pendingr   _io_loopr   RLock_buffer_lockr   _bufferechobool_isattyr   rO   platformrv   environ_setup_stream_redirectsr{   
ValueError)	r%   r   r   r   r&   r   r   r   rC   r'   r'   r(   r)   T  sZ    "






zOutStream.__init__c                 C   s   | j S )zpReturn a bool indicating whether this is an 'interactive' stream.

        Returns:
            Boolean
        )r   r,   r'   r'   r(   r     s    zOutStream.isattyc                 C   sd   t  \}}tt| }t || _t || || _d | _	t
j| jd| _d| j_| j  d S )N)r   T)r   r&   r}   rO   r   dupr   dup2r   r   r   r!   r   watch_fd_threadr$   r*   )r%   r   ZprZpwZfnor'   r'   r(   r     s    z!OutStream._setup_stream_redirectsc                 C   s   t  | jkS rU   rV   r,   r'   r'   r(   rM     s    zOutStream._is_master_processc                 C   s   t || _dS )zSet the parent header.N)r   r   )r%   parentr'   r'   r(   
set_parent  s    zOutStream.set_parentc                 C   s@   | j rd| _ | j  | jr6| j\}}}t||| d| _dS )zClose the stream.FN)r   r   r^   r   	tracebackprint_exceptionr   )r%   etyper   tbr'   r'   r(   r+     s    
zOutStream.closec                 C   s
   | j d kS rU   )r   r,   r'   r'   r(   r_     s    zOutStream.closedc                    s,    j r
dS d _  fdd} j| dS )zuschedule a flush in the IO thread

        call this on write, to indicate that flush should be called soon.
        NTc                      s    j  j j d S rU   )r   Z
call_laterflush_interval_flushr'   r,   r'   r(   _schedule_in_thread  s    z6OutStream._schedule_flush.<locals>._schedule_in_thread)r   r   rd   )r%   r   r'   r,   r(   _schedule_flush  s
    zOutStream._schedule_flushc                 C   s~   | j rr| j jdk	rr| j j rr| j jjt jkrr| j | j t }| j |j	 |
| jsztdtjd n|   dS )zStrigger actual zmq send

        send will happen in the background thread
        NzIOStream.flush timed outrK   )r   r#   r]   identr   current_threadrd   r   Eventsetwaitflush_timeoutrN   rO   rP   )r%   Zevtr'   r'   r(   flush  s    

zOutStream.flushc              
   C   s   d| _ d| _| jdk	rhz| j  W nB tk
rf } z$| jtjk	rVtd| tjd W 5 d}~X Y nX |  }|rt	
 | j_| j|d}| jj| jd|| j| jd dS )zThis is where the actual send happens.

        _flush should generally be called in the IO thread,
        unless the thread has been destroyed (e.g. forked subprocess).
        FNzFlush failed: rK   )r   textstream)contentr   r   )r   r   r   r   OSErrorrO   rP   rN   _flush_bufferr   r   r   pidr   rb   r   r   r   )r%   rJ   datar   r'   r'   r(   r     s&    
&zOutStream._flush)stringreturnc              
   C   s   t |ts dt| }t|| jdk	r~z| j| W nB tk
r| } z$| jtjk	rlt	d| tjd W 5 d}~X Y nX | j
dkrd}t|nT|   }| j | j| W 5 Q R X |r| jrdS d| _| j
| j n|   t|S )zWrite to current stream after encoding if necessary

        Returns
        -------
        len : int
            number of items from input parameter written to stream.

        z"write() argument must be str, not NzWrite failed: rK   I/O operation on closed fileT)r   strtype	TypeErrorr   r   r   rO   rP   rN   r   r   rM   r   r   r   rd   r   r   r@   )r%   r   rC   rJ   Zis_childr'   r'   r(   r     s,    


&


zOutStream.writec                 C   s0   | j dkrd}t|n|D ]}| | qdS )zWrite lines to the stream.Nr   )r   r   r   )r%   ZsequencerC   r   r'   r'   r(   
writelinesA  s
    

zOutStream.writelinesc                 C   s   dS )z$Test whether the stream is writable.Tr'   r,   r'   r'   r(   writableJ  s    zOutStream.writablec                 C   s   |   }| }|  |S )z<clear the current buffer and return the current buffer data.)_rotate_buffergetvaluer+   )r%   bufr   r'   r'   r(   r   N  s    zOutStream._flush_bufferc              	   C   s$   | j  | j}t | _W 5 Q R X |S )z@Returns the current buffer and replaces it with an empty buffer.)r   r   r   )r%   Z
old_bufferr'   r'   r(   r   U  s    zOutStream._rotate_buffer)NN) rk   rl   rm   rn   r   r   r   encodingr   r	   r   __annotations__r   r   r)   r   r   rM   r   r+   ro   r_   r   r   r   r   intr   r   r   r   r   r'   r'   r'   r(   r   %  s:   

  S

*	r   )!rn   rZ   r   r   rO   r   r   rG   Zbinasciir   collectionsr   r   r   typingr   r   r   r	   weakrefr
   r0   Zjupyter_client.sessionr   Ztornado.ioloopr   Zzmq.eventloop.zmqstreamr   rW   rX   r   r   r   r'   r'   r'   r(   <module>   s,    O3