
    ԋg8                       d dl mZ d dlZd dlZd dlZd dlZd dlZd dlm	Z	m
Z
mZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ d dlZd d
lmZ  ej6                  dd      ZefdZ ed      Z ed      Z G d d      Z erLejB                  dk  r= e ej                  d      j                  Z e ejD                  d      j                  Z"nej                  ZejD                  Z"ejF                  Z#	 ejH                  Z$e G d d             Z'eejP                  df	 	 	 	 	 dd       Z) G d d      Z* e*       Z+ G d d      Z,y# e%e&f$ r e#Z$Y Qw xY w)    )annotationsN)CallableHashableIterator)contextmanager)
ContextVar)	dataclass)wraps)nan)Literal)WINDOWS_empty_namedtuple c                R    t        t        |       t              fd       }|S )zo
    Return a function calling the given psutil *method_name*,
    or returning *default* if psutil fails.
    c                 @    	         S # t         $ r
          cY S w xY wN)RuntimeError)defaultmeths   3lib/python3.12/site-packages/distributed/metrics.pywrapperz_psutil_caller.<locals>.wrapper   s&    	6M 	9	s   
 )getattrpsutilr
   )method_namer   r   r   s    ` @r   _psutil_callerr      s.    
 6;'D
4[  N    disk_io_countersnet_io_countersc                  j    e Zd ZU dZded<   ded<   ded<   ded<   ded	<   	 d	 	 	 	 	 dd
ZddZddZy)_WindowsTimezxCombine time.time() or time.monotonic() with time.perf_counter() to get an
    absolute clock with fine resolution.
    Callable[[], float]
base_timerfloatdeltazfloat | Nonepreviousnext_resyncresync_everyc                h    || _         |rt        d      nd | _        t        d      | _        || _        y )Nz-inf)r"   r#   r%   r&   r'   )selfbaseis_monotonicr'   s       r   __init__z_WindowsTime.__init__9   s.     )5f4 =(r   c                   t        j                         }|| j                  kD  r$| j                          || j                  z   | _        || j
                  z  }| j                  %|| j                  k  r| j                  dz   }|| _        |S )Ng&.>)timemodperf_counterr&   resyncr'   r$   r%   )r)   curs     r   timez_WindowsTime.timeA   sx    ""$!!!KKM"T%6%66Dtzz==$dmm#mmd*DM
r   c                   | j                   }t        j                  }d}	 t        |dz        D cg c]  } |        |       f }}t	        j
                  d |D              }|j                         d   \  }}||k  r`|D 	cg c]  }	|	d   |k(  s|	d    c}	d d }
t        |
      |dz
  k\  sJ |
       |t        |
      t        |
      z  z
  | _	        y c c}w c c}	w )N      c              3  &   K   | ]	  }|d      yw)r   Nr   ).0ts     r   	<genexpr>z&_WindowsTime.resync.<locals>.<genexpr>T   s     +@%QAaD%s   r      )
r"   r.   r/   rangecollectionsCountermost_commonlensumr$   )r)   _time_perf_countermin_samples_times	abs_timesfirstnfirstr8   
perf_timess              r   r0   z_WindowsTime.resyncN   s    ,,9>{Q9OP9OAeg}/9OEP#+++@%+@@I%113A6ME6#(-?11!A$?DJz?kAo5AzA5Z3z?!BBDJ Q @s   CCCN)g     @)r*   r!   r+   boolr'   r#   )returnr#   )rL   None)__name__
__module____qualname____doc____annotations__r,   r2   r0   r   r   r   r    r    .   sT     $#L TY)')7;)KP)r   r    )      F)r+   Tc                  <    e Zd ZU ded<   ded<   ded<    ee      Zy)MeterOutputr#   startstopr$   N)rN   rO   rP   rR   tuple	__slots__r   r   r   rV   rV   t   s    L
KLo&Ir   rV           c              #  |  K   t         |        t        t              }	 |  |        |_        |j                  |j                  z
  |_        |durt        ||j                        |_        yy#  |        |_        |j                  |j                  z
  |_        |durt        ||j                        |_        w w xY ww)a1  Convenience context manager which calls func() before and after the wrapped
    code and calculates the delta.

    Parameters
    ----------
    label: str
        label to pass to the callback
    func: callable
        function to call before and after, which must return a number.
        Besides time, it could return e.g. cumulative network traffic or disk usage.
        Default: :func:`timemod.perf_counter`
    floor: float or False, optional
        Floor the delta to the given value (default: 0). This is useful for strictly
        cumulative functions that can occasionally glitch and go backwards.
        Set to False to disable.

    Yields
    ------
    :class:`MeterOutput` where the ``start`` attribute is populated straight away, while
    ``stop`` and ``delta`` are nan until context exit.
    FN)rV   r   rX   rW   r$   max)funcfloorouts      r   meterra   |   s     4 dfc3
