
    ԋg5                    V   d dl mZ d dlZd dlmZ d dlmZmZmZm	Z	m
Z
mZ d dlmZ d dlmZ d dlmZmZmZmZ d dlZd dlmZ d d	lmZ d d
lmZmZ d dlmZ d dlm Z  d dl!m"Z"m#Z#  ejH                  e%      Z&e&jO                   e"d             e&jO                   e"d              G d de      Z( G d de      Z) G d dejT                  ee+f         Z,d!dZ- G d de.      Z/ G d de0      Z1 G d de.      Z2 G d dejf                        Z4 G d d ejj                  ee+e6f         Z7y)"    )annotationsN)defaultdict)CallableHashableIteratorMappingMutableMappingSized)contextmanager)partial)Literal
NamedTupleProtocolcast)Key)context_meter)deserialize_bytesserialize_bytelist)get_compression_settingssafe_sizeof)RateLimiterFilternbytesz#Spill file on disk reached capacityzSpill to disk failedc                  6    e Zd ZU dZded<   ded<   ddZddZy)	SpilledSizez7Size of a key/value pair when spilled to disk, in bytesintmemorydiskc                v    t        | j                  |j                  z   | j                  |j                  z         S Nr   r   r   selfothers     1lib/python3.12/site-packages/distributed/spill.py__add__zSpilledSize.__add__    )    4;;5tyy5::7MNN    c                v    t        | j                  |j                  z
  | j                  |j                  z
        S r    r!   r"   s     r%   __sub__zSpilledSize.__sub__#   r'   r(   N)r$   r   returnr   )__name__
__module____qualname____doc____annotations__r&   r*    r(   r%   r   r      s    A K
IOOr(   r   c                  *    e Zd ZdZedd       ZddZy)ManualEvictProtoaS  Duck-type API that a third-party alternative to SpillBuffer must respect (in
    addition to MutableMapping) if it wishes to support spilling when the
    ``distributed.worker.memory.spill`` threshold is surpassed.

    This is public API. At the moment of writing, Dask-CUDA implements this protocol in
    the ProxifyHostFile class.
    c                     y)zAccess to fast memory. This is normally a MutableMapping, but for the purpose
        of the manual eviction API it is just tested for emptiness to know if there is
        anything to evict.
        Nr1   r#   s    r%   fastzManualEvictProto.fast0   s     	r(   c                     y)a  Manually evict a key/value pair from fast to slow memory.
        Return size of the evicted value in fast memory.

        If the eviction failed for whatever reason, return -1. This method must
        guarantee that the key/value pair that caused the issue has been retained in
        fast memory and that the problem has been logged internally.

        This method never raises.
        Nr1   r5   s    r%   evictzManualEvictProto.evict8   s     	r(   N)r+   zSized | boolr+   r   )r,   r-   r.   r/   propertyr6   r8   r1   r(   r%   r3   r3   '   s       
r(   r3   c                       e Zd ZU dZded<   ded<   	 d	 	 	 	 	 d fdZedd       Zedd       Zd fd	Z	dd
Z
d fdZd fdZdddZedd       Zedd       Zedd       Zedd       Z xZS )SpillBuffera  MutableMapping that automatically spills out dask key/value pairs to disk when
    the total size of the stored data exceeds the target. If max_spill is provided the
    key/value pairs won't be spilled once this threshold has been reached.

    Parameters
    ----------
    spill_directory: str
        Location on disk to write the spill files to
    target: int
        Managed memory, in bytes, to start spilling at
    max_spill: int | False, optional
        Limit of number of bytes to be spilled on disk. Set to False to disable.
    zset[Key]logged_pickle_errorsz#defaultdict[tuple[str, str], float]cumulative_metricsc                    t        ||      }t        j                  |t        j                               }t        |   i ||t               t               | _        t        t              | _        y )N)r6   slownweight)SlowzictCacheWeakValueMappingsuper__init___in_memory_weightsetr=   r   floatr>   )r#   spill_directorytarget	max_spillr@   slow_cached	__class__s         r%   rH   zSpillBuffer.__init__X   sX     OY/jjt'<'<'>?b{fEVW$'E!"-e"4r(   c              #  v    K   d fd}t        j                  |      5  d ddd       y# 1 sw Y   yxY ww)as  Capture metrics re. disk read/write, serialize/deserialize, and
        compress/decompress.

        Note that this duplicates capturing from gather_dep, get_data, and execute. It
        is repeated here to make it possible to split serialize/deserialize and
        compress/decompress triggered by spill/unspill from those triggered by network
        comms.
        c                \    t        | t              sJ j                  | |fxx   |z  cc<   y r    )
