
    CdL                    <   d Z ddlmZ ddlZddlmZmZmZ ddlm	Z	m
Z
mZ ddlmZmZmZmZmZmZ ddlmZ ddlmZ dd	lmZmZ dd
lmZmZmZ dZ ed          Zddd5dZ d6dZ!d7dZ"	 d8d9dZ#ddd dd!d:d&Z$d;d/Z%ddd d0d<d1Z&ddd=d2Z' G d3 d4          Z(dS )>zTools to modify already existing dask graphs. Unlike in :mod:`dask.optimization`, the
output collections produced by this module are typically not functionally equivalent to
their inputs.
    )annotationsN)CallableHashableSet)AnyLiteralTypeVar)	clone_keyget_collection_namesget_name_from_keyreplace_name_in_keytokenizeunpack_collections)	blockwise)flatten)Delayeddelayed)HighLevelGraphLayerMaterializedLayer)bind
checkpointclonewait_onTsplit_everyr   float | Literal[False] | Nonereturnr   c                     d n( dur$t                       dk     rt          d          t          | \  }}t          |          dk    rt	          |d                    S  t          t          j                   fd|D              S )	a5  Build a :doc:`delayed` which waits until all chunks of the input collection(s)
    have been computed before returning None.

    Parameters
    ----------
    collections
        Zero or more Dask collections or nested data structures containing zero or more
        collections
    split_every: int >= 2 or False, optional
        Determines the depth of the recursive aggregation. If greater than the number of
        input keys, the aggregation will be performed in multiple steps; the depth of
        the aggregation graph will be :math:`log_{split_every}(input keys)`. Setting to
        a low value can reduce cache size and network transfers, at the cost of more CPU
        and a larger dask graph.

        Set to False to disable. Defaults to 8.

    Returns
    -------
    :doc:`delayed` yielding None
    N   F   z(split_every must be False, None, or >= 2   r   c              3  8   K   | ]}t          |          V  d S N)_checkpoint_one).0cr   s     7lib/python3.11/site-packages/dask/graph_manipulation.py	<genexpr>zcheckpoint.<locals>.<genexpr>B   s-      CC!oa--CCCCCC    )int
ValueErrorr   lenr&   r   chunksr   )r   collections_s   `  r)   r   r      s    2 	E	!	!+&&??GHHH'5NK
;1{1~{;;;)wv())CCCC{CCC
 	
r+   c                D   t          |           }d|z   }t          |                                           }	 t          |           t          |           nZ# t          $ rM |t
          j        |                                 fi}t          j        ||| f          }t          ||          cY S w xY wg }t                      }g }	t          |           D ]}
dt          |
|          z   }|                    |           t          t
          j        |
||           }|	t          |                                          z  }	|                    t          j        ||| f                     i }|r`t#          |	          |k    rM|t#          |          f}t
          j        |	d |         f||<   |	|d          |gz   }	|rt#          |	          |k    Mt
          j        |	f||<   |                    t          ||i||i                     t          j        | }t          ||          S )Nzcheckpoint-dependencieszcheckpoint_map-)r   r   __dask_keys__nextStopIterationr/   r   r   from_collectionsr   setr   add_build_map_layerlistget_output_keysappendr.   merge)
collectionr   tokname	keys_iterlayerdskdsks	map_namesmap_keys	prev_namemap_name	map_layerreduce_layerks                  r)   r&   r&   F   sJ   
:

C3D
002233I"YY " " "):+C+C+E+EFG-dEVVVtS!!!!!	" DIH)*55 	
 	
	$x	3'?'??h$V%6	8ZXX	D2244555+):-  	
 	
 	
 	
 L
 0#h--+553|$$%!,h||.DEQKLL)QC/  0#h--+55 !+X6LKKl34BSTTTUUU


%C4s   A AB-,B-boolc                   	 ddl m} t          | |          rdS n# t          $ r Y nw xY w	 ddlm} t          | |          rdS n# t          $ r Y nw xY w	 ddlm}m} t          | ||f          S # t          $ r Y dS w xY w)zReturn True if _map_blocks can be sped up via blockwise operations; False
    otherwise.

    FIXME this returns False for collections that wrap around around da.Array, such as
          pint.Quantity, xarray DataArray, Dataset, and Variable.
    r   )BagT)Array)	DataFrameSeriesF)	dask.bagrP   
