
    ∋dh                    f   d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
Z
d dlmZ d dlmZ d dlmZ d dlmZ d dlZd dlmZmZ d dlmZmZmZmZ d d	lmZ d d
lmZm Z m!Z!m"Z" d dl#m$Z$m%Z%m&Z&m'Z'  ej(        e)          Z* e+            Z,	 	 d0d1dZ- G d dej.                  Z/ G d de          Z0 G d de0          Z1d Z2d2d!Z3 G d" d#e          Z4 G d$ d%e4          Z5 G d& d'e          Z6 G d( d)e6          Z7 G d* d+e          Z8 G d, d-e8          Z9 G d. d/          Z:dS )3    )annotationsN)_SelectorSocketTransport)Callable)islice)Any)parse_host_portunparse_host_port)CommCommClosedError	ConnectorListener)Backend)ensure_concrete_hostfrom_frames
host_array	to_frames)	ensure_ipensure_memoryviewget_ipget_ipv6      bufferslist[bytes]target_buffer_sizeintsmall_buffer_sizereturnc                .   t          |           dk    r| S g g ddfd}| D ]a}t          |          }||k    r+                    |           |z  |k    r
 |             B |                                 |           b |             S )a  Given a list of buffers, coalesce them into a new list of buffers that
    minimizes both copying and tiny writes.

    Parameters
    ----------
    buffers : list of bytes_like
    target_buffer_size : int, optional
        The target intermediate buffer size from concatenating small buffers
        together. Coalesced buffers will be no larger than approximately this size.
    small_buffer_size : int, optional
        Buffers <= this size are considered "small" and may be copied.
       r   r   Nonec                      rot                     dk    r                     d                    n(                    d                                                                            dd S d S )Nr    r       )lenappendjoinclear)concatcsizeout_bufferss   <lib/python3.11/site-packages/distributed/comm/asyncio_tcp.pyflushzcoalesce_buffers.<locals>.flush=   sw     	6{{a""6!9----""388F#3#3444LLNNNEEE	 	r#   r   r!   )r$   r%   )	r   r   r   r,   bsizer(   r)   r*   s	         @@@r+   coalesce_buffersr0   $   s    $ 7||q!KFE         	" 	"1vv$$$MM!TME***EGGGq!!!!	EGGGr#   c                      e Zd ZU dZded<   ded<   ded<   ded	<   d
ed<   ded<   ded<   d
ed<   ded<   ded<   ded<   ded<   ded<   ded<   ded<   ded<   ded<   ded<    ee          Z	 	 dFdG fd#ZedHd&            Z	edHd'            Z
edId(            ZdJd*ZdKd,ZdJd-ZdLd0ZdMd3ZdNd5ZdJd6ZdId7ZdId8ZdId9ZdId:ZdJd;ZdJd<ZdOdPd?ZdJd@ZdJdAZdQdCZdRdEZ xZS )SDaskCommProtocola:  Manages a state machine for parsing the message framing used by dask.

    Parameters
    ----------
    on_connection : callable, optional
        A callback to call on connection, used server side for handling
        incoming connections.
    min_read_size : int, optional
        The minimum buffer size to pass to ``socket.recv_into``. Larger sizes
        will result in fewer recv calls, at the cost of more copying. For
        request-response comms (where only one message may be in the queue at a
        time), a smaller value is likely more performant.
    )Callable[[DaskCommProtocol], None] | Noneon_connectionasyncio.AbstractEventLoop_loopzasyncio.Queue | None_queuez/asyncio.WriteTransport | _ZeroCopyWriter | None
_transportbool_pausedzasyncio.Future | None_drain_waiterzasyncio.Future_closed_waiter_using_default_bufferr   _default_len
memoryview_default_buffer_default_start_default_end
int | None_nframeszlist[int] | None_frame_lengthszlist[memoryview] | None_frames_frame_index_frame_nbytes_needed_frame_nbytes_remainingN   min_read_sizec                   t                                                       || _        t          j                    | _        t          j                    | _        d | _        d| _	        d | _
        | j                                        | _        d| _        t          |d          | _        t!          | j                  | _        d| _        d| _        d | _        d | _        d | _        d | _        d| _        d| _        d S )NFT   r   )super__init__r4   asyncioget_running_loopr6   Queuer7   r8   r:   r;   create_futurer<   r=   maxr>   r   r@   rA   rB   rD   rE   rF   rG   rH   rI   )selfr4   rK   	__class__s      r+   rO   zDaskCommProtocol.__init__   s    
 	*-//
