
    ԋgD                        d dl mZ d dlZd dlmZmZ d dlmZ d dlm	Z	 d dl
mZmZmZ d dlmZ erd dlmZ  ej$                  e      Z G d	 d
      Z G d d      Zy)    )annotationsN)defaultdictdeque)
Collection)partial)TYPE_CHECKINGAnyoverload)time)	Schedulerc                  T    e Zd ZU ded<   ded<   ded<   ddZddZdd	Zdd
ZddZy)Topicr   eventsintcountsetsubscribersc                R    t        |      | _        d| _        t               | _        y )Nmaxlenr   )r   r   r   r   r   )selfr   s     2lib/python3.12/site-packages/distributed/broker.py__init__zTopic.__init__   s    6*
5    c                :    | j                   j                  |       y N)r   addr   
subscribers     r   	subscribezTopic.subscribe   s    Z(r   c                :    | j                   j                  |       y r   )r   discardr   s     r   unsubscribezTopic.unsubscribe   s      ,r   c                d    | j                   j                  |       | xj                  dz  c_        y )N   )r   appendr   )r   events     r   publishzTopic.publish!   s!    5!

a
r   c                8    | j                   j                          y r   )r   clear)r   s    r   truncatezTopic.truncate%   s    r   N)r   r   )r   strreturnNone)r'   r	   r-   r.   )r-   r.   )	__name__
__module____qualname____annotations__r   r    r#   r(   r+    r   r   r   r      s+    MJ!
)-r   r   c                      e Zd ZU ded<   ded<   ddZddZddZddZddd
ZddZ	e
dd       Ze
	 d	 	 	 dd       Z	 d	 	 	 ddZy	)Brokerr   
_schedulerzdefaultdict[str, Topic]_topicsc                P    || _         t        t        t        |            | _        y )Nr   )r6   r   r   r   r7   )r   r   	schedulers      r   r   zBroker.__init__-   s    #"75#@Ar   c                @    | j                   |   j                  |       y r   )r7   r    r   topicr   s      r   r    zBroker.subscribe1   s    U%%j1r   c                @    | j                   |   j                  |       y r   )r7   r#   r;   s      r   r#   zBroker.unsubscribe4   s    U''
3r   c                   t               |f}t        |t              r|g}|D ]y  }| j                  |   }|j	                  |       | j                  ||       t        | j                  j                  j                               D ]  }	 |j                  ||        { y # t        $ r t        j                  dd       Y =w xY w)NzPlugin failed with exceptionT)exc_info)r   
isinstancer,   r7   r(   _send_to_subscriberslistr6   pluginsvalues	log_event	Exceptionloggerinfo)r   topicsmsgr'   namer<   plugins          r   r(   zBroker.publish7   s    fc"XFDLL&EMM% %%dE2t66==?@O$$T3/ A  ! OKK >KNOs   B CCNc                    |0| j                   j                         D ]  }|j                           y || j                   v r| j                   |   j                          y y r   )r7   rD   r+   )r   r<   _topics      r   r+   zBroker.truncateF   sN    =,,--/! 0dll"LL((* #r   c                    d||d}| j                   |   j                  D ci c]  }||g }}| j                  j                  |i        y c c}w )Nr'   )opr<   r'   )worker_msgs)r7   r   r6   send_all)r   r<   r'   rJ   clientclient_msgss         r   rA   zBroker._send_to_subscribersM   s\    

 48<<3F3R3RS3Rvu}3RS  " = Ts   Ac                     y r   r3   r   r<   s     r   
get_eventszBroker.get_eventsV   s    GJr   c                     y r   r3   rV   s     r   rW   zBroker.get_eventsY   s     47r   c                    |"t        | j                  |   j                        S | j                  j                         D ci c]'  \  }}|j                  r|t        |j                        ) c}}S c c}}w r   )tupler7   r   items)r   r<   rK   s      r   rW   zBroker.get_events^   sq     e,3344 $(<<#5#5#7#7KD%<< eELL))#7  s   ,A2)r   r   r9   r   r-   r.   )r<   r,   r   r,   r-   r.   )rI   zstr | Collection[str]rJ   r	   r-   r.   r   )r<   
str | Noner-   r.   )r<   r,   r'   r	   r-   r.   )r<   r,   r-   ztuple[tuple[float, Any], ...])r<   r.   r-   z(dict[str, tuple[tuple[float, Any], ...]])r<   r\   r-   zHtuple[tuple[float, Any], ...] | dict[str, tuple[tuple[float, Any], ...]])r/   r0   r1   r2   r   r    r#   r(   r+   rA   r
   rW   r3   r   r   r5   r5   )   s}    $$B24O+> J J 77	17 7
 #'

	Q
r   r5   )
__future__r   loggingcollectionsr   r   collections.abcr   	functoolsr   typingr   r	   r
   distributed.metricsr   distributedr   	getLoggerr/   rG   r   r5   r3   r   r   <module>rf      sI    "  * &  / / $%			8	$ 0? ?r   