isinstanceImportError
dask.arrayrQ   dask.dataframerR   rS   )r@   rP   rQ   rR   rS   s        r)   _can_apply_blockwiserY   t   s          j#&& 	4	   $$$$$$j%(( 	4	   44444444*y&&9:::   uus/    
((A 
AAA0 0
A>=A> funcr   rI   strnew_namer4   tuple[Delayed, ...]r   c                    t          |          ro	 |j        }n# t          $ r |j        f}Y nw xY wt	          d t          |          D                       }|rdd |D             ini }t           ||f|i|d|S t	          d |D                       t           fdt          |	                                          D                       S )a  Apply func to all keys of collection. Create a Blockwise layer whenever possible;
    fall back to MaterializedLayer otherwise.

    Parameters
    ----------
    func
        Callable to be invoked on the graph node
    prev_name : str
        name of the layer to map from; in case of dask base collections, this is the
        collection name. Note how third-party collections, e.g. xarray.Dataset, can
        have multiple names.
    new_name : str
        name of the layer to map to
    collection
        Arbitrary dask collection
    dependencies
        Zero or more Delayed objects, which will be passed as arbitrary variadic args to
        func after the collection's chunk
    c              3      K   | ]	\  }}|V  
d S r%   rZ   )r'   ir1   s      r)   r*   z#_build_map_layer.<locals>.<genexpr>   s&      ;;da;;;;;;r+   _depsc                    g | ]	}|j         
S rZ   keyr'   ds     r)   
<listcomp>z$_build_map_layer.<locals>.<listcomp>   s    888aAE888r+   )	numblocksr4   c              3  $   K   | ]}|j         V  d S r%   rd   rf   s     r)   r*   z#_build_map_layer.<locals>.<genexpr>   s$      551555555r+   c                f    i | ]-}t          |          k    t          |i          |fz   .S rZ   )r   r   )r'   rM   dep_keysr[   r]   rI   s     r)   
<dictcomp>z$_build_map_layer.<locals>.<dictcomp>   sP       $Q''944 $A	8'<==ay8?S444r+   )
rY   ri   AttributeErrornpartitionstuple	enumerater   r   r   r5   )	r[   rI   r]   r@   r4   ri   indiceskwargsrl   s	   ```     @r)   r;   r;      sS   4 J'' 
	2",II 	2 	2 	2#/1III	2;;i	&:&:;;;;;=IQ'88<88899r	
 !),%	
 	
 	
 	
 		
 5555555        !9!9!;!;<<  
 
 	
s    22T)omitseedassume_layersr   childrenru   Hashable | Nonerv   c               ^  	
 t          j                    j        |t          ||          nd	t	          |          \  }}|rd |D             t                      
nt                      d |D             
t	          |           \  }} |	
fd|D                       d         S )a
  
    Make ``children`` collection(s), optionally omitting sub-collections, dependent on
    ``parents`` collection(s). Two examples follow.

    The first example creates an array ``b2`` whose computation first computes an array
    ``a`` completely and then computes ``b`` completely, recomputing ``a`` in the
    process:

    >>> import dask
    >>> import dask.array as da
    >>> a = da.ones(4, chunks=2)
    >>> b = a + 1
    >>> b2 = bind(b, a)
    >>> len(b2.dask)
    9
    >>> b2.compute()
    array([2., 2., 2., 2.])

    The second example creates arrays ``b3`` and ``c3``, whose computation first
    computes an array ``a`` and then computes the additions, this time not
    recomputing ``a`` in the process:

    >>> c = a + 2
    >>> b3, c3 = bind((b, c), a, omit=a)
    >>> len(b3.dask), len(c3.dask)
    (7, 7)
    >>> dask.compute(b3, c3)
    (array([2., 2., 2., 2.]), array([3., 3., 3., 3.]))

    Parameters
    ----------
    children
        Dask collection or nested structure of Dask collections
    parents
        Dask collection or nested structure of Dask collections
    omit
        Dask collection or nested structure of Dask collections
    seed
        Hashable used to seed the key regeneration. Omit to default to a random number
        that will produce different keys at every call.
    assume_layers
        True
            Use a fast algorithm that works at layer level, which assumes that all
            collections in ``children`` and ``omit``

            #. use :class:`~dask.highlevelgraph.HighLevelGraph`,
            #. define the ``__dask_layers__()`` method, and
            #. never had their graphs squashed and rebuilt between the creation of the
               ``omit`` collections and the ``children`` collections; in other words if
               the keys of the ``omit`` collections can be found among the keys of the
               ``children`` collections, then the same must also hold true for the
               layers.
        False
            Use a slower algorithm that works at keys level, which makes none of the
            above assumptions.
    split_every
        See :func:`checkpoint`

    Returns
    -------
    Same as ``children``
        Dask collection or structure of dask collection equivalent to ``children``,
        which compute to the same values. All nodes of ``children`` will be regenerated,
        up to and excluding the nodes of ``omit``. Nodes immediately above ``omit``, or
        the leaf nodes if the collections in ``omit`` are not found, are prevented from
        computing until all collections in ``parents`` have been fully computed.
        The keys of the regenerated nodes will be different from the original ones, so
        that they can be used within the same graph.
    Nr   c                @    h | ]}|                                 D ]}|S rZ   )__dask_layers__)r'   collrD   s      r)   	<setcomp>zbind.<locals>.<setcomp>$  s1    RRR4;O;O;Q;QRR%uRRRRr+   c                @    h | ]}|                                 D ]}|S rZ   )__dask_graph__)r'   r|   re   s      r)   r}   zbind.<locals>.<setcomp>)  s1    KKKTT5H5H5J5JKKcSKKKKr+   c           	     6    g | ]}t          |          S rZ   )	_bind_one)r'   childblocker	omit_keysomit_layersru   s     r)   rh   zbind.<locals>.<listcomp>-  s9     	
 	
 	
 eWk9dCC	
 	
 	
r+   r   )uuiduuid4bytesr   r   r9   )rw   parentsrt   ru   rv   r   r1   unpacked_childrenrepackr   r   r   s      `     @@@r)   r   r      s    \ |z||! 9@8K
74444QU  !&&GD! LRRRRREE		eeKKTKKK	 28 < <v6	
 	
 	
 	
 	
 	
 	
*	
 	
 	
 
 	 	r+   r   r   Delayed | Noner   set[str]r   set[Hashable]r   c                   t          |           }|s| S |                                 }i }i }t          |t                    rG	 t	          |                                           }	nu# t          $ r |                                }	Y nUw xY wt          |          dk    rt          t          |                    }
