
    ԋgM                    L   d dl mZ d dlZd dlZd dlmZ d dlmZ d dlm	Z	 d dl
mZ d dlmZ d dlmZmZmZmZmZ d d	lmZ d dlZd d
lmZ d dlmZ d dlmZ d dlmZ d dlmZ d dl m!Z!m"Z" erd dl#m$Z$m%Z%m&Z&m'Z'm(Z( dZ) ejT                  e+      Z,ejZ                  j]                  d      Z/h dZ0h dZ1ddhZ2 G d de      Z3 G d de      Z4	 	 	 	 	 	 	 	 ddZ5ejZ                  j]                  d      jm                         D  ch c]  \  } } e|      dk  r|  c}} Z7yc c}} w )    )annotationsN)defaultdict)	Container)partial)log2)time)TYPE_CHECKINGAnyClassVar	TypedDictcast)topk)Key)parse_timedelta)PeriodicCallback)CommClosedError)SchedulerPlugin)
log_errorsrecursive_to_dict)	SchedulerSchedulerState	TaskStateTaskStateStateWorkerStateg?zdistributed.admin.pdb-on-err>   readywaitingconstrained>   long-runningmemoryresumed	cancelled	executingreleasedc                  @    e Zd ZU ded<   ded<   ded<   ded<   ded<   y	)
InFlightInfor   victimthieffloatvictim_durationthief_durationstrstimulus_idN)__name__
__module____qualname____annotations__     4lib/python3.12/site-packages/distributed/stealing.pyr%   r%   ?   s     r2   r%   c                     e Zd ZU ded<   ded<   ded<   d ed  ed	d
      D              z   Zded<   ded<   ded<   ded<   ded<   ded<   ded<   ded<   ded<   d6dZd7d8dZd9dZ	d d!d:d"Z
d;d#Zd<d=d$Zd>d%Zd9d&Z	 	 	 	 	 	 	 	 	 	 	 	 d?d'Zd@d(ZdAd)ZdBd*ZdBd+ZdBd,ZdCd-Z	 	 	 	 	 	 	 	 dDd.Zdd/	 	 	 	 	 	 	 	 	 dEd0Zed9d1       ZdFd2ZdGd3Zd8d4ZdHd5Zy)IWorkStealingr   	schedulerz%dict[str, tuple[set[TaskState], ...]]	stealablez dict[TaskState, tuple[str, int]]key_stealable)g      ?c              #  2   K   | ]  }d d|dz
  z  z     yw)         Nr1   ).0is     r3   	<genexpr>zWorkStealing.<genexpr>N   s"      C".QA!a%L,s   r:      zClassVar[tuple[float, ...]]cost_multipliersr(   _callback_timeintcountzdict[TaskState, InFlightInfo]	in_flightzdefaultdict[WorkerState, float]in_flight_occupancyzdefaultdict[WorkerState, int]in_flight_taskszdict[str, dict[int, float]]metricszasyncio.Event_in_flight_event_request_counterc                ^   || _         i | _        i | _        |j                  D ]  }| j	                  |        t        t        t        t        j                  j                  d      d            | _        | j                   j                  |        d| _        i | _        t        t               | _        t        t               | _        t'        j(                         | _        t        t               t        t               d| _        d| _        | j0                  | j                   j2                  d<   y )Nworkerz,distributed.scheduler.work-stealing-intervalms)defaultr   )request_count_totalrequest_cost_totalzsteal-response)r6   r7   r8   workers