'C.	6HHsyy(	E399-CI  6HHsyy(	E399-CI s   B<A- AB<-AB99B<c                      e Zd ZU dZded<   d Zd Zed        Ze	ddd		 	 	 	 	 	 	 dd
       Z
e	dd       ZddZe	dej                  df	 	 	 	 	 	 	 	 	 dd       Zy)ContextMetera  Context-based general purpose meter.

    Usage
    -----
    1. In high level code, call :meth:`add_callback` to install a hook that defines an
       activity
    2. In low level code, typically many stack levels below, log quantitative events
       (e.g. elapsed time, transferred bytes, etc.) so that they will be attributed to
       the high-level code calling it, either with :meth:`meter`,
       :meth:`meter_function`, or :meth:`digest_metric`.

    Examples
    --------
    In the code that e.g. sends a Python object from A to B over the network:
    >>> from distributed.metrics import context_meter
    >>> with context_meter.add_callback(partial(print, "A->B comms:")):
    ...     await send_over_the_network(obj)

    In the serialization utilities, called many stack levels below:
    >>> with context_meter.meter("dumps"):
    ...     pik = pickle.dumps(obj)
    >>> with context_meter.meter("compress"):
    ...     pik = lz4.compress(pik)

    And finally, elsewhere, deep into the TCP stack:
    >>> with context_meter.meter("network-write"):
    ...     await comm.write(frames)

    When you call the top-level code, you'll get::
      A->B comms: dumps 0.012 seconds
      A->B comms: compress 0.034 seconds
      A->B comms: network-write 0.567 seconds
    zBContextVar[dict[Hashable, Callable[[Hashable, float, str], None]]]
