
    [e                        d dl mZ d dlZd dlZd dlmZmZ d dlmZm	Z	 d dl
mZ d dlmZ d dlmZmZ d dlmZ d d	lmZmZmZmZ  G d
 deeef                   ZdS )    )annotationsN)Callable
Collection)ExecutorThreadPoolExecutor)wraps)chain)AnyLiteral)Buffer)KTVTTlockedc                       e Zd ZU dZded<   ded<   ded<   ded	<    eej                  d
ddd( fd            Zd) fdZd*dZ	e
	 d+d,d            Zd-d$Ze
d.d/d'            Z xZS )0AsyncBuffera  Extension of :class:`~zict.Buffer` that allows offloading all reads and writes
    from/to slow to a separate worker thread.

    This requires ``fast`` to be fully thread-safe (e.g. a plain dict).

    ``slow.__setitem__`` and ``slow.__getitem__`` will be called from the offloaded
    thread, while all of its other methods (including, notably for the purpose of
    thread-safety consideration, ``__contains__`` and ``__delitem__``) will be called
    from the main thread.

    See Also
    --------
    Buffer

    Parameters
    ----------
    Same as in Buffer, plus:

    executor: concurrent.futures.Executor, optional
        An Executor instance to use for offloading. It must not pickle/unpickle.
        Defaults to an internal ThreadPoolExecutor.
    nthreads: int, optional
        Number of offloaded threads to run in parallel. Defaults to 1.
        Mutually exclusive with executor parameter.
    Executor | Noneexecutorz
int | Nonenthreadszset[asyncio.Future]futureszdict[asyncio.Future, float]evictingN   )r   r   argsr
   intkwargsreturnNonec                    t                      j        |i | || _        |rd n|| _        |d u | _        t                      | _        i | _        d S N)super__init__r   r   _internal_executorsetr   r   )selfr   r   r   r   	__class__s        1lib/python3.11/site-packages/zict/async_buffer.pyr!   zAsyncBuffer.__init__/   s[     	$)&)))  (6h"*d"2uu    c                    t                                                       | j        D ]}|                                 | j        +| j        &| j                            d           d | _        d S d S d S )NT)wait)r    closer   cancelr   r   shutdown)r$   futurer%   s     r&   r*   zAsyncBuffer.close>   su    l 	 	FMMOOOO=$)BM"""--- DMMM %$)B)Br'   funcCallable[..., T]asyncio.Future[T]c                N   | j         $| j        sJ t          | j        d          | _         t          j                    }t          j                    } |j        | j         |j        |g|R  }| j	        
                    |           |                    | j	        j                   |S )Nzzict.AsyncBuffer offloader)thread_name_prefix)r   r   r   asyncioget_running_loopcontextvarscopy_contextrun_in_executorrunr   addadd_done_callbackremove)r$   r.   r   loopcontextr-   s         r&   _offloadzAsyncBuffer._offloadG   s    = =   .2N  DM '))*,,%%dmW[$NNNN     !4555r'   raisekeysCollection[KT]missingLiteral['raise', 'omit']asyncio.Future[dict[KT, VT]]c                x    dk    r fdD             n1dk    rD ]}| vrt          |          nt          d           	  j                                      }t	          j                    }|                    |           |S # t           $ r Y nw xY wd fd}                     |          S )	aj  Fetch one or more key/value pairs. If not all keys are available in fast,
        offload to a worker thread moving keys from slow to fast, as well as possibly
        moving older keys from fast to slow.

        Parameters
        ----------
        keys:
            collection of zero or more keys to get
        missing: raise or omit, optional
            raise (default)
                If any key is missing, raise KeyError.
            omit
                If a key is missing, return a dict with less keys than those requested.

        Notes
        -----
        All keys may be present when you call ``async_get``, but ``__delitem__`` may be
        called on one of them before the actual data is fetched. ``__setitem__`` also
        internally calls ``__delitem__`` in a non-atomic way, so you may get
        ``KeyError`` when updating a value too.
        omitc                    g | ]}|v |	S  rH   ).0keyr$   s     r&   
<listcomp>z)AsyncBuffer.async_get.<locals>.<listcomp>t   s    777C3$;;C;;;r'   r?   z%missing: expected raise or omit; got r   dict[KT, VT]c                     i } D ]D}j         j        rt          j                    	 |         | |<   .# t          $ r
 dk    r Y Aw xY w| S )Nr?   )fastclosedr3   CancelledErrorKeyError)dkr@   rB   r$   s     r&   
_async_getz)AsyncBuffer.async_get.<locals>._async_get   s    A 
 
9# 3!02227AaDD    ')) *)
 Hs   4AA)r   rL   )rQ   
ValueErrorrN   get_all_or_nothingr3   Future
set_resultr>   )r$   r@   rB   rJ   rR   frT   s   ```    r&   	async_getzAsyncBuffer.async_getX   s   6 f77774777DD ( (d??"3--' #( NWNNOOO		 	,,T22A /6n.>.>ALLOOOH  	 	 	D		 	 	 	 	 	 	 	 }}Z(((s   B 
BBrJ   r   valuer   c                Z    |                      ||           |                                  dS )zImmediately set a key in fast. If this causes the total weight to exceed n,
        asynchronously start moving keys from fast to slow in a worker thread.
        N)set_noevictasync_evict_until_below_target)r$   rJ   r[   s      r&   __setitem__zAsyncBuffer.__setitem__   s2     	e$$$++-----r'   nfloat | Nonec                N   || j         }t          d|          }t          t          | j        j        g| j                                                            }||k    rdS |                     | j	        |          }|| j        |<   |
                    | j        j                   dS )zvIf the total weight exceeds n, asynchronously start moving keys from fast to
        slow in a worker thread.
        Ng        )r`   maxminr	   rN   total_weightr   valuesr>   evict_until_below_targetr:   __delitem__)r$   r`   weightr-   s       r&   r^   z*AsyncBuffer.async_evict_until_below_target   s    
 9AQKKUDI23T]5I5I5K5KLLMMQ;;F t<a@@ !f  !:;;;;;r'   )
r   r
   r   r   r   r   r   r
   r   r   )r   r   )r.   r/   r   r
   r   r0   )r?   )r@   rA   rB   rC   r   rD   )rJ   r   r[   r   r   r   r   )r`   ra   r   r   )__name__
__module____qualname____doc____annotations__r   r   r!   r*   r>   r   rZ   r_   r^   __classcell__)r%   s   @r&   r   r      s8         4     ))))
U6? %)	       ! ! ! ! ! !   " HO>) >) >) >) V>)@. . . . < < < < V< < < < <r'   r   )
__future__r   r3   r5   collections.abcr   r   concurrent.futuresr   r   	functoolsr   	itertoolsr	   typingr
   r   zict.bufferr   zict.commonr   r   r   r   r   rH   r'   r&   <module>rx      s   " " " " " "      0 0 0 0 0 0 0 0 ; ; ; ; ; ; ; ;                           ) ) ) ) ) ) ) ) ) ) ) )a< a< a< a< a<&R. a< a< a< a< a<r'   