add_workerr   r(   r   daskconfiggetrB   
add_pluginrD   rE   r   rC   rF   rG   asyncioEventrI   rH   rJ   move_task_confirmstream_handlersselfr6   rM   s      r3   __init__zWorkStealing.__init__\   s    "''FOO6O* ( # NO
 	!!$'
#.s#3 *3/ '#.s#3"-c"2
 !";?;Q;Q&&'78r2   Nc                  K   d| j                   j                  v ryt        | j                  | j                  dz        }|j                          || j                   j                  d<   | j                  j                          yw)aF  Start the background coroutine to balance the tasks on the cluster.
        Idempotent.
        The scheduler argument is ignored. It is merely required to satisfy the
        plugin interface. Since this class is simultaneously an extension, the
        scheduler instance is already registered during initialization
        stealingNi  )callbackcallback_time)r6   periodic_callbacksr   balancerB   startrI   set)r]   r6   pcs      r3   re   zWorkStealing.starty   sl      :::\\1D1Dt1K
 	
8:))*5!!#s   BBc                   K   | j                   j                  j                  dd      }|r|j                          | j                  j                          d{    y7 w)zStop the background task balancing tasks on the cluster.
        This will block until all currently running stealing requests are
        finished. Idempotent
        r`   N)r6   rc   popstoprI   wait)r]   rg   s     r3   rj   zWorkStealing.stop   sH     
 ^^..22:tDGGI##((***s   AA AA r1   )excludec                   t        | |d      S )zDictionary representation for debugging purposes.
        Not type stable and not intended for roundtrips.

        See also
        --------
        Client.dump_cluster_state
        distributed.utils.recursive_to_dict
        T)rl   members)r   )r]   rl   s     r3   _to_dict_no_nestzWorkStealing._to_dict_no_nest   s     !wEEr2   c                :    | j                   j                  d|      S Nr`   )r6   	log_event)r]   msgs     r3   logzWorkStealing.log   s    ~~''
C88r2   c                T    t        d t        d      D              | j                  |<   y )Nc              3  0   K   | ]  }t                 y wN)rf   )r=   _s     r3   r?   z*WorkStealing.add_worker.<locals>.<genexpr>   s     &@isuis   r@   )tupleranger7   r\   s      r3   rS   zWorkStealing.add_worker   s    !&&@eBi&@!@vr2   c                    | j                   |= y rw   )r7   )r]   r6   rM   kwargss       r3   remove_workerzWorkStealing.remove_worker   s    NN6"r2   c                f    | j                   j                  }d|v r|d   j                          |d= y y rq   )r6   rc   rj   )r]   pcss     r3   teardownzWorkStealing.teardown   s5    nn//
O  "J r2   c                    |dk(  r;| j                   j                  |   }| j                  |       | j                  |       |dk(  r+| j                   j                  |   }| j	                  |       y y )N
processing)r6   tasksremove_key_from_stealable_remove_from_in_flightput_key_in_stealable)r]   keyre   finishargsr|   tss          r3   
transitionzWorkStealing.transition   sk     L %%c*B**2.''+\!%%c*B%%b) "r2   c                .   || j                   |<   | j                  j                          |d   }|d   }| j                  |xx   |d   z  cc<   | j                  |xx   |d   z  cc<   | j                  |xx   dz  cc<   | j                  |xx   dz  cc<   y )Nr'   r&   r)   r*   r:   )rE   rI   clearrF   rG   r]   r   infor'   r&   s        r3   _add_to_in_flightzWorkStealing._add_to_in_flight   s    !r##%Wh  (D1B,CC(  '40@+AA'V$)$U#q(#r2   c                   | j                   j                  |d       }|r|d   }|d   }| j                  |xx   |d   z  cc<   | j                  |xx   |d   z  cc<   | j                  |xx   dz  cc<   | j                  |xx   dz  cc<   | j                   s4| j                  j	                          | j
                  j                          |S )Nr'   r&   r*   r)   r:   )rE   ri   rF   rG   r   rI   rf   r   s        r3   r   z#WorkStealing._remove_from_in_flight   s    ~~!!"d+ME(^F$$U+t4D/EE+$$V,5F0GG,  (A-(  '1,'>>((..0%%))+r2   c                f    || j                   vr#| j                  |       | j                  |       y y rw   )rE   r   r   )r]   r   s     r3   recalculate_costzWorkStealing.recalculate_cost   s/    T^^#**2.%%b) $r2   c                    | j                  |      \  }}|]|J |j                  sJ |j                  }|j                  }| j                  |   |   j	                  |       ||f| j
                  |<   y y rw   )steal_time_ratioprocessing_onaddressr7   addr8   )r]   r   cost_multiplierlevelwsrM   s         r3   r   z!WorkStealing.put_key_in_stealable   s    !%!6!6r!:&$$$####!!BZZFNN6"5)--b1&,e_Dr" 'r2   c                    | j                   j                  |d       }|y |\  }}| j                  |   |   j                  |       y rw   )r8   ri   r7   discard)r]   r   resultrM   r   s        r3   r   z&WorkStealing.remove_key_from_stealable   sG    ##''D1>vu%--b1r2   c                   |j                   j                  }|t        v ry|j                  sy| j                  j                  |      }|s)|j                  sJ ||j                  j                  v sJ y|j                         }|| j                  j                  z  t        z   }||z  }t        t        t        |      dz               }|dk  rd}||fS |t        | j                        k\  ry||fS )a;  The compute to communication time ratio of a key

        Returns
        -------
        cost_multiplier: The increased cost from moving this task as a factor.
        For example a result of zero implies a task without dependencies.
        level: The location within a stealable list to place this value
        NN)r   r   r<   r:   )prefixname
