
    ԋg"                        d dl mZ d dlZd dlZd dlZd dlmZ d dlmZ d dl	m
Z
 d dlmZ d dlmZ d dlmZ d d	lmZmZmZmZ d d
lmZ  ej0                  e      Z G d d      Z G d d      Zy)    )annotationsN)defaultdict)suppress)merge)parse_timedelta)Future)time)DeadlineTimeoutError
log_errorswait_for)
get_clientc                  H    e Zd ZdZd Zd	dZd Z	 d
dZddZe	dd       Z
y)VariableExtensionzAn extension for the scheduler to manage Variables

    This adds the following routes to the scheduler

    *  variable-set
    *  variable-get
    *  variable-delete
    c                   || _         t               | _        t        t              | _        t        t        j                        | _        t        j                         | _	        | j                   j                  j                  | j                  | j                  d       | j                  | j                   j                  d<   | j                  | j                   j                  d<   y )N)variable_setvariable_get variable-future-received-confirmvariable_delete)	schedulerdict	variablesr   setwaitingasyncio	Conditionwaiting_conditionsstartedhandlersupdategetfuture_received_confirmstream_handlersdelete)selfr   s     4lib/python3.12/site-packages/distributed/variable.py__init__zVariableExtension.__init__   s    ""3'"-g.?.?"@((*&&!XXtxx@	

 (( 	&&'IJ =AKK&&'89    Nc                  K   t        j                  t        |            }|d|d}|| j                  j                  vrQt        j                  d       d {    |j                  rt        d| d      || j                  j                  vrQ| j                  j                  |gd|z         nd|d}	 | j                  |   }|d	   dk(  r1|d
   |k7  r)t        j                  | j                  |d
   |             	 || j                  vr?| j                  4 d {    | j                  j                          d d d       d {    || j                  |<   y 7 # t        $ r Y kw xY w7 P7 (# 1 d {  7  sw Y   8xY ww)Nr   )typevalueg{Gz?zTask z unknown to scheduler.variable-%skeysclientmsgpackr*   r+   )r
   afterr   r   tasksr   sleepexpiredr   client_desires_keysr   ensure_futurereleaseKeyErrorr   
notify_all)	r%   namekeydatar/   timeoutdeadlinerecordolds	            r&   r   zVariableExtension.set/   sU    >>/'":;?&5FT^^111mmD)))##&se3I'JKK T^^111 NN..SE-RVBV.W'$7F	H..&C 6{h&3w<3+>%%dll3w<&FGt~~%|||'') $|%t! *  		 $||||s   AF	E7F	'F	9E! AF	E0 F	#E4>F		E2
F	!	E-*F	,E--F	2F	4F:E=;FF	c                  K   | j                   ||f   r_| j                  |   4 d {    | j                  |   j                          d {    d d d       d {    | j                   ||f   r_| j                  j	                  |gd|z         | j                   ||f= y 7 |7 Y7 K# 1 d {  7  sw Y   [xY ww)Nr,   r-   )r   r   waitr   client_releases_keys)r%   r;   r:   s      r&   r7   zVariableExtension.releaseF   s     ll39%..t44--d388::: 54 ll39% 	++#}t?S+TLLd#	 5: 5444s\   %B?B$B?!B*B&B*B?B(B?32B?&B*(B?*B<0B31B<8B?c                f  K   | j                   j                  |g|       | j                  ||f   j                  |       | j                  ||f   sF| j                  |   4 d {    | j                  |   j                          d d d       d {    y y 7 37 # 1 d {  7  sw Y   y xY wwN)r   r5   r   remover   r9   )r%   r:   r;   tokenr/   s        r&   r"   z)VariableExtension.future_received_confirmN   s      	**C5&9S$Y&&u-||CI&..t44''-88: 544 '44444sH   A"B1$B%B1(BB1BB1B1B."B%#B.*B1c                   K   t               }| j                  vro||t               |z
  z
  }nd }|r|dk  r
t               	  fd}t         |       |       d {     j                  j                          | j                  vro j                  |   }|d   dk(  r|d   }t        j                         j                  }	 j                  j                  j                  |      }
|
|
j                  nd}|	|d}|d	k(  r2|
j                  j                  |d
<   |
j                  j                  |d<   t!        ||      } j"                  ||f   j%                  |	       |S 7 #  j                  j                          w xY ww)Nr   c                    K    j                   j                          d {     j                   j                          d {    y 7 '7 wrE   )r   acquirerB   r%   s   r&   _z VariableExtension.get.<locals>._b   s<     ,,..000,,++--- 1-s!   AA	!AAAA)r=   r*   r   r+   lost)rG   stateerred	exception	traceback)r	   r   r   r   r   r7   uuiduuid4hexr   r2   r!   rN   exception_blamerP   rQ   r   r   add)r%   r:   r/   r=   startleftrL   r?   r;   rG   tsrN   msgs   `            r&   r!   zVariableExtension.getW   sa    $..("$&5.1q"n$'. qsD111$$& $..(" %&>X%/CJJL$$E%%))#.B "BHHFE!E2C#%#5#5#?#?K #%#5#5#?#?K 63'FLLd#''.! 2$$&s6   ?FE' E$E' "(FCF$E' 'FFc                  K   	 | j                   |   }|d   dk(  r| j                  |d   |       d {    	 t        t              5  | j                  |= d d d        t        t              5  | j                   |= d d d        | j
                  j                  d|z         y 7 n# t        $ r Y vw xY w# 1 sw Y   ^xY w# 1 sw Y   ExY ww)Nr*   r   r+   r,   )r   r7   r8   r   r   r   remove_client)r%   r:   r/   r@   s       r&   r$   zVariableExtension.deletex   s     	7..&C 6{h&ll3w<666h''-  ht$   	$$]T%9: 7	  		
  sa   CB$  CB"CB3C-B?;(C$	B0-C/B00C3B<8C?CC)NNNNN)NNNN)NNNNN)__name__
