
    ∋dY                       d Z ddlmZ ddlZddlZddlZddlZddlZddlm	Z	m
Z
mZ ddlmZmZ ddlmZ ddlZddlmZ ddlmZmZ dd	lmZmZmZmZ dd
lmZmZ ddlm Z m!Z!m"Z"m#Z# ddl$m%Z%m&Z&m'Z' ddl(m)Z)m*Z*m+Z+m,Z,m-Z-  ej.        e/          Z0er	 ddl1a1n# e2$ r Y nw xY wda1da3da4da5dZ6d)dZ7d*dZ8d+dZ9d,dZ:d Z;d Z< G d d e          Z= G d! d"e          Z> G d# d$e          Z? G d% d&e          Z@ e@            ed'<   d( ZAdS )-z
:ref:`UCX`_ based communications for distributed.

See :ref:`communications` for more.

.. _UCX: https://github.com/openucx/ucx
    )annotationsN)	AwaitableCallable
Collection)TYPE_CHECKINGAny)patch)parse_bytes)parse_host_portunparse_host_port)CommCommClosedError	ConnectorListener)Backendbackends)ensure_concrete_hostfrom_frames
host_array	to_frames)CudaDeviceInfoget_device_index_and_uuidhas_cuda_context)	ensure_ipget_ipget_ipv6
log_errorsnbytesFzThis is often the result of a CUDA-enabled library calling a CUDA runtime function before Dask-CUDA can spawn worker processes. Please make sure any such function calls don't happen at import time or in the global scope of a program.device_infor   returnstrc                @    | j          dt          | j                   dS )Nz ())device_indexr!   uuid)r   s    4lib/python3.11/site-packages/distributed/comm/ucx.py_get_device_and_uuid_strr'   @   s'    &BB#k.>*?*?BBBB    pidintNonec                t    t          |           }t                              d| d| dt                      d S )NzA CUDA context for device z already exists on process ID . r'   loggerwarning_warning_suffix)r   r)   device_uuid_strs      r&   _warn_existing_cuda_contextr3   D   sZ    .{;;O
NN	2_ 	2 	2	2 	2 /	2 	2    r(   device_info_expecteddevice_info_actualc           
         t          |           }t          |          }t                              d| d| d| dt                      d S )NzWorker with process ID z/ should have a CUDA context assigned to device z,, but instead the CUDA context is on device r-   r.   )r4   r5   r)   expected_device_uuid_stractual_device_uuid_strs        r&   _warn_cuda_context_wrong_devicer9   L   sz      88LMM56HII