fast_tasksdependenciesr6   get_task_durationr   long_runningget_nbytes_deps	bandwidthLATENCYrC   roundr   lenrA   )r]   r   splitcompute_timenbytestransfer_timer   r   s           r3   r   zWorkStealing.steal_time_ratio   s     		J~~77; ####))66666##%!9!99GC',6E$/!34519E %% c$//00%%r2   c                H   	 || j                   v ryd| j                   }| xj                  dz  c_        |j                  }| j                  |       t        j                  d|||j                  ||j                         | j                  j                  |      | j                  j                  ||      z   }| j                  j                  |      | j                  j                  ||      z   }| j                  j                  |j                     j                  d||d       |||||d}| j                  ||       |S # t        $ r t        j                  d||       Y y	t         $ r5}	t        j#                  |	       t$        rd
d l}
|
j)                           d }	~	ww xY w)Nz	in-flightzsteal-r:   z#Request move %s, %s: %2f -> %s: %2fzsteal-request)opr   r,   )r&   r'   r)   r*   r,   z(Worker comm %r closed while stealing: %rzcomm-closedr   )rE   rJ   r   r   loggerdebug	occupancyr6   r   get_comm_coststream_commsr   sendr   r   r   	Exception	exceptionLOG_PDBpdb	set_trace)r]   r   r&   r'   r,   r   r)   r*   r   er   s              r3   move_task_requestzWorkStealing.move_task_request  s   1	T^^#" #4#8#8"9:K!!Q&!&&C**2.LL5   #nn>>,,R89O "^^==,,R78N NN''7<<&s;O !#2"0*"D ""2t, 	!KKBFBO  	Q	s#   E D0E  F!$F!,0FF!rL   c                  	 | j                   j                  |   }	 | j
                  |   d   |k7  r| j                  d||||f       y 	 | j                  |      }|sJ |d   }|d   }t        j	                  d||||       |j                  |k(  sJ 	 |||j                  |j                  |g}	|t        v s:|t        v r|| j                   j                  j                  |j                        k7  rT| j                  d|j                  | j                   j                  vg|	       | j                   j                  ||	       n|t        v r| j                  d
g|	       n|t        v r| j!                  |       ||_        |j#                  |       |j%                  |       | j'                  |       | j                   j)                  |j                  |       | j                  dg|	       nt+        d|       | j                   j7                  |       | j                   j7                  |       y # t        $ r t        j	                  d|       Y y w xY w# t        $ r | j                  d||||f       Y y w xY w# t,        $ r5}
t        j/                  |
       t0        rdd l}|j5                           d }