_callbacksc                B    t        dt        |        di       | _        y )NzMetricHook<z>._callbacks)r   )r   idrd   r)   s    r   r,   zContextMeter.__init__   s     $"T(<0"
r   c                <    | t         u sJ d       | j                  dfS )NzFound copy of singletonr   )context_meter_unpickle_singletonrg   s    r   
__reduce__zContextMeter.__reduce__   s'    }$?&??$''++r   c                     t         S r   )ri   r   r   r   rj   z ContextMeter._unpickle_singleton   s    r   NF)keyallow_offloadc             #    K   |r1t        j                         t        j                         dfd}n}|
t	               }| j
                  j                         }|j                         }|||<   | j
                  j                  |      }	 d |j                  j                  |       y# |j                  j                  |       w xY ww)a5  Add a callback when entering the context and remove it when exiting it.
        The callback must accept the same parameters as :meth:`digest_metric`.

        Parameters
        ----------
        callback: Callable
            ``f(label, value, unit)`` to be executed
        key: Hashable, optional
            Unique key for the callback. If two nested calls to ``add_callback`` use the
            same key, suppress the outermost callback.
        allow_offload: bool, optional
            If set to True, this context must be executed inside a running asyncio
            event loop. If a call to :meth:`digest_metric` is performed from a different
            thread, e.g. from inside :func:`distributed.utils.offload`, ensure that
            the callback is executed in the event loop's thread instead.
        c               r    t        j                         k(  r | ||       y j                  | ||       y r   )	threading	get_identcall_soon_threadsafe)labelvalueunitcallbacklooptids      r   safe_cbz*ContextMeter.add_callback.<locals>.safe_cb   s4    &&(C/UE40--hudKr   Nrt   r   ru   r#   rv   strrL   rM   )asyncioget_running_looprq   rr   objectrd   getcopysetvarreset)	r)   rw   rm   rn   rz   cbstokrx   ry   s	    `     @@r   add_callbackzContextMeter.add_callback   s     0 ++-D%%'CL L G;(Coo!!#hhjCoo!!#&	GGMM#CGGMM#s   BCB2 C2CCc              #     K   | j                   j                  i       }	 d |j                  j                  |       y# |j                  j                  |       w xY ww)z8Do not trigger any callbacks set outside of this contextN)rd   r   r   r   )r)   r   s     r   clear_callbackszContextMeter.clear_callbacks  sD      oo!!"%	GGMM#CGGMM#s   A? AAAc                v    | j                   j                         }|j                         D ]  } ||||        y)zaInvoke the currently set context callbacks for an arbitrary quantitative
        metric.
        N)rd   r   values)r)   rt   ru   rv   r   cbs         r   digest_metriczContextMeter.digest_metric  s2     oo!!#**,BueT" r   secondsr[   c              #    K   dk7  r9	 t        ||      5 }| ddd       | j                  |j                         yg dfd}	 | j                  |      5  t        |d      5 }| ddd       ddd       j                  t	              z
  }|durt        ||      }||_        | j                  ||       y# 1 sw Y   xY w# | j                  |j                         w xY w# 1 sw Y   xY w# 1 sw Y   xY w# j                  t	              z
  }|durt        ||      }||_        | j                  ||       w xY ww)a  Convenience context manager or decorator which calls func() before and after
        the wrapped code, calculates the delta, and finally calls :meth:`digest_metric`.

        If unit=='seconds', it also subtracts any other calls to :meth:`meter` or
        :meth:`digest_metric` with the same unit performed within the context, so that
        the total is strictly additive.

        Parameters
        ----------
        label: Hashable
            label to pass to the callback
        unit: str, optional
            unit to pass to the callback. Default: seconds
        func: callable
            see :func:`meter`
        floor: bool, optional
            see :func:`meter`

        Yields
        ------
        :class:`MeterOutput` where the ``start`` attribute is populated straight away,
        while ``stop`` and ``delta`` are nan until context exit. In case of multiple
        nested calls to :meth:`meter`, then delta (for seconds only) is reduced by the
        inner metrics, to a minimum of ``floor``.
        r   )r_   Nc                4    |k(  rj                  |       y y r   )append)label2value2unit2offsetsrv   s      r   rw   z$ContextMeter.meter.<locals>.callback?  s    } v& r   F)r   r   r   r#   r   r|   rL   rM   )ra   r   r$   r   rA   r]   )	r)   rt   rv   r^   r_   mrw   r$   r   s	     `     @r   ra   zContextMeter.meter  s5    B 994u-G . ""5!''48 	'	3""8,eD.F! /G, GGc'l*EE!E5)AGueT2/ .- ""5!''48 /G.F,, GGc'l*EE!E5)AGueT2s{   EC CC 'ED
 C>,C21C>9D
 AEC	C C//E2C;	7C>>DD
 
AEE)rw   z&Callable[[Hashable, float, str], None]rm   Hashable | Nonern   rK   rL   Iterator[None])rL   r   r{   )
rt   r   rv   r|   r^   r!   r_   float | Literal[False]rL   Iterator[MeterOutput])rN   rO   rP   rQ   rR   r,   rk   staticmethodrj   r   r   r   r   r.   r/   ra   r   r   r   rc   rc      s     D SR

,   
  $#-8- 	-
 - 
- -^  #  $+$8$8(+9393 93 "	93
 &93 