moo!"j6688 &*"r22)$*;<<
 " $%!'($$$r#   r   strc                    | j         rdS | j        J | j                            d          }|t          |d d          S dS )N<closed>sockname   	<unknown>	is_closedr8   get_extra_infor	   )rU   rZ   s     r+   
local_addrzDaskCommProtocol.local_addr   R    > 	:***?11*==$hrrl33{r#   c                    | j         rdS | j        J | j                            d          }|t          |d d          S dS )NrY   peernamer[   r\   r]   )rU   rc   s     r+   	peer_addrzDaskCommProtocol.peer_addr   ra   r#   c                    | j         d u S N)r8   rU   s    r+   r^   zDaskCommProtocol.is_closed   s    $&&r#   r!   c                f    | j         s)d | j        c| _        }|J |                                 d S d S rf   )r^   r8   abortrU   	transports     r+   _abortzDaskCommProtocol._abort   sH    ~ 	)-t&DOY(((OO	 	r#   	comm_reprc                    | j         sFt                              d| d           	 |                                  d S # t          $ r Y d S w xY wd S )NzClosing dangling comm ``)r^   loggerwarningrl   RuntimeError)rU   rm   s     r+   _close_from_finalizerz&DaskCommProtocol._close_from_finalizer   sl    ~ 	NNAYAAABBB   		 	s   = 
A
Ac                   K   | j         s'd | j        c| _        }|J |                                 | j         d {V  d S rf   )r^   r8   closer<   rj   s     r+   _closezDaskCommProtocol._close   s]      ~ 	)-t&DOY(((OO!!!!!!!!!!r#   rk   asyncio.BaseTransportc                   t          |          t          u rt          | |          | _        n#t	          |t
          j                  sJ || _        | j                            d           | j        |                     |            d S d S )Ni   )high)	typer   _ZeroCopyWriterr8   
isinstancerP   WriteTransportset_write_buffer_limitsr4   rj   s     r+   connection_madez DaskCommProtocol.connection_made   s    
 	??666-dI>>DOOi)?@@@@@'DO 	//Z/@@@)t$$$$$ *)r#   sizehintobjectc                    | j         | j        | j        k     rd| _        | j        | j        d         S d| _        | j        J | j         | j                 }|| j         d         S )z-Get a buffer to read into for this read eventNTF)rF   rH   r>   r=   r@   rB   rG   )rU   r   frames      r+   
get_bufferzDaskCommProtocol.get_buffer   sx     <4#<t?P#P#P)-D&'(9(;(;<<).D&$000L!23E$335566r#   nbytesc                    |dk    rd S | j         r&| xj        |z  c_        |                                  d S | xj        |z  c_        |                                 s|                                  d S d S Nr   )r=   rB   _parse_default_bufferrH   _frames_check_remaining_message_completed)rU   r   s     r+   buffer_updatedzDaskCommProtocol.buffer_updated   s    Q;;F% 	*'&&(((((%%/%%//11 *'')))))* *r#   c                   	 | j         |                                 snZ| j         J | j        J t          | j                  | j         k     r|                                 sn|                                 snv|                                  dS )z)Parse all messages in the default buffer.TN)rD   _parse_nframesrE   r$   _parse_frame_lengths_parse_frames_reset_default_bufferrg   s    r+   r   z&DaskCommProtocol._parse_default_buffer  s    
	}$**,, =,,,&2224&''$-770022 %%'' 
	 	""$$$$$r#   c                    | j         | j        z
  dk    rHt          j        d| j        | j        dz             d         | _        | xj        dz  c_        g | _        dS dS )zlFill in `_nframes` from the default buffer. Returns True if
        successful, False if more data is neededrM   z<Q   offsetr   TF)rB   rA   structunpack_fromr@   rD   rE   rg   s    r+   r   zDaskCommProtocol._parse_nframes  su     t22b88".d*43F3J  DM 2%"$D4ur#   c                   | j         J | j        J | j         t          | j                  z
  }| j        | j        z
  dz  }t          ||          }| j                            t          j        d| d| j	        | j                             | xj        d|z  z  c_        ||k    r:d | j        D             | _
        d| _        | j        r| j        d         nd| _        dS d	S )
zrFill in `_frame_lengths` from the default buffer. Returns True if
        successful, False if more data is neededNr   <Qr   c                ,    g | ]}t          |          S  )r   ).0ns     r+   