__module____qualname____doc__r'   r   r7   r"   r!   r   r$    r(   r&   r   r      s;    H &.$ 7;;B ; ;r(   r   c                  X    e Zd ZdZddZed        Zd Zd ZddZ	ddZ
dd	Zd
 Zd Zy)Variablea-  Distributed Global Variable

    This allows multiple clients to share futures and data between each other
    with a single mutable variable.  All metadata is sequentialized through the
    scheduler.  Race conditions can occur.

    Values must be either Futures or msgpack-encodable data (ints, lists,
    strings, etc..)  All data will be kept and sent through the scheduler, so
    it is wise not to send too much.  If you want to share a large amount of
    data then ``scatter`` it and share the future instead.

    Parameters
    ----------
    name: string (optional)
        Name used by other clients and the scheduler to identify the variable.
        If not given, a random name will be generated.
    client: Client (optional)
        Client used for communication with the scheduler.
        If not given, the default global client will be used.

    Examples
    --------
    >>> from dask.distributed import Client, Variable # doctest: +SKIP
    >>> client = Client()  # doctest: +SKIP
    >>> x = Variable('x')  # doctest: +SKIP
    >>> x.set(123)  # docttest: +SKIP
    >>> x.get()  # docttest: +SKIP
    123
    >>> future = client.submit(f, x)  # doctest: +SKIP
    >>> x.set(future)  # doctest: +SKIP

    See Also
    --------
    Queue: shared multi-producer/multi-consumer queue between clients
    Nc                f    || _         |xs! dt        j                         j                  z   | _        y )Nz	variable-)_clientrR   rS   rT   r:   )r%   r:   r/   s      r&   r'   zVariable.__init__   s&    :K$**,*:*::	r(   c                    | j                   s	 t               | _         | j                   S | j                   S # t        $ r Y | j                   S w xY wrE   )rf   r   
ValueErrorrK   s    r&   r/   zVariable.client   sJ    ||)| ||t||  ||s   5 	AAc                J    | j                   st        t        |        d      y )Nz object not properly initialized. This can happen if the object is being deserialized outside of the context of a Client or Worker.)r/   RuntimeErrorr*   rK   s    r&   _verify_runningzVariable._verify_running   s.    {{:, ' '  r(   c                2  K   t        |t              rE| j                  j                  j	                  |j
                  | j                  |       d {    y | j                  j                  j	                  || j                  |       d {    y 7 @7 w)N)r;   r:   r=   )r<   r:   r=   )
isinstancer   r/   r   r   r;   r:   )r%   r+   r=   s      r&   _setzVariable._set   s     eV$++''44IIDIIw 5    ++''44G 5   	s$   ABB:BBBBc                v    | j                           | j                  j                  | j                  |fd|i|S )zSet the value of this variable

        Parameters
        ----------
        value : Future or object
            Must be either a Future or a msgpack-encodable value
        r=   )rk   r/   syncrn   )r%   r+   r=   kwargss       r&   r   zVariable.set   s7     	t{{		5L'LVLLr(   c                  K   | j                   j                  j                  || j                  | j                   j                         d {   }|d   dk(  rzt        |d   | j                   |d         }|d   dk(  r"|j                  j                  |d   |d	          | j                   j                  d
| j                  |d   |d   d       |S |d   }|S 7 w)N)r=   r:   r/   r*   r   r+   rN   )rN   rO   rP   rQ   r   rG   )opr:   r;   rG   )	r/   r   r   r:   idr   _state	set_error_send_to_scheduler)r%   r=   dr+   s       r&   _getzVariable._get   s     ++''44$))DKKNN 5 
 
 V9 1W:t{{!G*EEzW$&&q~q~FKK**< IIW:wZ	  gJE#
s   A
CCBCc                    | j                          t        |      } | j                  j                  | j                  fd|i|S )a@  Get the value of this variable

        Parameters
        ----------
        timeout : number or string or timedelta, optional
            Time in seconds to wait before timing out.
            Instead of number of seconds, it is also possible to specify
            a timedelta in string format, e.g. "200ms".
        r=   )rk   r   r/   rp   ry   )r%   r=   rq   s      r&   r!   zVariable.get   s?     	!'*t{{		E7EfEEr(   c                    | j                          | j                  j                  dk(  r)| j                  j                  d| j                  d       yy)zmDelete this variable

        Caution, this affects all clients currently pointing to this variable.
        runningr   )rs   r:   N)rk   r/   statusrw   r:   rK   s    r&   r$   zVariable.delete   sC    
 	;;*KK**2CTYY+WX +r(   c                (    t         | j                  ffS rE   )rd   r:   rK   s    r&   
__reduce__zVariable.__reduce__  s    $))%%r(   r]   )z30 srE   )r^   r_   r`   ra   r'   propertyr/   rk   rn   r   ry   r!   r$   r   rb   r(   r&   rd   rd      sG    "H;  	M(FY&r(   rd   )
__future__r   r   loggingrR   collectionsr   
contextlibr   tlzr   
dask.utilsr   distributed.clientr   distributed.metricsr	   distributed.utilsr
   r   r   r   distributed.workerr   	getLoggerr^   loggerr   rd   rb   r(   r&   <module>r      sW    "    #   & % $ J J )			8	$q; q;h{& {&r(   