93 93r   rc   c                      e Zd ZU dZded<   ded<   ded<   ej                  fddZdd	Ze	d
ddd       Z
	 	 d	 	 	 	 	 ddZy
)DelayedMetricsLedgera  Add-on to :class:`ContextMeter` that helps in the case where:

    - The code to be metered is not easily expressed as a self-contained code block
      e.g. you want to measure latency in the asyncio event loop before and after
      running a task
    - You want to alter the metrics depending on how the code ends; e.g. you want to
      post them differently in case of failure.

    Examples
    --------
    >>> ledger = DelayedMetricsLedger()  # Metering starts here
    >>> async def wrapper():
    ...     with ledger.record():
    ...         return await metered_function()
    >>> task = asyncio.create_task(wrapper())
    >>> # (later, elsewhere)
    >>> try:
    ...     await task
    ...     coarse_time = False
    ... except Exception:
    ...     coarse_time = "failed"
    ...     raise
    ... finally:
    ...     # Metering stops here
    ...     for label, value, unit in ledger.finalize(coarse_time):
    ...         # actually log metrics
    r!   r^   r#   rW   z!list[tuple[Hashable, float, str]]metricsc                8    || _          |       | _        g | _        y r   )r^   rW   r   )r)   r^   s     r   r,   zDelayedMetricsLedger.__init__t  s    	V
r   c                @    | j                   j                  |||f       y r   )r   r   )r)   rt   ru   rv   s       r   	_callbackzDelayedMetricsLedger._callbacky  s    UE401r   Nrm   c             #     K   t         j                  | j                  |      5  d ddd       y# 1 sw Y   yxY ww)a  Ingest metrics logged with :meth:`ContextMeter.digest_metric` or
        :meth:`ContextMeter.meter` and temporarily store them in :ivar:`metrics`.

        Parameters
        ----------
        key: Hashable, optional
            See :meth:`ContextMeter.add_callback`
        r   N)ri   r   r   )r)   rm   s     r   recordzDelayedMetricsLedger.record|  s.      ''C'@ A@@s   ">2	>;>c              #     K   | j                         }|| j                  z
  }| j                  D ]"  \  }}}|dk7  s|s|||f |dk(  s|r||z  }$ |durt        ||      }|xs d|df yw)a4  The metered code is terminated, and we now know how to log it.

        Parameters
        ----------
        coarse_time: str | False, optional
            False
                Yield all acquired metrics, plus an extra time metric, labelled "other",
                which is the time between creating the DelayedMetricsLedger and
                calling this method, minus any time logged in the metrics.
            label
                Yield all acquired non-time metrics.
                Yield a single metric, labelled <coarse_time>, which is the time
                between creating the DelayedMetricsLedger and calling this method.
        floor: float | False, optional
            Floor either the "other" or the <coarse_time> metric to this value
             (default: 0). Set to False to disable.
        r   FotherN)r^   rW   r   r]   )r)   coarse_timer_   rX   r$   rt   ru   rv   s           r   finalizezDelayedMetricsLedger.finalize  s     , yy{tzz!"&,,E5$y UD((y 	 #/ u%E$WeY66s   AA/	A/#A/)r^   r!   r{   )rm   r   rL   r   )Fr[   )r   zstr | Literal[False]r_   r   rL   z%Iterator[tuple[Hashable, float, str]])rN   rO   rP   rQ   rR   r.   r/   r,   r   r   r   r   r   r   r   r   r   S  sm    8 L..3:3G3G 
2 /3 
 
 -2(+!7)!7 &!7 
/	!7r   r   )r^   r!   r_   r   rL   r   )-
__future__r   r}   r=   sysrq   r2   r.   collections.abcr   r   r   
contextlibr   contextvarsr   dataclassesr	   	functoolsr
   mathr   typingr   r   distributed.compatibilityr   
namedtupler   r   r   r   r    version_info	monotonicprocess_timethread_timeAttributeErrorOSErrorrV   r/   ra   rc   ri   r   r   r   r   <module>r      s|   "   
   8 8 % " !     -*K**+>C  ): $ ""45  !23/ /f s')59>>DW..TBGGI <<D!!I##%%K
 ' ' '  ' 4 4$' .
 .! .  .  .Fm3 m3` W7 W7G 	  Ks   8E
 
	EE