<listcomp>z9DaskCommProtocol._parse_frame_lengths.<locals>.<listcomp>2  s    GGGaJqMMGGGr#   r   TF)rD   rE   r$   rB   rA   minextendr   r   r@   rF   rG   rH   )rU   needed	availablen_reads       r+   r   z%DaskCommProtocol._parse_frame_lengths"  s    }((("...T%8!9!99&)<<B	Y''""Ft3D<O  	
 	
 	

 	q6z)VGG43FGGGDL !D*.*=D#A&&1 % 4ur#   c                    | j         rdS | j        J | j        J | j        J 	 | xj        dz  c_        | j        | j        k     r!| j        | j                 | _         | j         rdS ndS D)NTr    F)rH   rG   rD   rE   rg   s    r+   r   z(DaskCommProtocol._frames_check_remaining:  s    $ 	4 ,,,}((("...	" 4=00,0,?@Q,R),  4  u	r#   c                   	 | j         | j        z
  }|                                 s#|                                  t	          |          S |sdS | j        J | j        J | j        | j                 }t          | j        |          }| j	        | j        | j        |z            || j         || j        z
  pd<   | xj        |z  c_        | xj        |z  c_        )zkFill in `_frames` from the default buffer. Returns True if
        successful, False if more data is neededTFN)
rB   rA   r   r   r9   rF   rG   r   rH   r@   )rU   r   r   r   s       r+   r   zDaskCommProtocol._parse_framesL  s   	0)D,??I//11 ''')))I& u<+++$000L!23E2I>>F $T%84;NQW;W%WX **ft7P.P . 6)%%/%%'	0r#   c                    | j         }| j        }||k     r5|dk    r/| j        ||         | j        d||z
  <   d| _         ||z
  | _        dS ||k    rd| _         d| _        dS dS )z0Reset the default buffer for the next read eventr   N)rA   rB   r@   )rU   startends      r+   r   z&DaskCommProtocol._reset_default_bufferd  s    #3;;5A::262FuSy2QD 3;/"#D #eDc\\"#D !D \r#   c                    | j         J | j                             | j                   d| _        d| _        d| _        d| _        dS )zAPush a completed message to the queue and reset per-message stateNr   )r7   
put_nowaitrF   rD   rE   rI   rg   s    r+   r   z#DaskCommProtocol._message_completeds  sL    {&&&t|,,,"'($$$r#   excBaseException | Nonec                >   d | _         | j                            d            | j        J | j                            t
                     | j        rH| j        }|Ad | _        |                                s(|	                    t          d                     d S d S d S d S )NConnection closed)r8   r<   