n	t          | }
t          j        |
|          }|
h}	|                                |z
  }|D ]8}	 |j        |         }n# t          $ r Y w xY w||                                z  }9|g|j        }|                                }t          |t                    sJ |                    |j                   |                    |j                   nd }t	                      }|	r|	                                }t+          |          }||v r,|j        |         }|j        |         }||z
  }||z  }|	|z  }	||z  }|                    ||          \  ||<   }fd|D             |z  }|r|                    |           |||<   |	|rB|                                }||v r|j        |         }||z  }|||<   |j        |         ||<   |B|                                 \  }} |t          ||          g|R dfd|D             iS )Nr#   ru   )keysru   bind_toc                2    h | ]}t          |           S )r   r
   )r'   depru   s     r)   r}   z_bind_one.<locals>.<setcomp>x  s3     
 
 
*-Ic%%%
 
 
r+   renamec                2    i | ]}|t          |          S rZ   r   )r'   rI   ru   s     r)   rm   z_bind_one.<locals>.<dictcomp>  s%    WWW)	9Y55WWWr+   )r   r   rU   r   r9   r{   rn   copyr.   r6   iterr   r8   get_all_external_keyslayersKeyErrorr=   re   updater4   popr
   r   r:   __dask_postpersist__)r   r   r   r   ru   prev_coll_namesrE   
