
    ԋg                    r    d dl mZ d dlZd dlmZ d dlmZ d dlmZ d dl	m
Z
  G d d      Z G d	 d
e      Zy)    )annotationsN)defaultdict)MutableMapping)	stringify)
log_errorsc                  h    e Zd ZdZd Zd Zd Zed
d       Zedd       Z	ed        Z
edd	       Zy)PublishExtensionzAn extension for the scheduler to manage collections

    *  publish_list
    *  publish_put
    *  publish_get
    *  publish_delete
    c                   || _         t               | _        | j                  | j                  | j
                  | j                  | j                  d}d| j                  i}| j                   j                  j                  |       | j                   j                  j                  |       t        t        j                        | _        y )N)publish_listpublish_putpublish_getpublish_deletepublish_wait_flushpublish_flush_batched_send)	schedulerdictdatasetslistputgetdelete
flush_waitflush_receivehandlersupdatestream_handlersr   asyncioEvent_flush_received)selfr   r   r   s       3lib/python3.12/site-packages/distributed/publish.py__init__zPublishExtension.__init__   s    " !II8888"kk"&//
 )$*<*<
 	&&x0&&--o>*7==9    c                >    | j                   |   j                          y N)r   set)r    uidkwargss      r!   r   zPublishExtension.flush_receive(   s    S!%%'r#   c                Z   K   | j                   |   j                          d {    y 7 wr%   )r   wait)r    r'   s     r!   r   zPublishExtension.flush_wait+   s#     ""3',,...s   !+)+Nc                    |s|| j                   v rt        d|z        | j                  j                  |dt	        |              ||d| j                   |<   d|dS )NzDataset %s already exists
published-)datakeysOK)statusname)r   KeyErrorr   client_desires_keysr   )r    r.   r-   r1   overrideclients         r!   r   zPublishExtension.put.   s_    DDMM16=>>**4:io=N1OP'+T:d--r#   c                    | j                   j                  |dg i      }| j                  j                  |d   dt	        |              y )Nr.   r,   )r   popr   client_releases_keysr   )r    r1   outs      r!   r   zPublishExtension.delete6   sA    mmvrl3++CK:iPToEV9WXr#   c                f    t        t        | j                  j                         t                    S )N)key)r   sortedr   r.   str)r    argss     r!   r   zPublishExtension.list;   s!    F4==--/S9::r#   c                :    | j                   j                  |d       S r%   )r   r   )r    r1   r5   s      r!   r   zPublishExtension.get?   s    }}  t,,r#   )NNNFNr%   )NN)__name__
__module____qualname____doc__r"   r   r   r   r   r   r   r    r#   r!   r	   r	      sh    :&(/ . . Y Y ; ; - -r#   r	   c                  >    e Zd ZdZdZd Zd Zd Zd Zd Z	d Z
d	 Zy
)DatasetszA dict-like wrapper around :class:`Client` dataset methods.

    Parameters
    ----------
    client : distributed.client.Client

    _clientc                    || _         y r%   rG   )r    r5   s     r!   r"   zDatasets.__init__O   s	    r#   c                8    | j                   j                  |      S r%   )rH   get_datasetr    r;   s     r!   __getitem__zDatasets.__getitem__R   s    ||'',,r#   c                    | j                   j                  rt        d      | j                   j                  ||       y )Nz~Can't use 'client.datasets[name] = value' when client is asynchronous; please use 'client.publish_dataset(name=value)' instead)r1   )rH   asynchronous	TypeErrorpublish_dataset)r    r;   values      r!   __setitem__zDatasets.__setitem__V   s;    <<$$X  	$$U$5r#   c                z    | j                   j                  rt        d      | j                   j                  |      S )NzvCan't use 'del client.datasets[name]' when client is asynchronous; please use 'client.unpublish_dataset(name)' instead)rH   rO   rP   unpublish_datasetrL   s     r!   __delitem__zDatasets.__delitem___   s9    <<$$F  ||--c22r#   c              #     K   | j                   j                  rt        d      | j                   j                         E d {    y 7 w)NzdCan't invoke iter() or 'for' on client.datasets when client is asynchronous; use 'async for' instead)rH   rO   rP   list_datasetsr    s    r!   __iter__zDatasets.__iter__h   s=     <<$$8  <<--///s   ?A	AA	c                ^      j                   j                  st        d       fd} |       S )NzcCan't invoke 'async for' on client.datasets when client is synchronous; use iter() or 'for' insteadc                j   K   j                   j                          d {   D ]  } |  	 y 7 wr%   )rH   rX   )r;   r    s    r!   _zDatasets.__aiter__.<locals>._w   s)     !\\77999	 :s   313)rH   rO   rP   )r    r]   s   ` r!   	__aiter__zDatasets.__aiter__p   s0    ||((; 
	 s
r#   c                    | j                   j                  rt        d      t        | j                   j	                               S )NztCan't use 'len(client.datasets)' when client is asynchronous; please use 'len(await client.list_datasets())' instead)rH   rO   rP   lenrX   rY   s    r!   __len__zDatasets.__len__}   s<    <<$$I  4<<--/00r#   N)r@   rA   rB   rC   	__slots__r"   rM   rS   rV   rZ   r^   ra   rD   r#   r!   rF   rF   D   s1     I-6301r#   rF   )
__future__r   r   collectionsr   collections.abcr   
dask.utilsr   distributed.utilsr   r	   rF   rD   r#   r!   <module>rh      s0    "  # *   (5- 5-p@1~ @1r#   