
    ∋dQ(                       d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlm	Z	m
Z
 d dlmZ d dlmZ d dlmZmZmZmZ d dlmZmZ d dlmZ d d	lmZ  ej        e          Z e
d
d          Z G d d          Z e            Z d Z! G d de"          Z#d Z$ G d d          Z% e&            Z' G d de          Z( G d de          Z) G d de          Z* G d de          Z+ e+            ed<   dS )    )annotationsN)deque
namedtuple)Future)IOLoop)CommCommClosedError	ConnectorListener)Backendbackends)nested_deserialize)get_ipConnectionRequestc2s_qs2c_qc_loopc_addr
conn_eventc                  L    e Zd ZdZd Zed             Zd Zd Zd Z	d Z
d Zd	S )
Managerz?
    An object coordinating listeners and their addresses.
    c                    t          j                    | _        t          j        d          | _        d | _        t          j                    | _	        d S N   )
weakrefWeakValueDictionary	listeners	itertoolscountaddr_suffixes_ip	threadingLocklockselfs    7lib/python3.11/site-packages/distributed/comm/inproc.py__init__zManager.__init__   s>     466&_Q//N$$			    c                v    | j         s,	 t                      | _         n# t          $ r
 d| _         Y nw xY w| j         S )Nz	127.0.0.1)r"   r   OSErrorr&   s    r(   ipz
Manager.ip%   sK    x 	''!88 ' ' '&'xs    11c                    | j         5  || j        v rt          d|          || j        |<   d d d            d S # 1 swxY w Y   d S )Nzalready listening on )r%   r   RuntimeError)r'   addrlisteners      r(   add_listenerzManager.add_listener.   s    Y 	, 	,t~%%"#C4#C#CDDD#+DN4 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	,s   &;??c                z    | j         5  	 | j        |= n# t          $ r Y nw xY wd d d            d S # 1 swxY w Y   d S N)r%   r   KeyErrorr'   r0   s     r(   remove_listenerzManager.remove_listener4   s    Y 	 	N4((   	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	s$   00
 0 044c                    | j         5  |                     |           | j                            |          cd d d            S # 1 swxY w Y   d S r4   )r%   validate_addressr   getr6   s     r(   get_listener_forzManager.get_listener_for;   s    Y 	, 	,!!$'''>%%d++	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	,s   /AAAc                b    d| j         t          j                    t          | j                  fz  S )Nz%s/%d/%s)r-   osgetpidnextr!   r&   s    r(   new_addresszManager.new_address@   s'    TWbikk48J3K3KLLLr*   c           	         |                     d          \  }}}|| j        k    s$t          |          t          j                    k    r/t          d|d| j        dt          j                    d          dS )z3
        Validate the address' IP and pid.
        /zinproc address z does not match host (z
) or pid ()N)splitr-   intr=   r>   
ValueError)r'   r0   r-   pidsuffixs        r(   r9   zManager.validate_addressC   sv     **S//C==CHH	33*44")++++/   43r*   N)__name__
__module____qualname____doc__r)   propertyr-   r2   r7   r;   r@   r9    r*   r(   r   r      s         % % %   X, , ,  , , ,
M M M	 	 	 	 	r*   r   c                 :    dt                                           z   S )z!
    Generate a new address.
    	inproc://)global_managerr@   rN   r*   r(   r@   r@   R   s     335555r*   c                      e Zd ZdS )
QueueEmptyN)rI   rJ   rK   rN   r*   r(   rS   rS   Y   s        Dr*   rS   c                \    |                                  rdS |                     |           dS )z?Helper setting the result only if the future was not cancelled.N)	cancelled
set_result)futresults     r(   _set_result_unless_cancelledrY   ]   s/    
}} NN6r*   c                  L    e Zd ZdZd Zd Zd Zd ZeZ e	            Z
e
fdZdS )QueuezI
    A single-reader, single-writer, non-threadsafe, peekable queue.
    c                :    t                      | _        d | _        d S r4   )r   _q_read_futurer&   s    r(   r)   zQueue.__init__i   s    '' r*   c                J    | j         }|st          |                                S r4   )r]   rS   popleft)r'   qs     r(   