new_layersnew_depslayers_to_clonehlg_name
clone_keys
layer_namerD   blocker_keyblocker_dsklayers_to_copy_verbatimprev_layer_namenew_layer_name
layer_depslayer_deps_to_clonelayer_deps_to_omitis_boundnew_deprebuildargss       `                     r)   r   r   4  s    +511O  



 
 C#%J$&H#~&& %	5!%"7"7"9"9::OO 	5 	5 	5-2244OOO	5 1$$D1122HH1H-h<<#***,,y8J! . .
	Jz*EE 	 	 	H	e++---


 k,,..+~66666+,---01111!ee
 +)--//"?>>>Z''
?+%o6
(;6'+5..#55/4{{$ 0; 0
 0
,
>"H
 
 
 
1D
 
 
  	%KK$$$#* +  +6 " 8,0022
##%j1
:-)!$J!7
: " 8 ..00MGT7z8,,	   XWWWWWW  s$   !A% %BB6D
DD)rt   ru   rv   c                d    t          |d| ||          }t          |          dk    r|d         n|S )a	  Clone dask collections, returning equivalent collections that are generated from
    independent calculations.

    Examples
    --------
    (tokens have been simplified for the sake of brevity)

    >>> import dask.array as da
    >>> x_i = da.asarray([1, 1, 1, 1], chunks=2)
    >>> y_i = x_i + 1
    >>> z_i = y_i + 2
    >>> dict(z_i.dask)  # doctest: +SKIP
    {('array-1', 0): array([1, 1]),
     ('array-1', 1): array([1, 1]),
     ('add-2', 0): (<function operator.add>, ('array-1', 0), 1),
     ('add-2', 1): (<function operator.add>, ('array-1', 1), 1),
     ('add-3', 0): (<function operator.add>, ('add-2', 0), 1),
     ('add-3', 1): (<function operator.add>, ('add-2', 1), 1)}
    >>> w_i = clone(z_i, omit=x_i)
    >>> w_i.compute()
    array([4, 4, 4, 4])
    >>> dict(w_i.dask)  # doctest: +SKIP
    {('array-1', 0): array([1, 1]),
     ('array-1', 1): array([1, 1]),
     ('add-4', 0): (<function operator.add>, ('array-1', 0), 1),
     ('add-4', 1): (<function operator.add>, ('array-1', 1), 1),
     ('add-5', 0): (<function operator.add>, ('add-4', 0), 1),
     ('add-5', 1): (<function operator.add>, ('add-4', 1), 1)}

    The typical usage pattern for clone() is the following:

    >>> x = cheap_computation_with_large_output()  # doctest: +SKIP
    >>> y = expensive_and_long_computation(x)  # doctest: +SKIP
    >>> z = wrap_up(clone(x), y)  # doctest: +SKIP

    In the above code, the chunks of x will be forgotten as soon as they are consumed by
    the chunks of y, and then they'll be regenerated from scratch at the very end of the
    computation. Without clone(), x would only be computed once and then kept in memory
    throughout the whole computation of y, needlessly consuming memory.

    Parameters
    ----------
    collections
        Zero or more Dask collections or nested structures of Dask collections
    omit
        Dask collection or nested structure of Dask collections which will not be cloned
    seed
        See :func:`bind`
    assume_layers
        See :func:`bind`

    Returns
    -------
    Same as ``collections``
        Dask collections of the same type as the inputs, which compute to the same
        value, or nested structures equivalent to the inputs, where the original
        collections have been replaced.
        The keys of the regenerated nodes in the new collections will be different from
        the original ones, so that they can be used within the same graph.
    N)r   rt   ru   rv   r#   r   )r   r.   )rt   ru   rv   r0   outs        r)   r   r     sF    z T4}  C %%**3q663r+   c                    t          |d| ifdt          | \  }} |fd|D                       }t          |          dk    r|d         n|S )a7  Ensure that all chunks of all input collections have been computed before
    computing the dependents of any of the chunks.

    The following example creates a dask array ``u`` that, when used in a computation,
    will only proceed when all chunks of the array ``x`` have been computed, but
    otherwise matches ``x``:

    >>> import dask.array as da
    >>> x = da.ones(10, chunks=5)
    >>> u = wait_on(x)

    The following example will create two arrays ``u`` and ``v`` that, when used in a
    computation, will only proceed when all chunks of the arrays ``x`` and ``y`` have
    been computed but otherwise match ``x`` and ``y``:

    >>> x = da.ones(10, chunks=5)
    >>> y = da.zeros(10, chunks=5)
    >>> u, v = wait_on(x, y)

    Parameters
    ----------
    collections
        Zero or more Dask collections or nested structures of Dask collections
    split_every
        See :func:`checkpoint`

    Returns
    -------
    Same as ``collections``
        Dask collection of the same type as the input, which computes to the same value,
        or a nested structure equivalent to the input where the original collections
        have been replaced.
        The keys of the regenerated nodes of the new collections will be different from
        the original ones, so that they can be used within the same graph.
    r   c           	     x   t          | 
          }g }i }t          |           D ]e}dt          ||          z   }|||<   t          t          j        ||| 
f          }|                    t          j        ||| 
f                     ft          j        | }| 	                                \  }}	 ||g|	R d|iS )Nzwait_on-r3   r   )
r   r   r;   r/   r   r>   r   r8   r?   r   )r|   rA   rF   r   rI   r]   rD   rE   r   r   r   s             r)   	block_onezwait_on.<locals>.block_one   s    tW%%-d33 
	 
	I!HY$<$<<H (F9$Y$gZ  E KK/e4/     
 "D)1133ws1T111&111r+   c                &    g | ]} |          S rZ   rZ   )r'   r|   r   s     r)   rh   zwait_on.<locals>.<listcomp>  s!    777d))D//777r+   r#   r   )r   r   r.   )r   r0   unpackedr   r   r   r   s        @@r)   r   r     s    N +?;??G2 2 2 2 2& *;7Hf
&7777h777
8
8C%%**3q663r+   c                  B    e Zd ZdZed	d            Zed
d            ZdS )r/   z*Callables to be inserted in the Dask graphnoder   r   c                    | S )zDummy graph node of :func:`bind` and :func:`wait_on`.
        Wait for both node and all variadic args to complete; then return node.
        rZ   )r   r   rs   s      r)   r   zchunks.bind  s	    
 r+   Nonec                     dS )zrDummy graph node of :func:`checkpoint`.
        Wait for all variadic args to complete; then return None.
        NrZ   )r   rs   s     r)   r   zchunks.checkpoint"  s	    
 	r+   N)r   r   r   r   )r   r   )__name__
__module____qualname____doc__staticmethodr   r   rZ   r+   r)   r/   r/     sX        44   \    \  r+   r/   )r   r   r   r   )r   r   )r   rN   )rZ   )
r[   r   rI   r\   r]   r\   r4   r^   r   r   )
rw   r   ru   rx   rv   rN   r   r   r   r   )r   r   r   r   r   r   r   r   ru   r   r   r   )ru   r   rv   rN   )r   r   ))r   
__future__r   r   collections.abcr   r   r   typingr   r   r	   	dask.baser
   r   r   r   r   r   dask.blockwiser   	dask.corer   dask.delayedr   r   dask.highlevelgraphr   r   r   __all__r   r   r&   rY   r;   r   r   r   r   r/   rZ   r+   r)   <module>r      sd    # " " " " "  3 3 3 3 3 3 3 3 3 3 ( ( ( ( ( ( ( ( ( (                % $ $ $ $ $       ) ) ) ) ) ) ) ) H H H H H H H H H H
4GCLL
 26&
 &
 &
 &
 &
 &
R+ + + +\   D )+7
 7
 7
 7
 7
| 
 15f	 f	 f	 f	 f	 f	R] ] ] ]@ "DPT @4 @4 @4 @4 @4 @4J 26>4 >4 >4 >4 >4 >4B         r+   