
from __future__ import annotations

import logging
from inspect import isawaitable

from tornado.ioloop import IOLoop

import dask.config
from dask.utils import parse_timedelta

from distributed.deploy.adaptive_core import AdaptiveCore
from distributed.protocol import pickle
from distributed.utils import log_errors

logger = logging.getLogger(__name__)


class Adaptive(AdaptiveCore):
    """
    Adaptively allocate workers based on scheduler load.  A superclass.

    Contains logic to dynamically resize a Dask cluster based on current use.
    This class needs to be paired with a system that can create and destroy
    Dask workers using a cluster resource manager.  Typically it is built into
    already existing solutions, rather than used directly by users.
    It is most commonly used from the ``.adapt(...)`` method of various Dask
    cluster classes.

    Parameters
    ----------
    cluster: object
        Must have scale and scale_down methods/coroutines
    interval : timedelta or str, default "1000 ms"
        Time between checks
    wait_count: int, default 3
        Number of consecutive times that a worker should be suggested for
        removal before we remove it.
    target_duration: timedelta or str, default "5s"
        Amount of time we want a computation to take.
        This affects how aggressively we scale up.
    worker_key: Callable[WorkerState]
        Function to group workers together when scaling down
        See Scheduler.workers_to_close for more information
    minimum: int
        Minimum number of workers to keep around
    maximum: int
        Maximum number of workers to keep around
    **kwargs:
        Extra parameters to pass to Scheduler.workers_to_close

    Examples
    --------

    This is commonly used from existing Dask classes, like KubeCluster

    >>> from dask_kubernetes import KubeCluster
    >>> cluster = KubeCluster()
    >>> cluster.adapt(minimum=10, maximum=100)

    Alternatively you can use it from your own Cluster class by subclassing
    from Dask's Cluster superclass

    >>> from distributed.deploy import Cluster
    >>> class MyCluster(Cluster):
    ...     def scale_up(self, n):
    ...         """ Bring worker count up to n """
    ...     def scale_down(self, workers):
    ...         """ Remove worker addresses from cluster """

    >>> cluster = MyCluster()
    >>> cluster.adapt(minimum=10, maximum=100)

    Notes
    -----
    Subclasses can override :meth:`Adaptive.target` and
    :meth:`Adaptive.workers_to_close` to control when the cluster should be
    resized. The default implementation checks if there are too many tasks
    per worker or too little memory available (see
    :meth:`distributed.Scheduler.adaptive_target`).
    The values for interval, minimum, maximum, wait_count and target_duration
    can be specified in the dask config under the ``distributed.adaptive`` key.
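
    For example, the defaults can be overridden through the Dask config before
    adaptivity is enabled (an illustrative sketch; each of the keys listed
    above works the same way):

    >>> import dask
    >>> dask.config.set({"distributed.adaptive.interval": "500 ms",
    ...                  "distributed.adaptive.wait-count": 5})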
    """

    def __init__(
        self,
        cluster=None,
        interval=None,
        minimum=None,
        maximum=None,
        wait_count=None,
        target_duration=None,
        worker_key=None,
        **kwargs,
    ):
        self.cluster = cluster
        self.worker_key = worker_key
        self._workers_to_close_kwargs = kwargs

        # Fall back to the dask config for any parameter not given explicitly
        if interval is None:
            interval = dask.config.get("distributed.adaptive.interval")
        if minimum is None:
            minimum = dask.config.get("distributed.adaptive.minimum")
        if maximum is None:
            maximum = dask.config.get("distributed.adaptive.maximum")
        if wait_count is None:
            wait_count = dask.config.get("distributed.adaptive.wait-count")
        if target_duration is None:
            target_duration = dask.config.get("distributed.adaptive.target-duration")

        self.target_duration = parse_timedelta(target_duration)

        logger.info("Adaptive scaling started: minimum=%s maximum=%s", minimum, maximum)

        super().__init__(
            minimum=minimum, maximum=maximum, wait_count=wait_count, interval=interval
        )

    @property
    def scheduler(self):
        return self.cluster.scheduler_comm

    @property
    def plan(self):
        return self.cluster.plan

    @property
    def requested(self):
        return self.cluster.requested

    @property
    def observed(self):
        return self.cluster.observed

    async def target(self):
        """
        Determine target number of workers that should exist.

        Notes
        -----
        ``Adaptive.target`` dispatches to Scheduler.adaptive_target(),
        but may be overridden in subclasses.
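
        For example, a subclass might cap the scheduler's suggestion at a
        fixed budget (an illustrative sketch; ``BudgetedAdaptive`` is
        hypothetical, not part of this module):

        >>> class BudgetedAdaptive(Adaptive):
        ...     async def target(self):
        ...         n = await super().target()
        ...         return min(n, 64)  # never request more than 64 workers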

        Returns
        -------
        Target number of workers

        See Also
        --------
        Scheduler.adaptive_target
        """
        return await self.scheduler.adaptive_target(
            target_duration=self.target_duration
        )

    async def recommendations(self, target: int) -> dict:
        if len(self.plan) != len(self.requested):
            # The cluster still has pending worker requests; wait for it to
            # catch up before recommending further changes.
            await self.cluster
        return await super().recommendations(target)

    async def workers_to_close(self, target: int) -> list[str]:
        """
        Determine which, if any, workers should potentially be removed from
        the cluster.

        Notes
        -----
        ``Adaptive.workers_to_close`` dispatches to Scheduler.workers_to_close(),
        but may be overridden in subclasses.
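
        For example, to retire workers one host at a time, pass a grouping
        function when enabling adaptivity (an illustrative sketch; ``state``
        is a ``WorkerState``):

        >>> cluster.adapt(minimum=2, worker_key=lambda state: state.host)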

        Returns
        -------
        List of worker names to close, if any

        See Also
        --------
        Scheduler.workers_to_close
        """
        return await self.scheduler.workers_to_close(
            target=target,
            key=pickle.dumps(self.worker_key) if self.worker_key else None,
            attribute="name",
            **self._workers_to_close_kwargs,
        )

    @log_errors
    async def scale_down(self, workers):
        if not workers:
            return
        logger.info("Retiring workers %s", workers)
        # First ask the scheduler to retire the workers gracefully ...
        await self.scheduler.retire_workers(
            names=workers, remove=True, close_workers=True
        )
        # ... then remove them from the cluster manager, awaiting the result
        # if the cluster implements scale_down as a coroutine
        f = self.cluster.scale_down(workers)
        if isawaitable(f):
            await f

    async def scale_up(self, n):
        # Delegate to the cluster, awaiting the result if scale is a coroutine
        f = self.cluster.scale(n)
        if isawaitable(f):
            await f

    @property
    def loop(self) -> IOLoop:
        """Override Adaptive.loop"""
        if self.cluster:
            return self.cluster.loop
        else:
            return IOLoop.current()