from __future__ import annotations

import copy
import uuid
import weakref
from collections import defaultdict
from collections.abc import Hashable, Iterable, Iterator, Mapping
from contextlib import contextmanager
from itertools import islice
from typing import TYPE_CHECKING, Any, TypedDict

import dask.config
from dask.typing import Key

from distributed.collections import sum_mappings
from distributed.itertools import ffill
from distributed.metrics import time

if TYPE_CHECKING:
    from distributed import Scheduler, Worker
    from distributed import scheduler as scheduler_module
    from distributed.client import SourceCode
    from distributed.scheduler import TaskGroup, TaskStateState

#: Worker metric contexts whose keys carry a span id in second position
CONTEXTS_WITH_SPAN_ID = ("execute", "p2p")


class SpanMetadata(TypedDict):
    collections: list[dict]


@contextmanager
def span(*tags: str) -> Iterator[str]:
    """Tag group of tasks to be part of a certain group, called a span.

    This context manager can be nested, thus creating sub-spans. If you close and
    re-open a span context manager with the same tag, you'll end up with two separate
    spans.

    Every cluster defines a global "default" span when no span has been defined by the
    client; the default span is automatically closed and reopened when all tasks
    associated to it have been completed; in other words the cluster is idle save for
    tasks that are explicitly annotated by a span. Note that, in some edge cases, you
    may end up with overlapping default spans, e.g. if a worker crashes and all unique
    tasks that were in memory on it need to be recomputed.

    You may capture the ID of a span on the client to match it with the
    :class:`~distributed.spans.Span` objects on the scheduler:

    >>> client = Client()
    >>> with span("my workflow") as span_id:
    ...     client.submit(lambda: "Hello world!").result()
    >>> client.cluster.scheduler.extensions["spans"].spans[span_id]
    Span<name=('my workflow',), id=5dc9b908-116b-49a5-b0d7-5a681f49a111>
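
    The context manager can also be nested to create sub-spans (a sketch;
    ``f`` and ``g`` are placeholder tasks):

    >>> with span("my workflow"):
    ...     with span("phase 1"):
    ...         client.submit(f).result()
    ...     with span("phase 2"):
    ...         client.submit(g).result()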

    Notes
    -----
    You may retrieve the current span with ``dask.get_annotations().get("span")``.
    You can do so in the client code as well as from inside a task.
    """
    if not tags:
        raise ValueError("Must specify at least one span tag")

    annotation = dask.get_annotations().get("span")
    prev_tags = annotation["name"] if annotation else ()
    prev_ids = annotation["ids"] if annotation else ()
    ids = tuple(str(uuid.uuid4()) for _ in tags)
    with dask.annotate(span={"name": prev_tags + tags, "ids": prev_ids + ids}):
        yield ids[-1]
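

# For reference, the annotation written by span() has this shape (the uuids
# are illustrative):
#
#     {"span": {"name": ("my workflow", "phase 1"), "ids": ("<uuid>", "<uuid>")}}
#
# SpansSchedulerExtension.observe_tasks() below consumes it, then strips it
# from the task annotations.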


class Span:
    #: (<tag>, <tag>, ...)
    #: Matches ``TaskState.annotations["span"]["name"]``, both on the scheduler
    #: and on the workers
    name: tuple[str, ...]

    #: Unique id of this span (a uuid string); matches
    #: ``TaskState.annotations["span"]["ids"][-1]``
    id: str

    _parent: weakref.ref[Span] | None

    #: Direct children of this span, in order of creation
    children: list[Span]

    #: Task groups *directly* belonging to this span (not to its children)
    groups: set[TaskGroup]

    #: Time when the span first appeared on the scheduler
    enqueued: float

    #: Code snippets sent by the client; a dict with None values is used as an
    #: insertion-ordered set
    _code: dict[tuple[SourceCode, ...], None]

    _metadata: SpanMetadata | None

    #: Worker metrics attributed directly to this span (not to its children)
    _cumulative_worker_metrics: defaultdict[tuple[Hashable, ...], float]

    #: Reference to Scheduler.total_nthreads_history
    _total_nthreads_history: list[tuple[float, int]]

    #: Index into _total_nthreads_history at the time this span was created
    _total_nthreads_offset: int

    #: ids of SpanMetadata payloads already merged into _metadata
    _metadata_seen: set[int]

    # Support for weakrefs to instances of this class
    __weakref__: Any

    __slots__ = tuple(__annotations__)

    def __init__(
        self,
        name: tuple[str, ...],
        id_: str,
        parent: Span | None,
        total_nthreads_history: list[tuple[float, int]],
    ):
        self.name = name
        self.id = id_
        self._parent = weakref.ref(parent) if parent is not None else None
        self.enqueued = time()
        self.children = []
        self.groups = set()
        self._code = {}
        self._metadata = None
        self._metadata_seen = set()
        self._cumulative_worker_metrics = defaultdict(float)
        assert len(total_nthreads_history) > 0
        self._total_nthreads_history = total_nthreads_history
        self._total_nthreads_offset = len(total_nthreads_history) - 1

    def __repr__(self) -> str:
        return f"Span<name={self.name}, id={self.id}>"

    @property
    def parent(self) -> Span | None:
        if self._parent:
            out = self._parent()
            assert out
            return out
        return None

    def add_metadata(self, metadata: SpanMetadata) -> None:
        """Add metadata to the span, e.g. code snippets"""
        id_ = id(metadata)
        if id_ in self._metadata_seen:
            return
        self._metadata_seen.add(id_)
        if self._metadata is None:
            self._metadata = copy.deepcopy(metadata)
        else:
            self._metadata["collections"].extend(metadata["collections"])

    @property
    def annotation(self) -> dict[str, tuple[str, ...]] | None:
        """Rebuild the dask graph annotation which contains the full id history

        Note that this may not match the original annotation in case of TaskGroup
        collision.
        """
        if self.name == ("default",):
            return None
        ids = []
        node: Span | None = self
        while node:
            ids.append(node.id)
            node = node.parent
        return {"name": self.name, "ids": tuple(reversed(ids))}

    def traverse_spans(self) -> Iterator[Span]:
        """Top-down recursion of all spans belonging to this branch of the span tree,
        including self
        """
        yield self
        for child in self.children:
            yield from child.traverse_spans()

    def traverse_groups(self) -> Iterator[TaskGroup]:
        """All TaskGroups belonging to this branch of the span tree"""
        for span in self.traverse_spans():
            yield from span.groups

    @property
    def start(self) -> float:
        """Earliest time when a task belonging to this span tree started computing;
        0 if no task has *finished* computing yet.

        Notes
        -----
        This is not updated until at least one task has *finished* computing.
        It could move backwards as tasks complete.

        See Also
        --------
        enqueued
        stop
        distributed.scheduler.TaskGroup.start
        """
        out = min(
            (tg.start for tg in self.traverse_groups() if tg.start != 0.0),
            default=0.0,
        )
        if out:
            out = max(out, self.enqueued)
        return out

    @property
    def stop(self) -> float:
        """When this span tree finished computing, or current timestamp if it didn't
        finish yet.

        Notes
        -----
        This differs from ``TaskGroup.stop`` when there are unfinished tasks; it will
        also never be zero.

        See Also
        --------
        enqueued
        start
        done
        distributed.scheduler.TaskGroup.stop
        """
        if self.done:
            out = max(tg.stop for tg in self.traverse_groups())
        else:
            out = time()
        return max(out, self.enqueued)

    @property
    def metadata(self) -> SpanMetadata | None:
        return self._metadata

    @property
    def states(self) -> dict[TaskStateState, int]:
        """The number of tasks currently in each state in this span tree;
        e.g. ``{"memory": 10, "processing": 3, "released": 4, ...}``.

        See Also
        --------
        distributed.scheduler.TaskGroup.states
        """
        return sum_mappings(tg.states for tg in self.traverse_groups())

    @property
    def done(self) -> bool:
        """Return True if all tasks in this span tree are completed; False otherwise.

        Notes
        -----
        This property may transition from True to False, e.g. when a new sub-span is
        added or when a worker that contained the only replica of a task in memory
        crashes and the task needs to be recomputed.

        See Also
        --------
        distributed.scheduler.TaskGroup.done
        """
        return all(tg.done for tg in self.traverse_groups())

    @property
    def all_durations(self) -> dict[str, float]:
        """Cumulative duration of all completed actions in this span tree, by action

        See Also
        --------
        duration
        distributed.scheduler.TaskGroup.all_durations
        """
        return sum_mappings(tg.all_durations for tg in self.traverse_groups())

    @property
    def duration(self) -> float:
        """The total amount of time spent on all tasks in this span tree

        See Also
        --------
        all_durations
        distributed.scheduler.TaskGroup.duration
        """
        return sum(tg.duration for tg in self.traverse_groups())

    @property
    def nbytes_total(self) -> int:
        """The total number of bytes that this span tree has produced

        See Also
        --------
        distributed.scheduler.TaskGroup.nbytes_total
        """
        return sum(tg.nbytes_total for tg in self.traverse_groups())

    @property
    def code(self) -> list[tuple[SourceCode, ...]]:
        """Code snippets, sent by the client on compute(), persist(), and submit().

        Only populated if ``distributed.diagnostics.computations.nframes`` is non-zero.
        """
        # Deduplicate, but preserve insertion order
        return list(
            dict.fromkeys(sc for child in self.traverse_spans() for sc in child._code)
        )

    @property
    def cumulative_worker_metrics(self) -> dict[tuple[Hashable, ...], float]:
        """Replica of ``Worker.digests_total`` and
        ``Scheduler.cumulative_worker_metrics``, but only for the metrics that can be
        attributed to the current span tree. The span id has been removed from the key.

        At the moment of writing, all keys are
        ``("execute", <task prefix>, <activity>, <unit>)``
        or
        ``("p2p", <where>, <activity>, <unit>)``
        but more may be added in the future with a different format; please test e.g.
        for ``k[0] == "execute"``.
        """
        out = sum_mappings(
            child._cumulative_worker_metrics for child in self.traverse_spans()
        )
        known_seconds = sum(
            v for k, v in out.items() if k[0] == "execute" and k[-1] == "seconds"
        )
        out["execute", "N/A", "idle or other spans", "seconds"] = max(
            0.0, self.active_cpu_seconds - known_seconds
        )
        return out

    @staticmethod
    def merge(*items: Span) -> Span:
        """Merge multiple spans into a synthetic one.
        The input spans must not be related with each other.
        """
        if not items:
            raise ValueError("Nothing to merge")
        out = Span(
            name=("(merged)",),
            id_="(merged)",
            parent=None,
            total_nthreads_history=items[0]._total_nthreads_history,
        )
        out._total_nthreads_offset = min(
            child._total_nthreads_offset for child in items
        )
        out.children.extend(items)
        out.enqueued = min(child.enqueued for child in items)
        return out

    def _nthreads_timeseries(self) -> Iterator[tuple[float, int]]:
        """Yield (timestamp, number of threads across the cluster), forward-fill"""
        stop = self.stop if self.done else 0.0
        for t, n in islice(
            self._total_nthreads_history, self._total_nthreads_offset, None
        ):
            if stop and t >= stop:
                break
            yield max(self.enqueued, t), n

    def _active_timeseries(self) -> Iterator[tuple[float, bool]]:
        """If this span is the output of :meth:`merge`, yield
        (timestamp, True if at least one input span is active), forward-fill.
        """
        if self.id != "(merged)":
            yield self.enqueued, True
            yield self.stop, False
            return

        events = []
        for child in self.children:
            events += [(child.enqueued, 1), (child.stop, -1)]
        events.sort(key=lambda el: el[0])

        n_active = 0
        for t, delta in events:
            if not n_active:
                assert delta == 1
                yield t, True
            n_active += delta
            if not n_active:
                yield t, False

    @property
    def nthreads_intervals(self) -> list[tuple[float, float, int]]:
        """
        -------
        List of tuples:

        - begin timestamp
        - end timestamp
        - Scheduler.total_nthreads during this interval

        When the Span is the output of :meth:`merge`, the intervals may not be
        contiguous.

        See Also
        --------
        enqueued
        stop
        active_cpu_seconds
        distributed.scheduler.SchedulerState.total_nthreads
        r   )leftFrQ   N)zipr   r   sortedr   )rV   
nthreads_tnthreads_countis_active_tis_active_flagt_interpnthreads_count_interpis_active_flag_interpt0t1r   actives               r#   nthreads_intervalszSpan.nthreads_intervals  s    * &)$*C*C*E%F"
N&)4+B+B+D&E#^5J556 %h
NQR S %h^RW X &)(12,(=?T&
&!B6 	 QK&
 	
 
s   6B
c                :    t        d | j                  D              S )a   Return number of CPU seconds that were made available on the cluster while
        this Span was running; in other words
        ``(Span.stop - Span.enqueued) * Scheduler.total_nthreads``.

        This accounts for workers joining and leaving the cluster while this Span was
        active. If this Span is the output of :meth:`merge`, do not count gaps between
        input spans.

        See Also
        --------
        enqueued
        stop
        nthreads_intervals
        distributed.scheduler.SchedulerState.total_nthreads
        """
        return sum(
            (t1 - t0) * nthreads for t0, t1, nthreads in self.nthreads_intervals
        )
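

# A minimal sketch of inspecting spans scheduler-side ("scheduler" is assumed
# to be a live Scheduler and the tag is illustrative). The extension below is
# registered under the "spans" key, as shown in the span() docstring:
#
#     ext = scheduler.extensions["spans"]
#     sp = ext.merge_by_tags("my workflow")  # synthetic Span over matching spans
#     sp.states, sp.duration, sp.active_cpu_seconds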
ed<   ddZ	 	 	 	 	 	 	 	 ddZddZddZddZ	ddZ
ddZ	 	 	 	 	 	 ddZy)SpansSchedulerExtensionz%Scheduler extension for spans supportr   r   zdict[str, Span]spansrA   
root_spansz(defaultdict[tuple[str, ...], list[Span]]spans_search_by_namezdefaultdict[str, list[Span]]spans_search_by_tagc                ~    || _         i | _        g | _        t        t              | _        t        t              | _        y r)   )r   r   r   r   r   r   r   )rV   r   s     r#   rZ   z SpansSchedulerExtension.__init__  s1    "
$/$5!#.t#4 r"   c                   i }d}|D ]4  }|j                   t               |_         |j                  }|j                  r| j                  |j                     }nv|j                   j                  d      }	|	r| j                  |	d   |	d         }n|s| j                         }|}|j                  |_        |j                  j                  |       |rd|j                  |<   |r|j                  |       |j                  x}	r |	x|j                   d<   ||j                  <   |j                   j                  dd       7 |S )a  Acknowledge the existence of runnable tasks on the scheduler. These may
        either be new tasks, tasks that were previously unrunnable, or tasks that were
        already fed into this method already.

        Attach newly observed tasks to either the desired span or to ("default", ).
        Update TaskGroup.span_id and wipe TaskState.annotations["span"].

        Returns
        -------
        Updated 'span' annotations: {key: {"name": (..., ...), "ids": (..., ...)}}
        Nr%   r&   r'   )r   r   groupspan_idr   r5   _ensure_span_ensure_default_spanr?   rC   rc   rF   rh   r9   r   pop)
rV   tssr   span_metadatara   default_spantsry   r%   anns
             r#   observe_tasksz%SpansSchedulerExtension.observe_tasks  s   " B~~%!% Bzzzz"**-nn((0,,S[#e*ED''+'@'@'B'D!WW
##'

4 !!-0 oo%s%7::v&RVV""640? B 
r"   c                    | j                   d   }|r|d   j                  s|d   S | j                  dt        t	        j
                               f      S )a  Return the currently active default span, or create one if the previous one
        terminated. In other words, do not reuse the previous default span if all tasks
        that were not explicitly annotated with :func:`spans` on the client side are
        finished.
        """
        defaults = self.spans_search_by_name["default",]
        if defaults and not defaults[-1].done:
            return defaults[-1]
        return self._ensure_span(("default",), (str(uuid.uuid4()),))

    def _ensure_span(self, name: tuple[str, ...], ids: tuple[str, ...]) -> Span:
        """Create Span if it doesn't exist and return it"""
        try:
            return self.spans[ids[-1]]
        except KeyError:
            pass

        assert len(name) == len(ids)
        assert len(name) > 0

        parent = None
        for i in range(1, len(name)):
            parent = self._ensure_span(name[:i], ids[:i])

        span = Span(
            name=name,
            id_=ids[-1],
            parent=parent,
            total_nthreads_history=self.scheduler.total_nthreads_history,
        )
        self.spans[span.id] = span
        self.spans_search_by_name[name].append(span)
        for tag in name:
            self.spans_search_by_tag[tag].append(span)
        if parent:
            parent.children.append(span)
        else:
            self.root_spans.append(span)

        return span

    def find_by_tags(self, *tags: str) -> Iterator[Span]:
        """Yield all spans that contain any of the given tags.
        parent.
        N)r   r   r   rU   r&   rl   rT   r   r   updaterX   )rV   r8   by_levelr   spseenr.   levels           r#   find_by_tagsz$SpansSchedulerExtension.find_by_tags[  s     
 t$C..s3RWW&--b1 4  ux~~/0HAuKK99D(H  1s   BB+"	B+c                :    t        j                  | j                   S )z5Return a synthetic Span which is the sum of all spans)r=   r   r   r]   s    r#   	merge_allz!SpansSchedulerExtension.merge_alll  s    zz4??++r"   c                @    t        j                   | j                  |  S )z`Return a synthetic Span which is the sum of all spans containing the given
        tags
        )r=   r   r   )rV   r8   s     r#   merge_by_tagsz%SpansSchedulerExtension.merge_by_tagsp  s!     zz,4,,d344r"   c                    |j                         D ]D  \  ^}}}}t        |t              sJ | j                  |   }|j                  |g|xx   |z  cc<   F y)a  Triggered by :meth:`SpansWorkerExtension.heartbeat`.

        Populate :meth:`Span.cumulative_worker_metrics` with data from the worker.

        See Also
        --------
        SpansWorkerExtension.heartbeat
        Span.cumulative_worker_metrics
        N)r   
isinstancer*   r   rI   )rV   wsdatacontextr   otherr   r%   s           r#   	heartbeatz!SpansSchedulerExtension.heartbeatv  s\     .2ZZ\)&Wggs+++::g&D++W,=u,=>!C> .:r"   N)r   r   )r   z$Iterable[scheduler_module.TaskState]r   ztuple[SourceCode, ...]r   r   r   zdict[Key, dict])r   r=   )r&   r>   r'   r>   r   r=   )r8   r*   r   r   )r8   r*   r   r=   )r   zscheduler_module.WorkerStater   r   r   r   )r   r   r   __doc__r    rZ   r   r   r   r   r   r   r  r!   r"   r#   r   r     s    /   CB 655515 %5 $	5
 
5n	E>",5D.D6WD	Dr"   r   c                  F    e Zd ZU dZded<   ded<   d
dZ	 	 	 	 ddZddZy	)SpansWorkerExtensionz"Worker extension for spans supportr   workerr   digests_total_since_heartbeatc                     || _         i | _        y r)   )r  r  )rV   r  s     r#   rZ   zSpansWorkerExtension.__init__  s    -/*r"   c                    | j                   rJ |j                         D ci c]#  \  }}t        |t              r|d   t        v r||% c}}| _         y c c}}w r   )r  r   r   r6   CONTEXTS_WITH_SPAN_ID)rV   r  r   r   s       r#   collect_digestsz$SpansWorkerExtension.collect_digests  sa    
 5555 6;;=.
=1!U#!0E(E qD=.
* .
s   (Ac                ,    | j                   }i | _         |S )aT  Apportion the metrics that do have a span to the Spans on the scheduler

        Returns
        -------
        ``{(context, span_id, prefix, activity, unit): value}``

        See Also
        --------
        SpansSchedulerExtension.heartbeat
        Span.cumulative_worker_metrics
        distributed.worker.Worker.get_metrics
        """
        out = self.digests_total_since_heartbeat
        self.digests_total_since_heartbeat = {}
        return out