set_resultr7   r   _COMM_CLOSEDr:   r;   doneset_exceptionr   )rU   r   waiters      r+   connection_lostz DaskCommProtocol.connection_lost|  s    &&t,,, {&&&|,,, < 	O'F!%)"{{}} O((9L)M)MNNNNN	O 	O!!O Or#   c                    d| _         d S )NT)r:   rg   s    r+   pause_writingzDaskCommProtocol.pause_writing  s    r#   c                    d| _         | j        }|2d | _        |                                s|                    d            d S d S d S )NF)r:   r;   r   r   )rU   r   s     r+   resume_writingzDaskCommProtocol.resume_writing  s[    #!%D;;== (!!$''''' ( (r#   r   c                   K   | j         1| j                                          d{V }|t          ur|S d| _         t          d          )z$Read a single message from the comm.Nr   )r7   getr   r   )rU   outs     r+   readzDaskCommProtocol.read  sY       ;"))))))))C,&&
DK1222r#   framesc                X  K   | j         rt          d          | j        r(| j                                        x}| _        | d{V  d |D             }t          |          }d |D             }t          |          |dz   dz  z   }t          j	        |dz    d||g|R  }|d	k     rd

                    |g|          g}nt          |g|          }| j        J t          |          dk    r| j                            |           n | j                            |d                    |S )zWrite a message to the comm.r   Nc                Z    g | ](}t          |t                    rt          |          n|)S r   )r|   r?   r   r   fs     r+   r   z*DaskCommProtocol.write.<locals>.<listcomp>  sB     
 
 
IJJq*$=$=Da   1
 
 
r#   c                ,    g | ]}t          |          S r   )r$   r   s     r+   r   z*DaskCommProtocol.write.<locals>.<listcomp>  s    000AQ000r#   r    r   r[   r   i   r#   r   )r^   r   r:   r6   rS   r;   r$   sumr   packr&   r0   r8   
writelineswrite)rU   r   drain_waiternframesframes_nbytes
msg_nbytesheaderr   s           r+   r   zDaskCommProtocol.write  su     > 	!"5666\ 	04
0H0H0J0JJL4-
 
NT
 
 
 f++00000
 ''7Q;!*;;
!...
GTmTTT  xx 1& 1223GG&'8'899G***w<<!O&&w////O!!'!*---r#   )NrJ   )r4   r3   rK   r   r   rW   r   r9   r-   )rm   rW   r   r!   )rk   rw   r   r!   )r   r   r   r?   )r   r   r   r!   rf   )r   r   r   r!   )r   r   )r   r   r   r   )__name__
__module____qualname____doc____annotations__tuple	__slots__rO   propertyr`   rd   r^   rl   rs   rv   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   __classcell__rV   s   @r+   r2   r2   V   s          =<<<$$$$    ????MMM((((""""$$$$$$$$    o&&I DH'$) $) $) $) $) $) $)L    X    X ' ' ' X'      " " " "% % % %"7 7 7 7
* 
* 
* 
*% % % %       0   $0 0 0 00" " " ") ) ) )O O O O O    ( ( ( (	3 	3 	3 	3# # # # # # # #r#   r2   c                       e Zd Zej                            ej                            d                    Z	 dd fd
Z	ddZ
edd            Zedd            ZddZddZddZddZd dZed             Z xZS )!TCPzdistributed.comm.shardTprotocolr2   r`   rW   rd   deserializer9   c                2   || _         || _        || _        d| _        t	                                          |           t          j        | | j         j        t          |                     | _
        d| j
        _        |                                 | _        d S )NFr   )	_protocol_local_addr
_peer_addr_closedrN   rO   weakreffinalizers   repr
_finalizeratexit_get_extra_info_extra_info)rU   r   r`   rd   r   rV   s        r+   rO   zTCP.__init__  s     "%#[111 "*$.6T


 
 "'  //11r#   r   dict[str, Any]c                    i S rf   r   rg   s    r+   r   zTCP._get_extra_info  s    	r#   c                    | j         S rf   )r   rg   s    r+   local_addresszTCP.local_address      r#   c                    | j         S rf   )r   rg   s    r+   peer_addresszTCP.peer_address  s
    r#   Nc                   K   | j                                          d {V }	 t          || j        || j                   d {V S # t
          $ r$ |                                  t          d          w xY w)N)r   deserializersallow_offloadz aborted stream on truncated data)r   r   r   r   r   EOFErrorri   r   )rU   r   r   s      r+   r   zTCP.read  s      ~**,,,,,,,,
	F$ ,+"0	           	F 	F 	FJJLLL!"DEEE	Fs   "A .A4messagec           	        K   t          || j        ||| j        | j        d| j        | j                   d {V }| j                            |           d {V }|S )N)sender	recipient)r   serializerson_errorcontextframe_split_size)r   r   
local_inforemote_infohandshake_optionsmax_shard_sizer   r   )rU   msgr   r   r   r   s         r+   r   z	TCP.write  s       ,#/!-  (
 "0
 
 
 
 
 
 
 
 
 ~++F33333333r#   r!   c                z   K   | j                                          d{V  | j                                         dS )zFlush and close the commN)r   rv   r   detachrg   s    r+   ru   z	TCP.close  sF      n##%%%%%%%%%     r#   c                j    | j                                          | j                                         dS )zHard close the commN)r   rl   r   r  rg   s    r+   ri   z	TCP.abort  s0         r#   c                    | j         j        S rf   )r   r^   rg   s    r+   closedz
TCP.closed  s    ~''r#   c                    | j         S rf   )r   rg   s    r+   
extra_infozTCP.extra_info  r   r#   T)r   r2   r`   rW   rd   rW   r   r9   r   r   r   rf   )Nr   r-   r   )r   r   r   daskutilsparse_bytesconfigr   r   rO   r   r   r   r   r   r   ru   ri   r  r  r   r   s   @r+   r   r     sB       Z++DKOO<T,U,UVVN !2 2 2 2 2 2 2,          X     XF F F F    ! ! ! !
! ! ! !
( ( ( (     X         r#   r   c                      e Zd ZddZdS )TLSr   r   c                n    | j         j        J | j         j        j        } |d           |d          dS )Npeercertcipher)r  r  )r   r8   r_   )rU   r   s     r+   r   zTLS._get_extra_info  s>    ~(444n'6C
OOss8}}EEEr#   Nr	  )r   r   r   r   r   r#   r+   r  r    s.        F F F F F Fr#   r  c                    |                      d          }t          |t          j                  st	          d|          |S )Nssl_contextzpTLS expects a `ssl_context` argument of type ssl.SSLContext (perhaps check your TLS configuration?) Instead got )r   r|   ssl
SSLContext	TypeError)connection_argsctxs     r+   _expect_tls_contextr  $  sT    


m
,
,Cc3>** 
$$ $
 
 	

 Jr#   addressrW   kwargsr   r!   c                Z    |                     d          rt          dd| z             d S )Nrequire_encryptionzJencryption required by Dask configuration, refusing communication from/to tcp://)r   rr   )r  r  s     r+   _error_if_require_encryptionr   /  sF    zz&'' 
l3;g3E3EH
 
 	

 
r#   c                  &    e Zd ZdZeZddZdd
ZdS )TCPConnectorr  Tc                  K   t          j                    }t          |          \  }} | j        |fi |} |j        t
          ||fi | d {V \  }}| j        |j        z   }	| j        |z   }
|                     ||	|
|          S )Nr   )	rP   rQ   r   _get_extra_kwargscreate_connectionr2   prefixr`   
comm_class)rU   r  r   r  loopipportrk   r   r`   rd   s              r+   connectzTCPConnector.connect;  s      '))"7++D''::6::$:D$:b$%
 %
*0%
 %
 
 
 
 
 
 
	8 [8#66
K')	xYKXXXr#   r  rW   r  r   r   r   c                     t          |fi | i S rf   r   rU   r  r  s      r+   r$  zTCPConnector._get_extra_kwargsG      $W77777	r#   Nr  r  rW   r  r   r   r   )r   r   r   r&  r   r'  r+  r$  r   r#   r+   r"  r"  7  sJ        FJ
Y 
Y 
Y 
Y     r#   r"  c                      e Zd ZdZeZd
dZd	S )TLSConnectortls://r  rW   r  r   r   r   c                (    t          |          }d|iS Nr  r  rU   r  r  r  s       r+   r$  zTLSConnector._get_extra_kwargsP      !&))s|r#   Nr0  r   r   r   r&  r  r'  r$  r   r#   r+   r2  r2  L  2        FJ     r#   r2  c                      e Zd ZdZeZ	 	 	 	 ddZddZd Zd Z	d Z
ddZddZd Zedd            Zedd            ZdS )TCPListenerr  TNr   c                    t          ||          \  | _        | _        || _        || _        || _        || _         | j        |fi || _        d | _	        d S rf   )
r   r)  r*  default_hostcomm_handlerr   r   r$  _extra_kwargsbound_address)rU   r  r?  r   r   r>  default_portr  s           r+   rO   zTCPListener.__init__Y  sg     -WlCC((&*3T3GFFvFF!r#   r  rW   r  r   r   r   c                     t          |fi | i S rf   r-  r.  s      r+   r$  zTCPListener._get_extra_kwargsk  r/  r#   c                    |                      || j        |j        z   | j        |j        z   | j                  }| j        |_        t          j        |                     |                     d S )N)r`   rd   r   )	r'  r&  r`   rd   r   r   rP   ensure_future_comm_handler)rU   r   comms      r+   _on_connectionzTCPListener._on_connectiono  so    {X%88kH$66(	  
 
 "/d006677777r#   c                   K   	 |                      |           d {V  n+# t          $ r t                              d           Y d S w xY w|                     |           d {V  d S )Nz,Connection closed before handshake completed)r4   r   rp   debugr?  )rU   rG  s     r+   rF  zTCPListener._comm_handlery  s      	$$T********** 	 	 	LLGHHHFF	 %%%%%%%%%%%s     $AAc           
        K   t          j                    }|                    ddt          j        t          j        t          j        d           d{V }t          |d           }g }d}	 |D ]Q}|\  }}}}	}
	 t          j        |||          }n# t          $ r Y /w xY w|t          t          dd          k    r@t          t          d          r+|                    t          j        t          j        d           ||
d         |g|
d	d         R }
	 |                    |
           nE# t          $ r8}t          |j        d
|
d|j                                                  dd}~ww xY w||                                d         } |j         fdfd|i j         d{V }|                    |           d}Sn># t,          $ r1 |D ]}|                                 ||                                  w xY w|S )a8  Due to a design decision in asyncio, listening on `("", 0)` will
        result in two different random ports being used (one for IPV4, one for
        IPV6), rather than both interfaces sharing the same random port. We
        work around this here. See https://bugs.python.org/issue45693 for more
        info.Nr   )familyrz   flagsprotoc                    | d         j         S r   )name)xs    r+   <lambda>zDTCPListener._start_all_interfaces_with_random_port.<locals>.<lambda>  s    AaDI r#   )keyAF_INET6IPPROTO_IPV6Tr[   z*error while attempting to bind on address z: r    c                 ,    t           j                  S rf   r2   rH  rg   s   r+   rR  zDTCPListener._start_all_interfaces_with_random_port.<locals>.<lambda>      ,T-@AA r#   sock)rP   rQ   getaddrinfosocket	AF_UNSPECSOCK_STREAM
AI_PASSIVEsortedOSErrorgetattrhasattr
setsockoptrU  IPV6_V6ONLYbinderrnostrerrorlowergetsocknamecreate_serverr@  r%   BaseExceptionru   )rU   r(  infosserversr*  resafsocktyperN  	canonnamesarY  errservers   `             r+   &_start_all_interfaces_with_random_portz2TCPListener._start_all_interfaces_with_random_port  s      '))&&### ' 
 
 
 
 
 
 
 
 u"5"5666 1	 ' '582HeY!=Xu==DD   H T::::wN@ @: OOF$79KTRRR #Q%/122//B IIbMMMM      !		79rr3<;M;M;O;O;OQ   	   <++--a0D  2t1AAAA     (         
 v&&&O'P  	 	 	!   

	 sV   -G ;BG 
BG BA4G D*)G *
E,43E''E,,AG ;G>r!   c                    K   t          j                    } j        s" j        s                                  d {V }n* |j         fdf j         j        d j         d {V g}| _        d S )Nc                 ,    t           j                  S rf   rW  rg   s   r+   rR  z#TCPListener.start.<locals>.<lambda>  rX  r#   )hostr*  )rP   rQ   r)  r*  ru  rj  r@  _servers)rU   r(  rm  s   `  r+   r   zTCPListener.start  s      '))w 
	ty 
	 GGIIIIIIIIGG )d(AAAA  (	       G  r#   c                B    | j         D ]}|                                 d S rf   )ry  ru   )rU   rt  s     r+   stopzTCPListener.stop  s,    m 	 	FLLNNNN	 	r#   c                    | j         :d } || j        d                   }|                                dd         | _         | j         S )z@
        The listening address as a (host, port) tuple.
        Nc                    t           j        t           j        fD ]}| j        D ]}|j        |k    r|c c S t          d          )NzNo active INET socket found?)r[  AF_INETrT  socketsrL  rr   )rt  rL  rY  s      r+   
get_socketz-TCPListener.get_host_port.<locals>.get_socket  s`    %~v? ( (F & ( (;&00#'KKKKK 1( ##ABBBr#   r   r[   )rA  ry  ri  )rU   r  rY  s      r+   get_host_portzTCPListener.get_host_port  s]     %C C C :dmA.//D!%!1!1!3!3BQB!7D!!r#   c                H    | j         t          |                                  z   S )z4
        The listening address as a string.
        )r&  r	   r  rg   s    r+   listen_addresszTCPListener.listen_address  s#    
 {.0B0B0D0DEEEr#   c                    |                                  \  }}t          || j                  }| j        t	          ||          z   S )z2
        The contact address as a string.
        )r>  )r  r   r>  r&  r	   )rU   rx  r*  s      r+   contact_addresszTCPListener.contact_address  sF    
 ''))
d#Dt7HIII{.tT::::r#   )TTNr   r0  r-   r   )r   r   r   r&  r   r'  rO   r$  rH  rF  ru  r   r{  r  r   r  r  r   r#   r+   r<  r<  U  s        FJ " " " "$   8 8 8& & &J J JX          
" " "" F F F XF ; ; ; X; ; ;r#   r<  c                      e Zd ZdZeZd
dZd	S )TLSListenerr3  r  rW   r  r   r   r   c                (    t          |          }d|iS r5  r6  r7  s       r+   r$  zTLSListener._get_extra_kwargs  r8  r#   Nr0  r9  r   r#   r+   r  r    r:  r#   r  c                  <    e Zd ZeZeZd
dZd Zd Z	d Z
d Zd Zd	S )
TCPBackendr   r   c                *    |                                  S rf   )_connector_classrg   s    r+   get_connectorzTCPBackend.get_connector  s    $$&&&r#   c                "     | j         |||fi |S rf   )_listener_class)rU   lochandle_commr   r  s        r+   get_listenerzTCPBackend.get_listener  s!    #t#CkUU_UUUr#   c                ,    t          |          d         S r   r   rU   r  s     r+   get_address_hostzTCPBackend.get_address_host  s    s##A&&r#   c                     t          |          S rf   r  r  s     r+   get_address_host_portz TCPBackend.get_address_host_port  s    s###r#   c                `    t          |          \  }}t          t          |          |          S rf   )r   r	   r   )rU   r  rx  r*  s       r+   resolve_addresszTCPBackend.resolve_address  s)    $S))
d 4$777r#   c                    t          |          \  }}t          |          }d|v rt          |          }nt          |          }t	          |d           S )N:)r   r   r   r   r	   )rU   r  rx  r*  
local_hosts        r+   get_local_address_forz TCPBackend.get_local_address_for   sO    $S))
d$;;!$JJJ T222r#   N)r   r   )r   r   r   r"  r  r<  r  r  r  r  r  r  r  r   r#   r+   r  r    s        #!O' ' ' 'V V V' ' '$ $ $8 8 83 3 3 3 3r#   r  c                      e Zd ZeZeZdS )
TLSBackendN)r   r   r   r2  r  r  r  r   r#   r+   r  r  *  s        #!OOOr#   r  c                  N   e Zd ZU dZded<   ded<   ded<   ded	<   d
ed<   ded<    eej        d          r-ej        dk    rdZ	n!	  e
j        d          Z	n# e$ r dZ	Y nw xY wdZ	d1dZ	 	 d2d3dZd4dZd4dZd4dZd5d Zd6d"Zd7d$Zd5d%Zd8d(Zd4d)Zd4d*Zd9d.Zd4d/Zd4d0ZdS ):r{   zThe builtin socket transport in asyncio makes a bunch of copies, which
    can make sending large amounts of data much slower. This hacks around that.

    Note that this workaround isn't used with the windows ProactorEventLoop or
    uvloop.r2   r   r   rk   r5   r6   zcollections.deque[memoryview]_buffersr   _sizer9   _protocol_pausedsendmsgwin32rM   
SC_IOV_MAXr    c                ,   || _         || _        t          j                    | _        dD ]}t          ||          sJ dD ]}t          | j        |          sJ t          j                    | _        d| _	        d| _
        |                                  d S )N)_sock_sock_fd_fatal_error_eof_closing
_conn_lost_call_connection_lost)_add_writer_remove_writerr   F)r   rk   rP   rQ   r6   rb  collectionsdequer  r  r  r~   )rU   r   rk   attrs       r+   rO   z_ZeroCopyWriter.__init__S  s     "-//

 		, 		,D 9d++++++ 6 	- 	-D4:t,,,,,, $)++
 %$$&&&&&r#   Nry   rC   lowr   r!   c                p    |
|d}nd|z  }||dz  }|| _         || _        |                                  dS )zSet the write buffer limitsNr      )_high_water
_low_water_maybe_pause_protocol)rU   ry   r  s      r+   r~   z'_ZeroCopyWriter.set_write_buffer_limitsr  sS     <{ 3w;!)C""$$$$$r#   c                |    | j         s2| j        | j        k    r$d| _         | j                                         dS dS dS )z;If the high water mark has been reached, pause the protocolTN)r  r  r  r   r   rg   s    r+   r  z%_ZeroCopyWriter._maybe_pause_protocol  sN    $ 	*d6F)F)F$(D!M'')))))	* 	*)F)Fr#   c                |    | j         r2| j        | j        k    r$d| _         | j                                         dS dS dS )z<If the low water mark has been reached, unpause the protocolFN)r  r  r  r   r   rg   s    r+   _maybe_resume_protocolz&_ZeroCopyWriter._maybe_resume_protocol  sM      	+TZ4?%B%B$)D!M((*****	+ 	+%B%Br#   c                F    | j                                          d| _        dS )zClear the send bufferr   N)r  r'   r  rg   s    r+   _buffer_clearz_ZeroCopyWriter._buffer_clear  s!    


r#   databytesc                    t          |          }| xj        t          |          z  c_        | j                            |           dS )z"Append new data to the send bufferN)r   r  r$   r  r%   )rU   r  mvs      r+   _buffer_appendz_ZeroCopyWriter._buffer_append  sA    t$$

c"gg

R     r#   list[memoryview]c                P    t          t          | j        | j                            S )z.Get one or more buffers to write to the socket)listr   r  SENDMSG_MAX_COUNTrg   s    r+   _buffer_peekz_ZeroCopyWriter._buffer_peek  s    F4=$*@AABBBr#   r/   c                    | xj         |z  c_         | j        }|rJ|d         }t          |          }||k    r|                                 ||z  }n||d         |d<   dS |HdS dS )z*Advance the buffer index forward by `size`r   N)r  r  r$   popleft)rU   r/   r   r.   b_lens        r+   _buffer_advancez_ZeroCopyWriter._buffer_advance  s    

d

- 	
AFFE}}!!!tuuX
  	 	 	 	 	r#   c                   | j         }|j        rt          d          |sd S |j        rd S | j        s	 |j                            |          }||d          }|sd S nQ# t          t          f$ r Y n>t          t          f$ r  t          $ r!}|                    |d           Y d }~d S d }~ww xY w| j                            |j        | j                   |                     |           |                                  d S )Nz%Cannot call write() after write_eof()%Fatal write error on socket transport)rk   r  rr   r  r  r  sendBlockingIOErrorInterruptedError
SystemExitKeyboardInterruptrk  r  r6   r  r  _on_write_readyr  r  )rU   r  rk   r   r   s        r+   r   z_ZeroCopyWriter.write  sQ   N	> 	HFGGG 	F 	F} 	MO((.. ABBx F $%56    12       &&s,STTT J""9#5t7KLLL 	D!!!""$$$$$s   A B*/B*	B%%B*r   r   c                   t          | j                  }|D ]}|                     |           |s	 |                                  nV# t          t
          f$ r Y nCt          t          f$ r  t          $ r&}| j	        
                    |d           Y d }~d S d }~ww xY w| j        sd S | j                            | j	        j        | j                   |                                  d S Nr  )r9   r  r  _do_bulk_writer  r  r  r  rk  rk   r  r6   r  r  r  r  )rU   r   waitingr.   r   s        r+   r   z_ZeroCopyWriter.writelines  s+   t}%% 	# 	#A"""" 	R
##%%%%#%56    12       ++@   	
 =  J""4>#:D<PQQQ""$$$$$s   A BB4BBc                \    |                                   | j                                        S rf   )r  rk   ru   rg   s    r+   ru   z_ZeroCopyWriter.close  (    ~##%%%r#   c                \    |                                   | j                                        S rf   )r  rk   ri   rg   s    r+   ri   z_ZeroCopyWriter.abort  r  r#   rS  rW   r   c                6    | j                             |          S rf   )rk   r_   )rU   rS  s     r+   r_   z_ZeroCopyWriter.get_extra_info  s    ~,,S111r#   c                   |                                  }t          |          dk    r&| j        j                            |d                   }n| j        j                            |          }|                     |           d S )Nr    r   )r  r$   rk   r  r  r  r  )rU   r   r   s      r+   r  z_ZeroCopyWriter._do_bulk_write  st    ##%%w<<1$))'!*55AA$,,W55AQr#   c                r   | j         }|j        rd S 	 |                                  |                                  | j        sj| j                            |j                   |j        r|	                    d            d S |j
        r(|j                            t          j                   d S d S d S # t          t           f$ r Y d S t"          t$          f$ r  t&          $ rY}| j                            |j                   | j                                         |                    |d           Y d }~d S d }~ww xY wr  )rk   r  r  r  r  r6   r  r  r  r  r  r  shutdownr[  SHUT_WRr  r  r  r  rk  r'   r  )rU   rk   r   s      r+   r  z_ZeroCopyWriter._on_write_ready  s}   N	 	F	=!!! '')))= =
)))*<===% =33D99999^ =O,,V^<<<<<= =
= =!  !12 	 	 	DD-. 	 	 	 	Q 	Q 	QJ%%i&8999M!!!""3(OPPPPPPPPP		Qs   B/ /D6D6AD11D6)r   r2   rk   r   )NN)ry   rC   r  rC   r   r!   r-   )r  r  r   r!   )r   r  )r/   r   r   r!   )r   r   r   r!   )rS  rW   r   r   )r   r   r   r   r   rb  r[  sysplatformr  ossysconf	ExceptionrO   r~   r  r  r  r  r  r  r   r   ru   ri   r_   r  r  r   r#   r+   r{   r{   1  s          ''''$$$$++++JJJ wv}i(( 
<7"" "'$.BJ|$<$<!! ' ' '$&!!!' ' ' ' 'B  % % % % %$* * * *+ + + +   
! ! ! !C C C C   % % % %B% % % %2& & & && & & &2 2 2 2       = = = = = =s   A A! A!r{   )r   r   )r   r   r   r   r   r   r   r   )r  rW   r  r   r   r!   );
__future__r   rP   r  loggingr  r[  r  r   r  r   asyncio.selector_eventsr   collections.abcr   	itertoolsr   typingr   r
  distributed.comm.addressingr   r	   distributed.comm.corer
   r   r   r   distributed.comm.registryr   distributed.comm.utilsr   r   r   r   distributed.utilsr   r   r   r   	getLoggerr   rp   r   r   r0   BufferedProtocolr2   r   r  r  r   r"  r2  r<  r  r  r  r{   r   r#   r+   <module>r     s   " " " " " "       				  



  



  < < < < < < $ $ $ $ $ $              J J J J J J J J L L L L L L L L L L L L - - - - - -            M L L L L L L L L L L L		8	$	$ vxx
 (!/ / / / /dp p p p pw/ p p pfQ  Q  Q  Q  Q $ Q  Q  Q hF F F F F# F F F  
 
 
 
    9   *    <   k; k; k; k; k;( k; k; k;\    +   3 3 3 3 3 3 3 3<" " " " " " " "b= b= b= b= b= b= b= b= b= b=r#   