NN	7# 	7 	7#	7 	7!	7 	7%4	7 	7    r(   c                    dd l }|j                                        }|j        j        j                            |           }|j        j                            ||d           } |                                  d S Nr   )
numba.cudacudacurrent_contextdriverdrvapi	cu_streamStreamsynchronize)streamnumbactxrA   s       r&   synchronize_streamrG   X   sp    
*
$
$
&
&C
!(226::IZ%%c9d;;F
r(   c            
     `  	 t           d S t                      \  } }t          j                            d|                     d|                    dd                              }t
          j                            d          du s
d|v r,d|vr'	 dd ln# t          $ r t          d	          w xY wt          t          j                            d
d          
                    d          d                   }t                      at          j        r+t          t          j        t          j                               j                                         t                      at&          j        rFt&          j        j        |j        k    r,t+          |t&          j        t          j                               dd l }|a t-          j        t          j        |          5  t          j        | d           d d d            n# 1 swxY w Y   t
          j                            d          }	 dd l		fda|)t7          |          }	                    dd|           d S d S # t          $ rC 	 dd lfd}|an# t          $ r d aY nw xY w|t:                              d           Y d S Y d S w xY w)NUCX_TLSTLS z(distributed.comm.ucx.create-cuda-contextTr=   z^cudar   z;CUDA support with UCX requires Numba for context managementCUDA_VISIBLE_DEVICES0,)optionsenv_takes_precedencezdistributed.rmm.pool-sizec                0                         |           S )N)size)DeviceBuffer)nrmms    r&   device_arrayrV      s    ###+++r(   F)pool_allocatormanaged_memoryinitial_pool_sizec                    j                             | fd          }t          j        |j         j                   |S )Nu1)dtype)r=   rV   weakreffinalizer>   )rT   arE   s     r&   numba_device_arrayz%init_once.<locals>.numba_device_array   s<    J++QD+== EJ$>???r(   c                     t          d          )Nz;In order to send/recv CUDA arrays, Numba or RMM is required)RuntimeError)rT   s    r&   rV   rV      s    "Q  r(   zyInitial RMM pool size defined, but RMM is not available. Please consider installing RMM or removing the pool size option.)ucp_prepare_ucx_configosenvirongetdaskconfigr<   ImportErrorr   splitr   pre_existing_cuda_contexthas_contextr3   r   getpidr=   r>   cuda_context_createdr%   r9   r	   dictinitrU   rV   r
   reinitializer/   r0   )

ucx_configucx_environmentucx_tlscuda_visible_device_ucppool_size_str	pool_sizer`   rE   rU   s
           @@r&   	init_oncerz   a   s   
  #6"7"7J jnnuo11)R@@AA G
 	BCCtKK g'"8"8	 	 	 	M  	
 8JNN1377==cBB1E
 
 %5$6$6!$0 	')5ry{{   	
""$$$/11 ,	$059L9QQQ+#%9%Ery{{   
C	BJ	0	0 @ @
 	$????@ @ @ @ @ @ @ @ @ @ @ @ @ @ @ KOO$?@@M!


	, 	, 	, 	, 	, $#M22I#EY       %$
    	    
 .LL 	 	 	    	 $NNS      %$$%sT   B B0G??HH*2I   
J-+I76J-7JJ-JJ-,J-c                0     |             }|	d|_         dS dS )zCallback to close Dask Comm when UCX Endpoint closes or errors

    Parameters
    ----------
        ref: weak reference to a Dask UCX comm
    NT)_closed)refcomms     r&   _close_commr      s(     355D r(   c                       e Zd ZdZ	 dd fdZedd
            Zedd            Zed fd            Ze		 	 d d!d            Z
e	d"d            Zd Zd Zed             Zd Z xZS )#UCXa  Comm object using UCP.

    Parameters
    ----------
    ep : ucp.Endpoint
        The UCP endpoint.
    address : str
        The address, prefixed with `ucx://` to use.
    deserialize : bool, default True
        Whether to deserialize data in :meth:`distributed.protocol.loads`

    Notes
    -----
    The read-write cycle uses the following pattern:

    Each msg is serialized into a number of "data" frames. We prepend these
    real frames with two additional frames

        1. is_gpus: Boolean indicator for whether the frame should be
           received into GPU memory. Packed in '?' format. Unpack with
           ``<n_frames>?`` format.
        2. frame_size : Unsigned int describing the size of frame (in bytes)
           to receive. Packed in 'Q' format, so a length-0 frame is equivalent
           to an unsized frame. Unpacked with ``<n_frames>Q``.

    The expected read cycle is

    1. Read the frame describing if connection is closing and number of frames
    2. Read the frame describing whether each data frame is gpu-bound
    3. Read the frame describing whether each data frame is sized
    4. Read all the data frames.
    T
local_addrr!   	peer_addrdeserializeboolc                   t                                          |           || _        |r|                    d          sJ |                    d          sJ || _        || _        d | _        t          | j        d          rUt          j	        |           }| j        
                    t          j        t          |                     d| _        d| _        nd| _        t                               d|            d S )N)r   ucxset_close_callbackFTzUCX.__init__ %s)super__init___ep