~
ww xY w# | j                   j7                  |       | j                   j7                  |       w xY w)Nz,Key released between request and confirm: %sr,   zstale-responsezalready-abortedr'   r&   z%Confirm move %s, %s -> %s.  State: %s
reschedule)r,   zalready-computingconfirmzUnexpected task state: r   )r6   r   KeyErrorr   r   rE   rt   r   r   r   _WORKER_STATE_UNDEFINED_WORKER_STATE_CONFIRMrR   rV   _reschedule_WORKER_STATE_REJECTr   remove_from_processingadd_to_processingr   send_task_to_worker
ValueErrorr   r   r   r   r   check_idle_saturated)r]   r   stater,   rM   r   r   r'   r&   _log_msgr   r   s               r3   rZ   zWorkStealing.move_task_confirmN  s   	%%c*B	~~b!-0K?*CLM @ **2.tWh<c65RWX6))))	8UFNNEMM;OH 0011T^^3377FF$T^^-C-CC " **3K*H..-99://..r2#( --b1''+))"-225=="E)/h/0 #:5'!BCC NN//6NN//7y  	LLGM	  	HH'eV[IJ	X  	Q	 NN//6NN//7sG   H6 +I E,J  6III=<I= 	J>	0J99J>>K 8K9c                   | j                   }g }t               }d}t        |j                  j	                               }|r!t        |      t        |j                        k(  ry |j                  }|sxt        d|j                  j	                         | j                        }|D cg c]:  }| j                  |      dkD  r$| j                  |      |j                  kD  r||vr|< }}|sy t        |      dk  rt        || j                  d      }|sJ |sJ t        | j                        D ]@  \  }}	|s n8t        |      D ]&  }
| j                   |
j"                     |   }|r|s%t        |      D ]  }|s n|| j$                  vs|j&                  |
us||
j(                  vr|j+                  |       E|dz  }t-        |||      x}sZ| j                  |      }| j                  |
      }| j                   j/                  ||      }| j                   j/                  ||
      }| j                   j1                  |      }||z   |z   |||z   d	z  z
  k  s| j3                  ||
|       ||z   }|j5                  |||j6                  ||
j"                  ||j"                  |f       | j8                  d
   |xx   dz  cc<   | j8                  d   |xx   |z  cc<   | j                  |      }| j                  |      }| j                   j;                  |||      s|j+                  |       |j+                  |        | j                   j=                  |
| j                  |
             ) C |r(| j?                  d|f       | xj@                  dz  c_         t               }|jB                  r"|jB                  d   jE                  ||z
         y y c c}w )Nr   
   r   g?   T)r   reverser:   r;   rP   rQ   )occrequestzsteal-duration)#r6   r   rf   idlevaluesr   rR   	saturatedr   _combined_occupancy_combined_nprocessingnthreadssorted	enumeraterA   listr7   r   r8   r   r   r   