isinstancestrr>   )labelvalueunitr#   s      r%   metrics_callbackz6SpillBuffer._capture_metrics.<locals>.metrics_callbackr   s-    eS)))##E4K0E90r(   N)rU   r   rV   rK   rW   rT   r+   None)r   add_callback)r#   rX   s   ` r%   _capture_metricszSpillBuffer._capture_metricsg   s+     	: ''(89 :99s   9-	969c              #    K   	 d  y # t         $ rR}|j                  \  }|| j                  v sJ || j                  vsJ t        j                  d       t               d }~wt        $ r" t        j                  dd       t               t        $ r}|j                  | j                  v sJ |j                  | j                  vsJ |j                  |k(  r|J | |=  |j                  | j                  vrGt        j                  d|j                  d       | j                  j                  |j                         t               d }~ww xY ww)Nz;Spill file on disk reached capacity; keeping data in memoryz,Spill to disk failed; keeping data in memoryT)exc_infozFailed to pickle %r)MaxSpillExceededargsr6   r@   loggerwarningHandledErrorOSErrorerrorPickleErrorkeyr=   add)r#   rf   ekey_es       r%   _handle_errorszSpillBuffer._handle_errorsy   s(    "	% 	!vvHUDII%%%		)))NNM .  	!LLGRVLW.  	%55DII%%%55		)))uu|&
 I 55 9 99LL!6LM--11!%%8"n$'	%s2   E	 E	EAA5EB4EEEc                R   	 | j                         5  | j                  |      5  t        |   ||       | j                  j                  |       ddd       ddd       y# 1 sw Y   xY w# 1 sw Y   yxY w# t        $ r# || j                  v sJ || j                  vsJ Y yw xY w)a  If sizeof(value) < target, write key/value pair to self.fast; this may in
        turn cause older keys to be spilled from fast to slow.
        If sizeof(value) >= target, write key/value pair directly to self.slow instead.

        Raises
        ------
        Exception
            sizeof(value) >= target, and value failed to pickle.
            The key/value pair has been forgotten.

        In all other cases:

        - an older value was evicted and failed to pickle,
        - this value or an older one caused the disk to fill and raise OSError,
        - this value or an older one caused the max_spill threshold to be exceeded,

        this method does not raise and guarantees that the key/value that caused the
        issue remained in fast.
        N)	r[   rj   rG   __setitem__r=   discardrb   r6   r@   )r#   rf   rV   rP   s      r%   rl   zSpillBuffer.__setitem__   s    (	(&&($*=*=c*B#C/))11#6 +C((*B*B((  	($))###dii'''	(sE   A: A.,A"A.A: "A+	'A..A73A: 7A: :)B&%B&c                *   	 | j                         5  | j                  d      5  | j                  j                         \  }}}t	        t
        |      cddd       cddd       S # 1 sw Y   nxY wddd       y# 1 sw Y   yxY w# t        $ r Y yw xY w)a  Implementation of :meth:`ManualEvictProto.evict`.

        Manually evict the oldest key/value pair, even if target has not been
        reached. Returns sizeof(value).
        If the eviction failed (value failed to pickle, disk full, or max_spill
        exceeded), return -1; the key/value pair that caused the issue will remain in
        fast. The exception has been logged internally.
        This method never raises.
        N)r[   rj   r6   r8   r   r   rb   )r#   _rB   s      r%   r8   zSpillBuffer.evict   ss    	&&($*=*=d*C#yy01fC( +D*C((*C*C(((  		sK   B A:.A%	A:	B %A.	*A:1B :B?B B 	BBc                4   | j                         5  || j                  v rUt        t        | j                  j                  |         }t        j                  ddd       t        j                  d|d       t        | !  |      cd d d        S # 1 sw Y   y xY w)Nzmemory-read   countbytes)	r[   r6   r   r   weightsr   digest_metricrG   __getitem__)r#   rf   memory_sizerP   s      r%   rw   zSpillBuffer.__getitem__   sw    ""$dii #3		(9(9#(>? ++M1gF++M;P7&s+ %$$s   A2BBc                Z    t         |   |       | j                  j                  |       y r    )rG   __delitem__r=   rm   r#   rf   rP   s     r%   rz   zSpillBuffer.__delitem__   s$    C !!))#.r(   c                    t        d      )NzAre you calling .pop(key, None) as a way to discard a key if it exists?It may cause data to be read back from disk! Please use `del` instead.)NotImplementedError)r#   rf   defaults      r%   popzSpillBuffer.pop   s    !U
 	
r(   c                    | j                   S )zxKey/value pairs stored in RAM. Alias of zict.Buffer.fast.
        For inspection only - do not modify directly!
        )r6   r5   s    r%   r   zSpillBuffer.memory       
 yyr(   c                    | j                   S )z~Key/value pairs spilled out to disk. Alias of zict.Buffer.slow.
        For inspection only - do not modify directly!
        )r@   r5   s    r%   r   zSpillBuffer.disk   r   r(   c                ~    t        t        j                  | j                        }t        t        |j
                        S r    )r   rD   rE   r@   rC   data)r#   caches     r%   _slow_uncachedzSpillBuffer._slow_uncached   s'    TZZ+D%**%%r(   c                .    | j                   j                  S )zNumber of bytes spilled to disk. Tuple of

        - output of sizeof()
        - pickled size

        The two may differ substantially, e.g. if sizeof() is inaccurate or in case of
        compression.
        )r   total_weightr5   s    r%   spilled_totalzSpillBuffer.spilled_total   s     ""///r(   F)rL   rT   rM   r   rN   int | Literal[False])r+   Iterator[None])rf   z