startswith_local_addr
_peer_addr	comm_flaghasattrr]   r}   r   	functoolspartialr   r|   _has_close_callbackr/   debug)selfepr   r   r   r}   	__class__s         r&   r   zUCX.__init__   s     	[111 	0((/////##E*****%# 48122 	-+d##CH''	(9+s(K(KLLL DL'+D$$',D$&-----r(   r    c                    | j         S N)r   r   s    r&   local_addresszUCX.local_address  s    r(   c                    | j         S r   )r   r   s    r&   peer_addresszUCX.peer_address  s
    r(   c                <    | j         rt                      j        ndS )z)Unlike in TCP, local_address can be blankF)r   r   	same_host)r   r   s    r&   r   zUCX.same_host  s     %)$4?uww  %?r(   Nmessagemsgrp   serializersCollection[str] | Noneon_errorr*   c                T  K   |                                  rt          d          |d}t          |||| j                   d {V }t	          |          }t          d |D                       }t          d |D                       }t          d t          ||          D              \  }}		 | j                            t          j
        dd|                     d {V  | j                            t          j
        |d	z  |d
z  z   g||R             d {V  t          |          rt          d           |	D ]"}
| j                            |
           d {V  #t          |          S # t          j        j        $ r$ |                                  t          d          w xY w)Nz,Endpoint is closed -- unable to send messager=   rh   pickleerror)r   r   allow_offloadc              3  6   K   | ]}t          |d           V  dS )__cuda_array_interface__N)r   .0fs     r&   	<genexpr>zUCX.write.<locals>.<genexpr>2  s-      SSqGA'ABBSSSSSSr(   c              3  4   K   | ]}t          |          V  d S r   r   r   s     r&   r   zUCX.write.<locals>.<genexpr>3  s(      00AfQii000000r(   c              3  J   K   | ]\  }}t          |          d k    ||fV  dS r   Nr   r   is_cuda
each_frames      r&   r   zUCX.write.<locals>.<genexpr>5  sJ        'GZ*%%)) *%)))) r(   ?QF?Qr   z(While writing, the connection was closed)closedr   r   r   lentuplezipr   sendstructpackanyrG   sumrc   
exceptionsUCXBaseExceptionabort)r   r   r   r   framesnframescuda_framessizescuda_send_framessend_framesr   s              r&   writez	UCX.write  s/      ;;== 	R!"PQQQ=K #,	
 
 
 
 
 
 
 
 
 f++SSFSSSSS0000000(+ +.{F+C+C  )
%+	N ',,v{4@@AAAAAAAAA ',,GcMGcM9PKP%PPP         #$$ &"1%%%) / /
gll:..........u::~. 	N 	N 	NJJLLL!"LMMM	Ns   .C E/ /8F'r   c                d  K   |d}	 t          t          j        d                    }| j                            |           d {V  t          j        d|          \  }}|rt          d          |dz  |dz  z   }t          t          j        |                    }| j                            |           d {V  t          j        ||          }|d |         ||d          }}d t          ||          D             }	t          d t          ||	          D              \  }
}t          |
          rt          d           	 |D ]"}| j                            |           d {V  #n8# t          $ r+}|                                  t          d	|          d }~ww xY w	 t          |	| j        || j        
           d {V }n1# t          $ r$ |                                  t          d          w xY w|S # t          $ r+}|                                  t          d	|          d }~ww xY w)Nr   r   zConnection closed by writerr   r   c                T    g | ]%\  }}|rt          |          nt          |          &S  )rV   r   )r   r   	each_sizes      r&   