get_nowaitzQueue.get_nowaitm   s%    G 	yy{{r*   c                    | j         r
J d            t                      }| j        }|r(|                    |                                           n|| _         |S )NzOnly one reader allowed)r^   r   r]   rV   r`   )r'   rW   ra   s      r(   r:   z	Queue.gets   s\    $??&????hhG 	$NN199;;'''' #D
r*   c                    | j         }| j        }|.t          |          dk    sJ d | _        t          ||           d S |                    |           d S )Nr   )r]   r^   lenrY   append)r'   valuera   rW   s       r(   
put_nowaitzQueue.put_nowait}   sZ    G?q66Q;;;; $D(e44444HHUOOOOOr*   c                H    | j         }|r|d         S || j        ur|S t          )zV
        Get the next object in the queue without removing it from the queue.
        r   )r]   _omittedrS   )r'   defaultra   s      r(   peekz
Queue.peek   s4     G 	Q4KDM))Nr*   N)rI   rJ   rK   rL   r)   rb   r:   rh   putobjectrj   rl   rN   r*   r(   r[   r[   d   s         ! ! !       CvxxH# 
 
 
 
 
 
r*   r[   c                       e Zd ZdZdZ	 dd fd	Zd
 Zedd            Zedd            Z	edd            Z
ddZddZd Zd Zd Z xZS )InProcz
    An established communication based on a pair of in-process queues.

    Reminder: a Comm must always be used from a single thread.
    Its peer Comm can be running in any thread.
    FT
local_addrstr	peer_addrdeserializeboolc                   t                                          |           || _        || _        || _        || _        || _        d| _        t          j	        | | 
                                          | _        d| j        _        d| _        d S )N)rt   FT)superr)   _local_addr
_peer_addr_read_q_write_q_write_loop_closedr   finalize_get_finalizer
_finalizeratexit_initialized)r'   rq   rs   read_qwrite_q
write_looprt   	__class__s          r(   r)   zInProc.__init__   s     	[111%#%!*41D1D1F1FGG!& r*   c                F    t          |           }| j        | j        |fd}|S )Nc                    t                               d|            |                    | j        t                     d S )NzClosing dangling queue in )loggerwarningadd_callbackrh   _EOF)r   r   rs      r(   r~   z'InProc._get_finalizer.<locals>.finalize   s<    NN;;;<<<##G$6=====r*   )reprr{   r|   )r'   r   r~   s      r(   r   zInProc._get_finalizer   s4    JJ!]t7G1 	> 	> 	> 	> r*   returnc                    | j         S r4   )rx   r&   s    r(   local_addresszInProc.local_address   s    r*   c                    | j         S r4   )ry   r&   s    r(   peer_addresszInProc.peer_address   s
    r*   c                    dS NTrN   r&   s    r(   	same_hostzInProc.same_host   s    tr*   ignoredc                  K   | j         rt                      | j                                         d {V }|t          u r.d| _         | j                                         t                      | j        rt          |          }|S r   )	r}   r	   rz   r:   r   r   detachrt   r   )r'   deserializersmsgs      r(   readzInProc.read   s      < 	$!###L$$&&&&&&&&$;;DLO""$$$!### 	*$S))C
r*   Nc                   K   |                                  rt                      | j                            | j        j        |           dS r   )closedr	   r|   r   r{   rh   )r'   r   serializerson_errors       r(   writezInProc.write   sG      ;;== 	$!### 	%%dm&>DDDqr*   c                2   K   |                                   d S r4   )abortr&   s    r(   closezInProc.close   s      

r*   c                    |                                  sy| j                            | j        j        t
                     | j                            t
                     d x| _        | _        d| _        | j        	                                 d S d S r   )
r   r|   r   r{   rh   r   rz   r}   r   r   r&   s    r(   r   zInProc.abort   s    {{}} 	%))$-*BDIIIL##D)))+//DMDLDLO""$$$$$	% 	%r*   c                    | j         rdS | j        rC| j                            d          t          u r"d| _         | j                                         dS dS )z
        Whether this comm is closed.  An InProc comm is closed if:
            1) close() or abort() was called on this comm
            2) close() or abort() was called on the other end and the
               read queue is empty
        TNF)r}   r   rz   rl   r   r   r   r&   s    r(   r   zInProc.closed   s]     < 	4 	!2!24!8!8D!@!@DLO""$$$45r*   T)rq   rr   rs   rr   rt   ru   )r   rr   )r   ru   )r   )NN)rI   rJ   rK   rL   r   r)   r   rM   r   r   r   r   r   r   r   r   __classcell__)r   s   @r(   rp   rp      s         L !! ! ! ! ! ! !*         X     X    X        % % %      r*   rp   c                  d    e Zd ZdZddZd Zd Zd Zd Zd Z	e
d	             Ze
d
             ZdS )InProcListenerinprocTc                    t           | _        |p| j                                        | _        || _        || _        t                      | _        d S r4   )rQ   managerr@   addresscomm_handlerrt   r[   listen_q)r'   r   r   rt   s       r(   r)   zInProcListener.__init__  sB    %<$,":":"<"<(&r*   c                   K   	 |                      |           d {V  n+# t          $ r t                              d           Y d S w xY w|                     |           d {V  d S )Nz,Connection closed before handshake completed)on_connectionr	   r   debugr   )r'   comms     r(   _handle_streamzInProcListener._handle_stream  s      	$$T********** 	 	 	LLGHHHFF	 %%%%%%%%%%%s     $AAc                b  K   	 | j                                          d {V }|d S t          d| j        z   d|j        z   |j        |j        |j        | j                  }|j        	                    |j
        j                   t          j                    	                    | j        |           )NTrP   rq   rs   r   r   r   rt   )r   r:   rp   r   r   r   r   r   rt   r   r   setr   currentr   )r'   conn_reqr   s      r(   _listenzInProcListener._listen  s      	E!]..00000000H&5%7~ #? ,  D O(()<)@AAAN))$*=tDDD	Er*   c                P    | j                             | j        j        |           d S r4   )loopr   r   rh   )r'   r   s     r(   connect_threadsafez!InProcListener.connect_threadsafe'  s%    	t}7BBBBBr*   c                   K   t          j                    | _        t          j        |                                           | _        | j                            | j	        |            d S r4   )
r   r   r   asyncioensure_futurer   _listen_futurer   r2   r   r&   s    r(   startzInProcListener.start*  sP      N$$	%3DLLNNCC!!$,55555r*   c                x    | j                             d            | j                            | j                   d S r4   )r   rh   r   r7   r   r&   s    r(   stopzInProcListener.stop/  s6      &&&$$T\22222r*   c                    d| j         z   S NrP   r   r&   s    r(   listen_addresszInProcListener.listen_address3      T\))r*   c                    d| j         z   S r   r   r&   s    r(   contact_addresszInProcListener.contact_address7  r   r*   Nr   )rI   rJ   rK   prefixr)   r   r   r   r   r   rM   r   r   rN   r*   r(   r   r     s        F       & & &E E E"C C C6 6 6
3 3 3 * * X* * * X* * *r*   r   c                      e Zd Zd ZddZdS )InProcConnectorc                    || _         d S r4   )r   )r'   r   s     r(   r)   zInProcConnector.__init__=  s    r*   Tc                  K   | j                             |          }|t          d|          t          t	                      t	                      t          j                    | j                                         t          j	                              }|
                    |           |j                                         d {V  t          d|j        z   d|z   |j        |j        |j        |          }|S )Nzno endpoint for inproc address r   rP   r   )r   r;   r,   r   r[   r   r   r@   r   Eventr   r   waitrp   r   r   r   r   )r'   r   rt   connection_argsr1   r   r   s          r(   connectzInProcConnector.connect@  s      <0099GGGGHHH$''''>##<++--}
 
 
 	##H--- !&&((((((((("X_4!G+>N}#
 
 
 r*   Nr   )rI   rJ   rK   r)   r   rN   r*   r(   r   r   <  s7               r*   r   c                  0    e Zd ZeZd Zd Zd Zd Zd Z	dS )InProcBackendc                *    t          | j                  S r4   )r   r   r&   s    r(   get_connectorzInProcBackend.get_connectorb  s    t|,,,r*   c                $    t          |||          S r4   )r   )r'   lochandle_commrt   r   s        r(   get_listenerzInProcBackend.get_listenere  s    c;<<<r*   c                N    | j                             |           | j         j        S r4   )r   r9   r-   r'   r   s     r(   get_address_hostzInProcBackend.get_address_hostj  s"    %%c***|r*   c                    |S r4   rN   r   s     r(   resolve_addresszInProcBackend.resolve_addressn  s    
r*   c                h    | j                             |           | j                                         S r4   )r   r9   r@   r   s     r(   get_local_address_forz#InProcBackend.get_local_address_forq  s,    %%c***|'')))r*   N)
rI   rJ   rK   rQ   r   r   r   r   r   r   rN   r*   r(   r   r   ]  sd        G- - -= = =
    * * * * *r*   r   r   ),
__future__r   r   r   loggingr=   r#   r   collectionsr   r   tornado.concurrentr   tornado.ioloopr   distributed.comm.corer   r	   r
   r   distributed.comm.registryr   r   distributed.protocolr   distributed.utilsr   	getLoggerrI   r   r   r   rQ   r@   	ExceptionrS   rY   r[   rn   r   rp   r   r   r   rN   r*   r(   <module>r      s   " " " " " "       				      ) ) ) ) ) ) ) ) % % % % % % ! ! ! ! ! ! L L L L L L L L L L L L 7 7 7 7 7 7 7 7 3 3 3 3 3 3 $ $ $ $ $ $		8	$	$JM  
2 2 2 2 2 2 2 2j 6 6 6	 	 	 	 	 	 	 	  1 1 1 1 1 1 1 1h vxxf f f f fT f f fR5* 5* 5* 5* 5*X 5* 5* 5*p    i   B* * * * *G * * *2 #]__   r*   