Key | Noner+   r   rf   r   rV   objectr+   rY   r9   rf   r   r+   r   rf   r   r+   rY   r    )rf   r   r~   r   r+   r   )r+   zMapping[Key, object])r+   rC   )r+   r   )r,   r-   r.   r/   r0   rH   r   r[   rj   rl   r8   rw   rz   r   r:   r   r   r   r   __classcell__rP   s   @r%   r<   r<   E   s     #";; +0	55 5 (	5  " #% #%J(8",/
     & & 	0 	0r(   r<   c                    t        |      S r    r   )rf   rV   s     r%   rI   rI     s    ur(   c                      e Zd Zy)r^   Nr,   r-   r.   r1   r(   r%   r^   r^   	      r(   r^   c                  &    e Zd ZddZedd       Zy)re   c                     d| j                   S )NzFailed to pickle )rf   r5   s    r%   __str__zPickleError.__str__  s    "488,//r(   c                     | j                   d   S )Nr   )r_   r5   s    r%   rf   zPickleError.key  s    yy|r(   N)r+   rT   )r+   r   )r,   r-   r.   r   r:   rf   r1   r(   r%   re   re     s    0  r(   re   c                      e Zd Zy)rb   Nr   r1   r(   r%   rb   rb     r   r(   rb   c                        e Zd Zd fdZ xZS )
AnyKeyFilec                4    t         |   t        |            S r    )rG   	_safe_keyrT   r{   s     r%   r   zAnyKeyFile._safe_key  s    w S**r(   )rf   r   r+   rT   )r,   r-   r.   r   r   r   s   @r%   r   r     s    + +r(   r   c                  ^     e Zd ZU ded<   ded<   ded<   dd fdZddZdd	Zd fd
Z xZS )rC   r   
max_weightzdict[Key, SpilledSize]weight_by_keyr   r   c           
     (   t        d      }t        t        t        gt        f   t        t        |d            }t        | !  |t        t        t        t        t        f   t        |                   || _        i | _        t        dd      | _        y )Nz+distributed.worker.memory.spill-compressionraise)compressionon_errorr   )r   r   r   r   rt   r   r   rG   rH   r   r	   r   r   r   r   r   r   )r#   rL   r   r   dumprP   s        r%   rH   zSlow.__init__%  s    .9
 fXu_%&K'R
 	U
+Z-HI	

 %'1-r(   c                
   t        j                  dd      5  | j                  |   }d d d        t        j                  ddd       t        j                  dt	              d       | j                  |      }|S # 1 sw Y   SxY w)Nz	disk-readsecondsrr   rs   rt   )r   meterdrv   lenload)r#   rf   pickledouts       r%   rw   zSlow.__getitem__<  sh      i8ffSkG 9##KG<##KWwGii 
 98s   A99Bc                   	 | j                  |      }|| j                  vsJ || j                  vsJ t        t        t        |            }| j                  dur1| j                  j                  |z   | j                  kD  rt        |      t        j                  dd      5  || j                  |<   d d d        t        j                  ddd       t        j                  d|d       t        t!        |      |      }|| j                  |<   | xj                  |z  c_	        y # t        $ r}t        |      |d }~ww xY w# 1 sw Y   xY w)NFz
disk-writer   rr   rs   rt   )r   	Exceptionre   r   r   summapr   r   r   r   r^   r   r   rv   r   r   )r#   rf   rV   r   rh   pickled_sizerB   s          r%   rl   zSlow.__setitem__D  s*   	*ii&G $&&   $,,,,,3vw/0 OO5(!!&&5G #3''   y9!DFF3K :##L!W=##L,H[/>"(3V#=  	* c")		*. :9s#   D! !D>!	D;*D66D;>Ec                    t         |   |       | xj                  | j                  j	                  |      z  c_        y r    )rG   rz   r   r   r   r{   s     r%   rz   zSlow.__delitem__g  s2    C T//33C88r(   r   )rL   rT   r   r   r   r   r   )	r,   r-   r.   r0   rH   rw   rl   rz   r   r   s   @r%   rC   rC      s/    $$))..!$F9 9r(   rC   )rf   r   rV   r   r+   r   )8
__future__r   loggingcollectionsr   collections.abcr   r   r   r   r	   r
   
contextlibr   	functoolsr   typingr   r   r   r   rD   dask.typingr   distributed.metricsr   distributed.protocolr   r    distributed.protocol.compressionr   distributed.sizeofr   distributed.utilsr   r   	getLoggerr,   r`   	addFilterr   r3   Bufferr   r<   rI   r   r^   	TypeErrorre   rb   Filer   Funcrt   rC   r1   r(   r%   <module>r      s    "  # X X %  6 6   - F E * 7			8	$   "#HI J   "#9: ;O* Ox <|0$++c6k* |0~
	y 	) 	9 	+ +I9499S&%'( I9r(   