<listcomp>zUCX.read.<locals>.<listcomp>y  sF       &GY ,3MY'''
98M8M  r(   c              3  J   K   | ]\  }}t          |          d k    ||fV  dS r   r   r   s      r&   r   zUCX.read.<locals>.<genexpr>~  sJ        +j))A-- j)---- r(   r   z.Connection closed by writer.
Inner exception: )r   deserializersr   z Aborted stream on truncated data)r   r   calcsizer   recvunpackr   r   r   rG   BaseExceptionr   r   r   r   EOFError)r   r   r   shutdownr   
header_fmtheaderr   r   r   cuda_recv_framesrecv_framesr   es                 r&   readzUCX.readX  s      ?MD	 V_T2233C',,s#########"(-c":":Xw E%&CDDD !336J
 ; ;<<F',,v&&&&&&&&&]:v66F!'!16'((3CK *-k5*A*A  F -0 /2;/G/G  -)k #$$ &"1%%%
"- 3 3J',,z22222222223     

%KaKK  
J' $ 0"/"&"4	          J J J

%&HIIIJ Je  	 	 	 JJLLL!G!GG  	s<   CG: %E* *
F4&FF##G .G5:
H/&H**H/c                |  K   d| _         | j        	 | j                            t	          j        ddd                     d {V  nX# t          j        j        t          j        j	        t          j        j
        ft          t          j        dd          fz   $ r Y nw xY w|                                  d | _        d S d S )NTr   r   UCXConnectionResetr   )r|   r   r   r   r   r   rc   r   UCXErrorUCXCloseErrorUCXCanceledgetattrr   r   s    r&   closez	UCX.close  s      8
gll6;tT1#=#=>>>>>>>>>>',* )=rBBD	E     JJLLLDHHH  s   4A ABBc                f    d| _         | j        "| j                                         d | _        d S d S NT)r|   r   r   r   s    r&   r   z	UCX.abort  s6    8HNNDHHH  r(   c                <    | j         | j         S t          d          )NzUCX Endpoint is closed)r   r   r   s    r&   r   zUCX.ep  s     88O!":;;;r(   c                4    | j         du r| j        S | j        d u S r   )r   r|   r   r   s    r&   r   z
UCX.closed  s'    #t++ <8t##r(   T)r   r!   r   r!   r   r   )r    r!   )r    r   )Nr   )r   rp   r   r   r   r!   r    r*   )r   )__name__
__module____qualname____doc__r   propertyr   r   r   r   r   r   r   r   r   r   __classcell__)r   s   @r&   r   r      sd        D HL. . . . . . .0       X     X @ @ @ @ @ X@  /3!	7N 7N 7N 7N Z7Nr H H H ZHT  "   < < X<$ $ $ $ $ $ $r(   r   c                  &    e Zd ZdZeZdZ	 dddZdS )UCXConnectorucx://FTaddressr!   r   r   connection_argsr   r    r   c                F  K   t                               d|           t          |          \  }}t                       	 t	          j        ||           d {V }n'# t          j        j        $ r t          d          w xY w| 	                    |d| j
        |z   |          S )NzUCXConnector.connect: %s,Connection closed before handshake completedrK   r   r   r   )r/   r   r   rz   rc   create_endpointr   r   r   
comm_classprefix)r   r   r   r   ipportr   s          r&   connectzUCXConnector.connect  s       	/999"7++D	R*2t44444444BB~. 	R 	R 	R!"PQQQ	RkG+#	  
 
 	
s   A $A?Nr   )r   r!   r   r   r   r   r    r   )r   r   r   r   r   r   	encryptedr   r   r(   r&   r   r     sA        FJI 15
 
 
 
 
 
 
r(   r   c                      e Zd Zej        Zej        Zej        Z	 	 	 dddZed             Z	ed             Z
d Zd Zd Zed             Zed             Zed             ZdS )UCXListenerNFTr   r!   comm_handler'Callable[[UCX], Awaitable[None]] | Noner   r   r   r   r   c                    |                     d          sd|z   }t          |d          \  | _        | _        || _        || _        || _        d | _        d | _        || _	        d S )Nr   r   r   )default_port)
r   r   r   _input_portr  r   r   r   
ucp_serverr   )r   r   r  r   r   r   s         r&   r   zUCXListener.__init__  st     !!%(( 	)(G$3G!$L$L$L!!(&*.r(   c                    | j         j        S r   )r  r   r   s    r&   r   zUCXListener.port  s    ##r(   c                F    d| j         z   dz   t          | j                  z   S )Nr   :)r   r!   r   r   s    r&   r   zUCXListener.address  s!    $'!C'#di..88r(   c                r    K    fd}t                       t          j        | j                   _        d S )Nc                @  K   t          | j        j        j                  }j        |_        	                     |           d {V  n+# t
          $ r t                              d           Y d S w xY wj        r                    |           d {V  d S d S )Nr   r   )	r   r   r   r   on_connectionr   r/   r   r  )	client_epr   r   s     r&   serve_foreverz(UCXListener.start.<locals>.serve_forever  s      <, ,	  C !% 2C((----------"   KLLL   -'',,,,,,,,,,,- -s   A $A76A7)r   )rz   rc   create_listenerr  r  )r   r  s   ` r&   startzUCXListener.start  sI      	- 	- 	- 	- 	-  	-m$BRSSSr(   c                    d | _         d S r   )r  r   s    r&   stopzUCXListener.stop  s    r(   c                    | j         | j        fS r   )r   r   r   s    r&   get_host_portzUCXListener.get_host_port  s    w	!!r(   c                H    | j         t          |                                  z   S r   )r   r   r  r   s    r&   listen_addresszUCXListener.listen_address  s!    {.0B0B0D0DEEEr(   c                ~    |                                  \  }}t          |          }| j        t          ||          z   S r   )r  r   r   r   )r   hostr   s      r&   contact_addresszUCXListener.contact_address   s<    ''))
d#D)){.tT::::r(   c                *    |                                  S r   )r  r   s    r&   bound_addresszUCXListener.bound_address&  s     !!###r(   )NFT)
r   r!   r  r  r   r   r   r   r   r   )r   r   r   r   r   r   r   r   r   r   r   r  r  r  r  r  r  r   r(   r&   r  r    s        F(J&I
 AE!"/ / / / /$ $ $ X$ 9 9 X9T T T(  " " " F F XF ; ; X;
 $ $ X$ $ $r(   r  c                  2    e Zd Zd Zd Zd Zd Zd Zd ZdS )
UCXBackendc                    t                      S r   )r   r   s    r&   get_connectorzUCXBackend.get_connector0  s    ~~r(   c                     t          |||fi |S r   )r  )r   lochandle_commr   r   s        r&   get_listenerzUCXBackend.get_listener3  s    3[LLOLLLr(   c                ,    t          |          d         S r;   r   r   r#  s     r&   get_address_hostzUCXBackend.get_address_host9  s    s##A&&r(   c                     t          |          S r   r'  r(  s     r&   get_address_host_portz UCXBackend.get_address_host_port<  s    s###r(   c                `    t          |          \  }}t          t          |          |          S r   )r   r   r   )r   r#  r  r   s       r&   resolve_addresszUCXBackend.resolve_address?  s)    $S))
d 4$777r(   c                    t          |          \  }}t          |          }d|v rt          |          }nt          |          }t	          |d           S )Nr  )r   r   r   r   r   )r   r#  r  r   
local_hosts        r&   get_local_address_forz UCXBackend.get_local_address_forC  sO    $S))
d$;;!$JJJ T222r(   N)	r   r   r   r!  r%  r)  r+  r-  r0  r   r(   r&   r  r  -  sq          M M M' ' '$ $ $8 8 83 3 3 3 3r(   r  r   c                    i } t          t          j                            d          t          j                            d          t          j                            d          g          rt          j                            d          rd}d}nd}d}t          t          j                            d          t          j                            d          g          r|dz   }t          j                            d          rd	|z   }t          j                            d          r|d
z   }||d} i }t          j                            di                                           D ]\  }}d                    t          t          j        dg|	                    d          R                     }|dd         x}| v r1t                              d| d| d|d| d| |          d           |t          j        v r;t                              d| d| d|d| dt          j        |          d           |||<   | |fS )a  Translate dask config options to appropriate UCX config options

    Returns
    -------
    tuple
        Options suitable for passing to ``ucp.init`` and additional
        UCX options that will be inserted directly into the environment
        while calling ``ucp.init``.
    zdistributed.comm.ucx.tcpzdistributed.comm.ucx.nvlinkzdistributed.comm.ucx.infinibandzdistributed.comm.ucx.rdmacmtcprdmacmzdistributed.comm.ucx.cuda-copyz
,cuda_copyzrc,z	,cuda_ipc)rJ   SOCKADDR_TLS_PRIORITYz distributed.comm.ucx.environment_r   -   Nz	Ignoring =z (key=z!) in ucx.environment, preferring z from high level optionsz from external environment)r   rh   ri   rg   itemsjoinmapr!   upperrk   r/   r0   re   rf   info)high_level_optionstlstls_priorityenvironment_optionskvkeyhl_keys           r&   rd   rd   P  s   " 
 KOO677KOO9::KOO=>>	
  Q ;??899 	!C#LLC L
  =>> @AA
 
 	% $C;??<== 	#+C;??899 	$#C%(<PP  BBGGMMOO ) )1hhs39u&<qwws||&<&<==>>!""gF"444NN*A * * * *S * *$* *'9&'A* * *   
 BJ KKPA P P P PS P P!P P$&JsOP P P   
 ()$$222r(   )r   r   r    r!   )r   r   r)   r*   r    r+   )r4   r   r5   r   r)   r*   r    r+   )r   )Br   
__future__r   r   loggingre   r   r]   collections.abcr   r   r   typingr   r   unittest.mockr	   rh   
dask.utilsr
   distributed.comm.addressingr   r   distributed.comm.corer   r   r   r   distributed.comm.registryr   r   distributed.comm.utilsr   r   r   r   distributed.diagnostics.nvmlr   r   r   distributed.utilsr   r   r   r   r   	getLoggerr   r/   rc   rj   rV   rl   ro   r1   r'   r3   r9   rG   rz   r   r   r   r  r  rd   r   r(   r&   <module>rS     ss    # " " " " "      				   ; ; ; ; ; ; ; ; ; ; % % % % % % % %        " " " " " " J J J J J J J J L L L L L L L L L L L L 7 7 7 7 7 7 7 7                    
 N M M M M M M M M M M M M M		8	$	$  



    C!  : C C C C   	 	 	 	   g g gT	 	 	q$ q$ q$ q$ q$$ q$ q$ q$h
 
 
 
 
9 
 
 
.H$ H$ H$ H$ H$( H$ H$ H$V3 3 3 3 3 3 3 3@ *,,K3 K3 K3 K3 K3s   B BB