_get_thiefr   r   r   appendr   rH   is_unoccupiedr   rt   rD   digestsr   )r]   srt   re   r>   potential_thievespotential_victimsr   r   rx   r&   r7   r   r'   	occ_thief
occ_victimcomm_cost_thiefcomm_cost_victimcomputecostnproc_thiefrj   s                         r3   rd   zWorkStealing.balance  s   NN0 C(9$:c!))n$LBC++  $AII$$&D,D,D!
 ,!+B++B/#5..r2R[[@//	 +  ! % !B& &!t'?'?! !     !$"7"78HE1$01 NN6>>:5A	 (9y/B,$"4"44++69V%6%66 "))"- FA%/27H%IIEI  $ 8 8 ?I!%!9!9&!AJ&*nn&B&B2u&MO'+~~'C'CB'O$"nn>>rBG "O3g=%)9G)Cq(HHI ..r65A&)99

 % % " $ & * % )	 %:;EBaGB%9:5ATIA$($<$<U$C	&*&@&@&G#~~;;!9k  .55e< "))"-i *j 33 8 8 @ 4 u 2 9B HHi%&JJ!OJv99II&'++D5L9 i!s   $?Oc                :    |j                   | j                  |   z   S rw   )r   rF   r]   r   s     r3   r   z WorkStealing._combined_occupancy  s    ||d66r:::r2   c                L    t        |j                        | j                  |   z   S rw   )r   r   rG   r   s     r3   r   z"WorkStealing._combined_nprocessing  s!    2==!D$8$8$<<<r2   c                    | j                   j                         D ]  }|D ]  }|j                            | j                  j                          y rw   )r7   r   r   r8   )r]   r6   r7   r   s       r3   restartzWorkStealing.restart  s@    ..0I	  1 	  "r2   c                2   |D ch c]   }t        |t              s|j                  n|" c}g }| j                  j	                  d      D ]C  \  }}|d   dk(  r|d   }n|g}|D ](  }t        fd|D              s|j                  |       * E |S c c}w )Nr`   )topicr   r   r:   c              3  &   K   | ]  }|v  
 y wrw   r1   )r=   xkeyss     r3   r?   z%WorkStealing.story.<locals>.<genexpr>  s     ,!QqDy!s   )
isinstancer+   r   r6   
get_eventsanyr   )r]   
keys_or_tsr   outrx   Ltr   s          @r3   storyzWorkStealing.story  s    HRS
z#s3<
SNN--J-?DAqty aDC,!,,JJqM  @ 
 Ts   %B)r6   r   rw   )r6   r
   returnNone)r   r   )rl   zContainer[str]r   dict)rs   r
   r   r   r   )r6   r
   rM   r
   r   r   )r6   r   rM   r+   r|   r
   r   r   )r   r   re   r   r   r   r   r
   r|   r
   r   r   )r   r   r   r%   r   r   )r   r   r   zInFlightInfo | None)r   r   r   r   )r   r   r   z%tuple[float, int] | tuple[None, None])r   r   r&   r   r'   r   r   r+   )
r   r+   r   r+   r,   r+   rM   z
str | Noner   r   )r   r   r   r(   )r   r   r   rC   )r   zstr | TaskStater   r   )r-   r.   r/   r0   ry   rz   rA   r^   re   rj   ro   rt   rS   r}   r   r   r   r   r   r   r   r   r   rZ   r   rd   r   r   r   r   r1   r2   r3   r5   r5   G   s   44334:U C"'2,C > 51  J,,8822((##R:$ + =? 	F9A# ** * 	*
 * * 
*,)*
52$&L44%049D4	4n OSA8A8"%A847A8AKA8	A8F e: e:N;=#r2   r5   c                    | j                  |      }|||z  }|r|}n|j                  sy t        |t        | j                  |            S )Nr   )valid_workersloose_restrictionsminr   worker_objective)r6   r   r   r  valid_thievess        r3   r   r     sT     ++B/M )M9 -&& gi.H.H"&MNNr2   z,distributed.scheduler.default-task-durationsgMbP?)r6   r   r   r   r   zset[WorkerState]r   zWorkerState | None)8
__future__r   rX   loggingcollectionsr   collections.abcr   	functoolsr   mathr   r   typingr	   r
   r   r   r   tlzr   rT   dask.typingr   
dask.utilsr   distributed.compatibilityr   distributed.corer   distributed.diagnostics.pluginr   distributed.utilsr   r   distributed.schedulerr   r   r   r   r   r   	getLoggerr-   r   rU   rV   r   r   r   r   r%   r5   r   itemsr   )kvs   00r3   <module>r     s'   "   # %    @ @    & 6 , : ;  			8	$ ++//8
9    9 J? JZ
O
O#,
OAQ
O
O  NOUUWW1qU" W
 s   D 