
    Cd                       d dl mZ d dlZd dlZd dlZd dlmZ d dlmZ d dl	m
Z
 d dlmZ d dlZd dlmZ d dlmZ d d	lmZmZmZmZ d d
lmZ d dlmZ d dlmZmZmZm Z   G d d          Z! G d de          Z" G d de"          Z# G d de"          Z$ G d de          Z%d$dZ&d Z'd Z( G d de          Z) G d de)          Z* G d de          Z+ G d  d!e          Z, G d" d#e          Z-dS )%    annotationsN)defaultdict)Callable)product)Any)map)tokenize)	BlockwiseBlockwiseDepBlockwiseDepDictblockwise_token)flatten)Layer)applycached_cumsumconcreteinsertc                      e Zd ZdZd Zd ZdS )CallableLazyImportzFunction Wrapper for Lazy Importing.

    This Class should only be used when materializing a graph
    on a distributed scheduler.
    c                    || _         d S N)function_path)selfr   s     +lib/python3.11/site-packages/dask/layers.py__init__zCallableLazyImport.__init__"   s    *    c                :    ddl m}   || j                  |i |S )Nr   )import_term)distributed.utilsr   r   )r   argskwargsr   s       r   __call__zCallableLazyImport.__call__%   s6    111111.{{4-..????r   N)__name__
__module____qualname____doc__r   r#    r   r   r   r      sA         + + +@ @ @ @ @r   r   c                  F    e Zd ZU dZded<   ded<   dZded<   dd	ZddZdS )ArrayBlockwiseDepzg
    Blockwise dep for array-likes, which only needs chunking
    information to compute its data.
    tuple[tuple[int, ...], ...]chunkstuple[int, ...]	numblocksFboolproduces_tasksc                ^    || _         t          d |D                       | _        d| _        d S )Nc              3  4   K   | ]}t          |          V  d S r   )len).0chunks     r   	<genexpr>z-ArrayBlockwiseDep.__init__.<locals>.<genexpr>>   s(      >>es5zz>>>>>>r   F)r,   tupler.   r0   )r   r,   s     r   r   zArrayBlockwiseDep.__init__<   s5    >>v>>>>>#r   idxc                     t          d          )Nz%Subclasses must implement __getitem__)NotImplementedErrorr   r8   s     r   __getitem__zArrayBlockwiseDep.__getitem__A   s    !"IJJJr   Nr,   r+   r8   r-   )r$   r%   r&   r'   __annotations__r0   r   r<   r(   r   r   r*   r*   2   sy          
 (''' N    $ $ $ $
K K K K K Kr   r*   c                      e Zd ZdZddZdS )ArrayChunkShapeDepz(Produce chunk shapes given a chunk indexr8   r-   c                Z    t          d t          || j                  D                       S )Nc              3  ,   K   | ]\  }}||         V  d S r   r(   )r4   ir5   s      r   r6   z1ArrayChunkShapeDep.__getitem__.<locals>.<genexpr>I   s*      DD(!UU1XDDDDDDr   )r7   zipr,   r;   s     r   r<   zArrayChunkShapeDep.__getitem__H   s+    DDc#t{.C.CDDDDDDr   Nr>   )r$   r%   r&   r'   r<   r(   r   r   rA   rA   E   s4        22E E E E E Er   rA   c                  8     e Zd ZU dZded<   d	 fdZd
dZ xZS )ArraySliceDepz>Produce slice(s) into the full-sized array given a chunk indexr+   startsr,   c                    t                                          |           t          d |D                       | _        d S )Nc              3  8   K   | ]}t          |d           V  dS )T)initial_zeroN)r   )r4   cs     r   r6   z)ArraySliceDep.__init__.<locals>.<genexpr>S   s/      PPAM!$???PPPPPPr   )superr   r7   rH   )r   r,   	__class__s     r   r   zArraySliceDep.__init__Q   s=       PPPPPPPr   r8   r7   c                    t          d t          || j                  D                       }t          d |D                       S )Nc              3  B   K   | ]\  }}||         ||d z            fV  dS    Nr(   )r4   rD   starts      r   r6   z,ArraySliceDep.__getitem__.<locals>.<genexpr>V   s7      SSEU1XuQU|,SSSSSSr   c              3  2   K   | ]}t          g |d R  V  d S r   slice)r4   ss     r   r6   z,ArraySliceDep.__getitem__.<locals>.<genexpr>W   s0      22U_A_t___222222r   )r7   rE   rH   )r   r8   locs      r   r<   zArraySliceDep.__getitem__U   sG    SSSdk=R=RSSSSS22c222222r   r=   )r8   r7   )r$   r%   r&   r'   r?   r   r<   __classcell__rN   s   @r   rG   rG   L   sk         HH''''Q Q Q Q Q Q3 3 3 3 3 3 3 3r   rG   c                  j     e Zd ZdZ fdZd Zed             Zd Zd Z	d Z
d Zd	 Zd
 ZddZ xZS )ArrayOverlapLayera\  Simple HighLevelGraph array overlap layer.

    Lazily computed High-level graph layer for a array overlap operations.

    Parameters
    ----------
    name : str
        Name of new output overlap array.
    array : Dask array
    axes: Mapping
        Axes dictionary indicating overlap in each dimension,
        e.g. ``{'0': 1, '1': 1}``
    c                    t                                                       || _        || _        || _        || _        || _        d | _        d S r   )rM   r   nameaxesr,   r.   token_cached_keys)r   r^   r_   r,   r.   r`   rN   s         r   r   zArrayOverlapLayer.__init__i   sL     			"
 r   c                    d| j          dS )NzArrayOverlapLayer<name=''r^   r   s    r   __repr__zArrayOverlapLayer.__repr__y   s    6$)6666r   c                t    t          | d          r| j        S |                                 }|| _        | j        S z$Materialize full dict representation_cached_dicthasattrri   _construct_graphr   dsks     r   _dictzArrayOverlapLayer._dict|   A     4(( 	$$$''))C #D  r   c                    | j         |         S r   ro   r   keys     r   r<   zArrayOverlapLayer.__getitem__       z#r   c                *    t          | j                  S r   iterro   re   s    r   __iter__zArrayOverlapLayer.__iter__       DJr   c                *    t          | j                  S r   r3   ro   re   s    r   __len__zArrayOverlapLayer.__len__       4:r   c                "    t          | d          S Nri   rk   re   s    r   is_materializedz!ArrayOverlapLayer.is_materialized       t^,,,r   c                *    |                                  S r   )keysre   s    r   get_output_keysz!ArrayOverlapLayer.get_output_keys   s    yy{{r   c                    | j         | j         S | j        | j        | j        cfd             x| _         }|S )Nc                      sfgS t                     }|dz   t                    k    r# fdt          |                   D             }n" fdt          |                   D             }|S )NrR   c                $    g | ]}fz   |fz   S r(   r(   )r4   rD   r!   r^   s     r   
<listcomp>z>ArrayOverlapLayer._dask_keys.<locals>.keys.<locals>.<listcomp>   s&    OOOA4'D.A4/OOOr   c                "    g | ]} |fz    S r(   r(   )r4   rD   r!   r   s     r   r   z>ArrayOverlapLayer._dask_keys.<locals>.keys.<locals>.<listcomp>   s&    NNN1$$.NNNr   )r3   range)r!   indresultr,   r   r^   r.   s   `  r   r   z*ArrayOverlapLayer._dask_keys.<locals>.keys   s     !y d))CQw#i..((OOOOOy~9N9NOOONNNNNin8M8MNNNMr   )ra   r^   r,   r.   )r   r   r,   r   r^   r.   s     @@@@r   
_dask_keyszArrayOverlapLayer._dask_keys   sp    ($$"&)T[$.fi	 	 	 	 	 	 	 	 &*TVV+Fr   Fc                   | j         }| j        }| j        }|                                 }d| j        z   }d| j        z   }|rt          d          }nddlm} t          t          t          |                    }	t          j        t          |	|          }
t          j        |t           t          |
          t          t                     t          j        t                    }i }i }|D ]W}t%          |f|z   |          }|f|z   |k    r
|||f|z   <   *|f|z   ||f|z   <   |t&           |
d|z   |          ff||f|z   <   Xt          j        ||          }|S )	z/Construct graph for a simple overlap operation.zgetitem-zoverlap-zdask.array.core.concatenate3r   )concatenate3)dimsr_   r   rd   )r_   r,   r^   r   r`   r   dask.array.corer   listr	   r3   	functoolspartial_expand_keys_around_centertoolzpiper   concatfractional_slicer   merge)r   deserializingr_   r,   r^   	dask_keysgetitem_nameoverlap_namer   r   expand_key2interior_keysinterior_slicesoverlap_blocksk
frac_slicern   s                    r   rl   z"ArrayOverlapLayer._construct_graph   s   yyOO%%	!DJ.!DJ. 	5 ..LMMLL 544444CV$$%%'&T
 
 

 
wK 0 0#g,,d
 
  		 		A)4'A+t<<Jw{j((7A! 3448<w{! 34 {{7Q;\JJJK7233
 k/>::
r   F)r$   r%   r&   r'   r   rf   propertyro   r<   ry   r}   r   r   r   rl   rY   rZ   s   @r   r\   r\   Z   s         ! ! ! ! ! 7 7 7 ! ! X!         - - -    &) ) ) ) ) ) ) )r   r\   c                   fdg }t          | dd                   D ];\  }}d}|dk    r|dz  }||         dz
  k     r|dz  }|                    |           <fdt          | dd                   D             }||gg|z   }t          t          |           }	fdt          |          D             }
t	          |
|	          }|S )a  Get all neighboring keys around center

    Parameters
    ----------
    k: tuple
        They key around which to generate new keys
    dims: Sequence[int]
        The number of chunks in each dimension
    name: Option[str]
        The name to include in the output keys, or none to include no name
    axes: Dict[int, int]
        The axes active in the expansion.  We don't expand on non-active axes

    Examples
    --------
    >>> _expand_keys_around_center(('x', 2, 3), dims=[5, 5], name='y', axes={0: 1, 1: 1})  # noqa: E501 # doctest: +NORMALIZE_WHITESPACE
    [[('y', 1.1, 2.1), ('y', 1.1, 3), ('y', 1.1, 3.9)],
     [('y',   2, 2.1), ('y',   2, 3), ('y',   2, 3.9)],
     [('y', 2.9, 2.1), ('y', 2.9, 3), ('y', 2.9, 3.9)]]

    >>> _expand_keys_around_center(('x', 0, 4), dims=[5, 5], name='y', axes={0: 1, 1: 1})  # noqa: E501 # doctest: +NORMALIZE_WHITESPACE
    [[('y',   0, 3.1), ('y',   0,   4)],
     [('y', 0.9, 3.1), ('y', 0.9,   4)]]
    c                    g }|dz
  dk    r|                     |dz
             |                     |           |dz   |          dz
  k     r|                     |dz              |S )Ng?r   rR   )append)rD   r   rvr   s      r   indsz(_expand_keys_around_center.<locals>.inds   sl    9q==IIcCi   
		#9tAw{""IIcCi   	r   rR   Nr   c                |    g | ]8\  }}t                              |d           f          r ||          n|g9S )r   anyget)r4   rD   r   r_   r   s      r   r   z._expand_keys_around_center.<locals>.<listcomp>   sV       >DaTXXa^^-..9QSE  r   c                f    g | ]-\  }}t                              |d           f          r|nd.S )r   rR   r   )r4   rD   dr_   s      r   r   z._expand_keys_around_center.<locals>.<listcomp>  s=    NNNTQ3A())0aaqNNNr   )	enumerater   r   r   reshapelist)r   r   r^   r_   shaperD   r   numr!   seqshape2r   r   s    ` `        @r   r   r      s-   4     EAabbE""  3771HCa11HCS    HQRSTUTVTVRWHXHX  D x$
w~

CNNNNYu=M=MNNNF%%FMr   c                     t                     dk    rt          |          S t          t          |           d         z            } fdt          j        ||          D             S )zgReshape iterator to nested shape

    >>> reshapelist((2, 3), range(6))
    [[0, 1, 2], [3, 4, 5]]
    rR   r   c                @    g | ]}t          d d         |          S rQ   )r   )r4   partr   s     r   r   zreshapelist.<locals>.<listcomp>  s+    QQQE!""It,,QQQr   )r3   r   intr   	partition)r   r   ns   `  r   r   r     sb     5zzQCyyC58#$$QQQQC9P9PQQQQr   c                   | d         ft          d | dd         D                       z   }g }t          t          | dd         |dd                             D ]\  }\  }}|                    |d          }t	          |t                     r|d         }|d         }	n|}|}	||k    r%|                    t          ddd                     s||k     r&|	r$|                    t          d|	                     ||k    r'|r%|                    t          | d                     |                    t          dd                     t          |          }t          d |D                       r| S t          j	        ||fS )a  

    >>> fractional_slice(('x', 5.1), {0: 2})
    (<built-in function getitem>, ('x', 5), (slice(-2, None, None),))

    >>> fractional_slice(('x', 3, 5.1), {0: 2, 1: 3})
    (<built-in function getitem>, ('x', 3, 5), (slice(None, None, None), slice(-3, None, None)))

    >>> fractional_slice(('x', 2.9, 5.1), {0: 2, 1: 3})
    (<built-in function getitem>, ('x', 3, 5), (slice(0, 2, None), slice(-3, None, None)))
    r   c              3  N   K   | ] }t          t          |                    V  !d S r   )r   round)r4   rD   s     r   r6   z#fractional_slice.<locals>.<genexpr>$  s.       A A1U1XX A A A A A Ar   rR   Nc              3  @   K   | ]}|t          d d d           k    V  d S r   rU   )r4   r   s     r   r6   z#fractional_slice.<locals>.<genexpr>:  s3      
;
;c3%dD)))
;
;
;
;
;
;r   )
r7   r   rE   r   
isinstancer   rV   alloperatorgetitem)
taskr_   roundedindexrD   trdepth
left_depthright_depths
             r   r   r     s    Awj5 A AQRR A A AAAAGEs48WQRR[99:: & &	6AqAeU## 	 qJ(KKJK66LLtT4001111UU{ULLq+..////UUzULL
{D112222LLq!%%%%%LLE

;
;U
;
;
;;; 2 '511r   c                       e Zd ZdZ	 	 d fd	Zd Zd Zd Zd Ze	d             Z
d	 Zd
 Zd Zd ZddZd Zd ZddZ xZS )SimpleShuffleLayera  Simple HighLevelGraph Shuffle layer

    High-level graph layer for a simple shuffle operation in which
    each output partition depends on all input partitions.

    Parameters
    ----------
    name : str
        Name of new shuffled output collection.
    column : str or list of str
        Column(s) to be used to map rows to output partitions (by hashing).
    npartitions : int
        Number of output partitions.
    npartitions_input : int
        Number of partitions in the original (un-shuffled) DataFrame.
    ignore_index: bool, default False
        Ignore index during shuffle.  If ``True``, performance may improve,
        but index values will not be preserved.
    name_input : str
        Name of input collection.
    meta_input : pd.DataFrame-like object
        Empty metadata of input collection.
    parts_out : list of int (optional)
        List of required output-partition indices.
    annotations : dict (optional)
        Layer annotations
    Nc
                *   || _         || _        || _        || _        || _        || _        || _        |pt          |          | _        d| j         z   | _	        |	pi }	d | _
        d|	vr
| j        |	d<   t                                          |	           d S )Nsplit-priorityr   )r^   columnnpartitionsnpartitions_inputignore_index
name_input
meta_inputr   	parts_out
split_name_split_keys_key_priorityrM   r   )r   r^   r   r   r   r   r   r   r   r   rN   s             r   r   zSimpleShuffleLayer.__init__d  s     	&!2($$"8eK&8&8"TY. "'R[((&*&8K
#[11111r   c                Z    t          |t                    sJ |d         | j        k    rdS dS )Nr   rR   )r   r7   r   rs   s     r   r   z SimpleShuffleLayer._key_priority  s3    #u%%%%%q6T_$$11r   c                *      fd j         D             S )Nc                "    h | ]}j         |fS r(   rd   r4   r   r   s     r   	<setcomp>z5SimpleShuffleLayer.get_output_keys.<locals>.<setcomp>       ===dD!===r   r   re   s   `r   r   z"SimpleShuffleLayer.get_output_keys      ====dn====r   c                B    d                     | j        | j                  S )Nz-SimpleShuffleLayer<name='{}', npartitions={}>)formatr^   r   re   s    r   rf   zSimpleShuffleLayer.__repr__  s$    >EEIt'
 
 	
r   c                "    t          | d          S r   r   re   s    r   r   z"SimpleShuffleLayer.is_materialized  r   r   c                t    t          | d          r| j        S |                                 }|| _        | j        S rh   rj   rm   s     r   ro   zSimpleShuffleLayer._dict  rp   r   c                    | j         |         S r   rr   rs   s     r   r<   zSimpleShuffleLayer.__getitem__  ru   r   c                *    t          | j                  S r   rw   re   s    r   ry   zSimpleShuffleLayer.__iter__  rz   r   c                *    t          | j                  S r   r|   re   s    r   r}   zSimpleShuffleLayer.__len__  r~   r   c                    t                      }|D ]:}	 |\  }}n# t          $ r Y w xY w|| j        k    r%|                    |           ;|S z4Simple utility to convert keys to partition indices.set
ValueErrorr^   addr   r   partsrt   _name_parts         r   _keys_to_partsz!SimpleShuffleLayer._keys_to_parts  t     	 	C"uu   	!!IIe   
''c                     t          t                    }|p                     |          }|D ]7}| j        |fxx          fdt	           j                  D             z  cc<   8|S )zDetermine the necessary dependencies to produce `keys`.

        For a simple shuffle, output partitions always depend on
        all input partitions. This method does not require graph
        materialization.
        c                "    h | ]}j         |fS r(   r   r4   rD   r   s     r   r   z8SimpleShuffleLayer._cull_dependencies.<locals>.<setcomp>  s-     ( ( ()*!$( ( (r   )r   r   r   r^   r   r   )r   r   r   depsr   s   `    r   _cull_dependenciesz%SimpleShuffleLayer._cull_dependencies  s     3:!4!4T!:!:	 	 	D$)T"### ( ( ( (.3D4J.K.K( ( ( #### r   c           
     v    t          | j        | j        | j        | j        | j        | j        | j        |          S Nr   )r   r^   r   r   r   r   r   r   r   r   s     r   _cullzSimpleShuffleLayer._cull  sB    !IK"OO	
 	
 	
 		
r   c                    |                      |          }|                     ||          }|t          | j                  k    r|                     |          }||fS | |fS )a  Cull a SimpleShuffleLayer HighLevelGraph layer.

        The underlying graph will only include the necessary
        tasks to produce the keys (indices) included in `parts_out`.
        Therefore, "culling" the layer only requires us to reset this
        parameter.
        r   r   r   r   r   r  r   r   all_keysr   culled_depsculled_layers         r   cullzSimpleShuffleLayer.cull  k     ''--	--di-HHDN++++::i00L,,$$r   Fc           
        
 d j         z   }|rt          d          }t          d          }nddlm} ddlm} i } j        D ]

 fdt           j                  D             }|| j	        f| j         
f<   |D ]W\  }}}	t          j        ||	f|f| j        ||	f<   ||	f|vr/| j        |	f j        d j         j         j	         j        f|||	f<   X|S )z/Construct graph for a simple shuffle operation.group-dask.dataframe.core._concat$dask.dataframe.shuffle.shuffle_groupr   _concatshuffle_groupc                $    g | ]}j         |fS r(   )r   )r4   part_inpart_outr   s     r   r   z7SimpleShuffleLayer._construct_graph.<locals>.<listcomp>  s2        (G4  r   )r^   r   dask.dataframe.corer  dask.dataframe.shuffler  r   r   r   r   r   r   r   r   r   r   )r   r   shuffle_group_nameconcat_funcshuffle_group_funcrn   _concat_list_	_part_out_part_inr  s   `         @r   rl   z#SimpleShuffleLayer._construct_graph  st    &	1 
	S --JKKK!36" "
 CBBBBBRRRRRR 	 	H    $T%;<<  L
 !*CH%&
 +7  &9h$'2?T_i:;
 '1<<*(3(()(	;C+X67$ 
r   NNr   r   )r$   r%   r&   r'   r   r   r   rf   r   r   ro   r<   ry   r}   r   r   r  r  rl   rY   rZ   s   @r   r   r   G  s*        J (2 (2 (2 (2 (2 (2T  > > >
 
 

- - - ! ! X!              

 

 

% % % . . . . . . . .r   r   c                  D     e Zd ZdZ	 	 d	 fd	Zd Zd
dZd ZddZ xZ	S )ShuffleLayera"  Shuffle-stage HighLevelGraph layer

    High-level graph layer corresponding to a single stage of
    a multi-stage inter-partition shuffle operation.

    Stage: (shuffle-group) -> (shuffle-split) -> (shuffle-join)

    Parameters
    ----------
    name : str
        Name of new (partially) shuffled collection.
    column : str or list of str
        Column(s) to be used to map rows to output partitions (by hashing).
    inputs : list of tuples
        Each tuple dictates the data movement for a specific partition.
    stage : int
        Index of the current shuffle stage.
    npartitions : int
        Number of output partitions for the full (multi-stage) shuffle.
    npartitions_input : int
        Number of partitions in the original (un-shuffled) DataFrame.
    k : int
        A partition is split into this many groups during each stage.
    ignore_index: bool, default False
        Ignore index during shuffle.  If ``True``, performance may improve,
        but index values will not be preserved.
    name_input : str
        Name of input collection.
    meta_input : pd.DataFrame-like object
        Empty metadata of input collection.
    parts_out : list of int (optional)
        List of required output-partition indices.
    annotations : dict (optional)
        Layer annotations
    Nc                    || _         || _        || _        t                                          ||||||	|
|pt          t          |                    |	  	         d S )N)r   r   )inputsstagensplitsrM   r   r   r3   )r   r^   r   r%  r&  r   r   r'  r   r   r   r   r   rN   s                r   r   zShuffleLayer.__init__A  ss     
55V#5#5# 	 
	
 
	
 
	
 
	
 
	
r   c                Z    d                     | j        | j        | j        | j                  S )Nz=ShuffleLayer<name='{}', stage={}, nsplits={}, npartitions={}>)r   r^   r&  r'  r   re   s    r   rf   zShuffleLayer.__repr___  s,    NUUItz4<1A
 
 	
r   c                    t          t                    }|p|                     |          }d t          | j                  D             }|D ]}| j        |         }t          | j                  D ]}t          || j        |          }||         }	| j        dk    r9|	| j	        k    r.|| j
        |f                             d| j
        z   |df           d|| j
        |f                             | j        |	f           |S )zqDetermine the necessary dependencies to produce `keys`.

        Does not require graph materialization.
        c                    i | ]\  }}||	S r(   r(   r4   rD   inps      r   
<dictcomp>z3ShuffleLayer._cull_dependencies.<locals>.<dictcomp>k      DDD61cQDDDr   r   r  empty)r   r   r   r   r%  r   r'  r   r&  r   r^   r   r   )
r   r   r   r   inp_part_mapr   outr   _inpr   s
             r   r   zShuffleLayer._cull_dependenciesd  s   
 3:!4!4T!:!:	DDYt{-C-CDDD 	J 	JD+d#C4<(( J Jc4:q11$T*:??u0F'F'F$)T*+//DI1EtW0UVVVV$)T*+//%0HIIIIJ r   c                    t          | j        | j        | j        | j        | j        | j        | j        | j        | j	        | j
        |          S r  )r#  r^   r   r%  r&  r   r   r'  r   r   r   r  s     r   r  zShuffleLayer._cullw  sQ    IKKJ"LOO
 
 
 	
r   Fc           
        d| j         z   }|rt          d          }t          d          }nddlm} ddlm} i }d t          | j                  D             }| j        D ]}| j        |         }g }	t          | j
                  D ]B}
t          || j        |
          }|| j                 }|	                    | j        ||f           C||	| j        f|| j         |f<   |	D ]\  }}}t           j        ||f|f|| j        ||f<   ||f|vrn||         }| j        dk    r%|| j        k     r
| j        |f}n||df}| j        ||<   n	| j        |f}||| j        | j        | j
        | j        | j        | j        f|||f<   |S )	z2Construct graph for a "rearrange-by-column" stage.r  r  r  r   r  r  c                    i | ]\  }}||	S r(   r(   r+  s      r   r-  z1ShuffleLayer._construct_graph.<locals>.<dictcomp>  r.  r   r/  )r^   r   r  r  r  r  r   r%  r   r   r'  r   r&  r   r   r   r   r   r   r   r   r   r   )r   r   r  r  r  rn   r0  r   r1  r  rD   r2  _idxr  r   	input_keys                   r   rl   zShuffleLayer._construct_graph  s    &	1 
	S --JKKK!36" "
 CBBBBBRRRRRRDDYt{-C-CDDDN 0	 0	D+d#CL4<(( C Cc4:q114:##T_dD$ABBBB !&CD!" ".  4$'.6T_dD12 '-S88(.EzQ 4#999)-%(@II *<T7(KI-1_C	NN%)_e$<	 +!
.)(	7C+T23-B 
r   r!  r   r   )
r$   r%   r&   r'   r   rf   r   r  rl   rY   rZ   s   @r   r#  r#    s        " "` 
 
 
 
 
 
<
 
 

   &
 
 
E E E E E E E Er   r#  c                       e Zd ZdZ	 	 	 	 d fd	Zd Zd Zd Zed             Z	d Z
d	 Zd
 Zd Zed             ZddZd Zd ZddZ xZS )BroadcastJoinLayera;  Broadcast-based Join Layer

    High-level graph layer for a join operation requiring the
    smaller collection to be broadcasted to every partition of
    the larger collection.

    Parameters
    ----------
    name : str
        Name of new (joined) output collection.
    lhs_name: string
        "Left" DataFrame collection to join.
    lhs_npartitions: int
        Number of partitions in "left" DataFrame collection.
    rhs_name: string
        "Right" DataFrame collection to join.
    rhs_npartitions: int
        Number of partitions in "right" DataFrame collection.
    parts_out : list of int (optional)
        List of required output-partition indices.
    annotations : dict (optional)
        Layer annotations.
    **merge_kwargs : **dict
        Keyword arguments to be passed to chunkwise merge func.
    Nc                $   t                                          |           || _        || _        || _        || _        || _        || _        |p t          t          | j                            | _
        t          |	t                    rt          |	          n|	| _        t          |
t                    rt          |
          n|
| _        || _        | j                            d          | _        | j        | j        d<   | j        | j        d<   d S )Nr   howleft_onright_on)rM   r   r^   r   lhs_namelhs_npartitionsrhs_namerhs_npartitionsr   r   r   r   r   r7   r<  r=  merge_kwargsr   r;  )r   r^   r   r>  r?  r@  rA  r   r   r<  r=  rB  rN   s               r   r   zBroadcastJoinLayer.__init__  s     	[111	& . ."Bc%0@*A*A&B&B)3GT)B)BOuW~~~+5h+E+ESh8($((//'+|)$(,*%%%r   c                *      fd j         D             S )Nc                "    h | ]}j         |fS r(   rd   r   s     r   r   z5BroadcastJoinLayer.get_output_keys.<locals>.<setcomp>  r   r   r   re   s   `r   r   z"BroadcastJoinLayer.get_output_keys  r   r   c                Z    d                     | j        | j        | j        | j                  S )Nz5BroadcastJoinLayer<name='{}', how={}, lhs={}, rhs={}>)r   r^   r;  r>  r@  re   s    r   rf   zBroadcastJoinLayer.__repr__	  s+    FMMItx
 
 	
r   c                "    t          | d          S r   r   re   s    r   r   z"BroadcastJoinLayer.is_materialized  r   r   c                t    t          | d          r| j        S |                                 }|| _        | j        S rh   rj   rm   s     r   ro   zBroadcastJoinLayer._dict  rp   r   c                    | j         |         S r   rr   rs   s     r   r<   zBroadcastJoinLayer.__getitem__  ru   r   c                *    t          | j                  S r   rw   re   s    r   ry   zBroadcastJoinLayer.__iter__  rz   r   c                *    t          | j                  S r   r|   re   s    r   r}   zBroadcastJoinLayer.__len__!  r~   r   c                    t                      }|D ]:}	 |\  }}n# t          $ r Y w xY w|| j        k    r%|                    |           ;|S r   r   r   s         r   r   z!BroadcastJoinLayer._keys_to_parts$  r   r   c                    | j         | j        k     r| j        | j         | j        | j        fS | j        | j        | j        | j        fS r   )r?  rA  r>  r@  r=  r<  re   s    r   _broadcast_planz"BroadcastJoinLayer._broadcast_plan1  sW     $"666 $	  $	 r   c                "   | j         dd         \  }}t          t                    }|p|                     |          }|D ]L}|| j        |fxx         fdt          |          D             z  cc<   || j        |fxx         ||fhz  cc<   M|S )zDetermine the necessary dependencies to produce `keys`.

        For a broadcast join, output partitions always depend on
        all partitions of the broadcasted collection, but only one
        partition of the "other" collection.
        N   c                    h | ]}|fS r(   r(   )r4   rD   
bcast_names     r   r   z8BroadcastJoinLayer._cull_dependencies.<locals>.<setcomp>X  s    'S'S'SAQ'S'S'Sr   )rM  r   r   r   r^   r   )r   r   r   
bcast_size
other_namer   r   rQ  s          @r   r   z%BroadcastJoinLayer._cull_dependenciesK  s     .2-A"1"-E*
J
3:!4!4T!:!:	 	 	D$)T"###'S'S'S'SzARAR'S'S'SS###$)T"###T"( #### r   c                |    t          | j        | j        | j        | j        | j        | j        f| j        |d| j        S )N)r   r   )	r9  r^   r   r>  r?  r@  rA  r   rB  r  s     r   r  zBroadcastJoinLayer._cull^  sW    !IM M 

 (

 

 

 

 
	
r   c                    |                      |          }|                     ||          }|t          | j                  k    r|                     |          }||fS | |fS )a  Cull a BroadcastJoinLayer HighLevelGraph layer.

        The underlying graph will only include the necessary
        tasks to produce the keys (indices) included in `parts_out`.
        Therefore, "culling" the layer only requires us to reset this
        parameter.
        r   r  r  s         r   r  zBroadcastJoinLayer.cullk  r  r   Fc                T   d| j         z   }d| j         z   }|r.t          d          }t          d          }t          d          }nddlm} ddlm} dd	lm} | j        \  }}}	}
| j        | j        k     rd
nd}i }| j	        D ]}| j
        dk    r||	|f|
|f|||f<   g }t          |          D ]m}| j
        dk    rt          j        ||f|fn|	|f||fg}|d
k    r|                                 |||f}t          ||| j        f||<   |                    |           n||f|| j         |f<   |S )z/Construct graph for a broadcast join operation.zinter-r   z%dask.dataframe.multi._split_partitionz$dask.dataframe.multi._concat_wrapperz)dask.dataframe.multi._merge_chunk_wrapperr   )_concat_wrapper)_merge_chunk_wrapper)_split_partitionleftrightinner)r^   r   dask.dataframe.multirW  rX  rY  rM  r?  rA  r   r;  r   r   r   reverser   rB  r   )r   r   
inter_namer   split_partition_funcr  merge_chunk_funcrQ  rR  rS  other_on
bcast_sidern   rD   r  j_merge_args	inter_keys                     r   rl   z#BroadcastJoinLayer._construct_graph{  s    	)
	)
 	V $67$ $  --STTK1;   
 LKKKKKUUUUUUUUUUUU 8<7K4
J
H#3d6JJJVVPW
  ,	> ,	>Ax7""(O	(ZO$ L:&& / / x7**	 !(#Q  %aO	 ''  '')))'A.	$%	"I ##I.... $/"=CA
r   )NNNNr   r   )r$   r%   r&   r'   r   r   rf   r   r   ro   r<   ry   r}   r   rM  r   r  r  rl   rY   rZ   s   @r   r9  r9    sC        D 6 6 6 6 6 6:> > >
 
 

- - - ! ! X!              X2   &
 
 
% % % M M M M M M M Mr   r9  c                  N     e Zd ZdZ	 	 	 	 d fd	Zed             Zd Zd Z xZ	S )	DataFrameIOLayera  DataFrame-based Blockwise Layer with IO

    Parameters
    ----------
    name : str
        Name to use for the constructed layer.
    columns : str, list or None
        Field name(s) to read in as columns in the output.
    inputs : list or BlockwiseDep
        List of arguments to be passed to ``io_func`` so
        that the materialized task to produce partition ``i``
        will be: ``(<io_func>, inputs[i])``.  Note that each
        element of ``inputs`` is typically a tuple of arguments.
    io_func : callable
        A callable function that takes in a single tuple
        of arguments, and outputs a DataFrame partition.
        Column projection will be supported for functions
        that satisfy the ``DataFrameIOFunction`` protocol.
    label : str (optional)
        String to use as a prefix in the place-holder collection
        name. If nothing is specified (default), "subset-" will
        be used.
    produces_tasks : bool (optional)
        Whether one or more elements of `inputs` is expected to
        contain a nested task. This argument in only used for
        serialization purposes, and will be deprecated in the
        future. Default is False.
    creation_info: dict (optional)
        Dictionary containing the callable function ('func'),
        positional arguments ('args'), and key-word arguments
        ('kwargs') used to produce the dask collection with
        this underlying ``DataFrameIOLayer``.
    annotations: dict (optional)
        Layer annotations to pass through to Blockwise.
    NFc	                   || _         || _        || _        || _        || _        || _        || _        || _        t          |t                    s3t          d t          | j                  D             | j                  }	n|}	| j         |t          d          fi}
t                                          | j         d|
|	dfgi |           d S )Nc                    i | ]	\  }}|f|
S r(   r(   r+  s      r   r-  z-DataFrameIOLayer.__init__.<locals>.<dictcomp>  s     @@@vq#!s@@@r   )r0   r   rD   )outputoutput_indicesrn   indicesr.   r   )r^   _columnsr%  io_funclabelr0   r   creation_infor   r   r   r   r   rM   r   )r   r^   columnsr%  ro  rp  r0   rq  r   
io_arg_maprn   rN   s              r   r   zDataFrameIOLayer.__init__  s     	
,&*&,// 	 )@@4;)?)?@@@#2  JJ
  J y7OA$6$6789 #&'# 	 	
 	
 	
 	
 	
r   c                    | j         S )z(Current column projection for this layer)rn  re   s    r   rr  zDataFrameIOLayer.columns  s     }r   c           	        ddl m} t          |          }| j        't	          | j                                      |          rt          | j        |          r| j                            |          }n| j        }t          | j
        pddz   t          | j        |          z   || j        || j
        | j        | j                  }|S | S )zProduce a column projection for this IO layer.
        Given a list of required output columns, this method
        returns the projected layer.
        r   )DataFrameIOFunctionNsubset-)rp  r0   r   )dask.dataframe.io.utilsrv  r   rr  r   
issupersetr   ro  project_columnsrh  rp  r
   r^   r%  r0   r   )r   rr  rv  ro  layers        r   r{  z DataFrameIOLayer.project_columns  s    
 	@?????w--<3t|#4#4#?#?#H#H $,(;<< ',66w??,$'x3.$)W1M1MMj#2 ,  E L Kr   c                h    d                     | j        t          | j                  | j                  S )Nz3DataFrameIOLayer<name='{}', n_parts={}, columns={}>)r   r^   r3   r%  rr  re   s    r   rf   zDataFrameIOLayer.__repr__<  s/    DKKIs4;''
 
 	
r   )NFNN)
r$   r%   r&   r'   r   r   rr  r{  rf   rY   rZ   s   @r   rh  rh    s        " "T &
 &
 &
 &
 &
 &
P   X  >
 
 
 
 
 
 
r   rh  c                      e Zd ZU dZded<   ded<   ded<   ded<   ded	<   d
ed<   ded<   ded<   ded<   ded<   ded<   ded<   	 	 	 	 	 	 d,d- fdZdddZd.dZd  Zd! Z	d" Z
d# Zd$ Zed%             Zd& Zd' Zd( Zd) Zd* Zd+ Z xZS )/DataFrameTreeReductionag  DataFrame Tree-Reduction Layer

    Parameters
    ----------
    name : str
        Name to use for the constructed layer.
    name_input : str
        Name of the input layer that is being reduced.
    npartitions_input : str
        Number of partitions in the input layer.
    concat_func : callable
        Function used by each tree node to reduce a list of inputs
        into a single output value. This function must accept only
        a list as its first positional argument.
    tree_node_func : callable
        Function used on the output of ``concat_func`` in each tree
        node. This function must accept the output of ``concat_func``
        as its first positional argument.
    finalize_func : callable, optional
        Function used in place of ``tree_node_func`` on the final tree
        node(s) to produce the final output for each split. By default,
        ``tree_node_func`` will be used.
    split_every : int, optional
        This argument specifies the maximum number of input nodes
        to be handled by any one task in the tree. Defaults to 32.
    split_out : int, optional
        This argument specifies the number of output nodes in the
        reduction tree. If ``split_out`` is set to an integer >=1, the
        input tasks must contain data that can be indexed by a ``getitem``
        operation with a key in the range ``[0, split_out)``.
    output_partitions : list, optional
        List of required output partitions. This parameter is used
        internally by Dask for high-level culling.
    tree_node_name : str, optional
        Name to use for intermediate tree-node tasks.
    strr^   r   r   r   r   r  tree_node_funcCallable | Nonefinalize_funcsplit_every	split_outz	list[int]output_partitionstree_node_namewidthsheightN    
int | Nonelist[int] | None
str | Noner   dict[str, Any] | Nonec                $   t                                          |           || _        || _        || _        || _        || _        || _        || _        || _	        |	#t          t          | j	        pd                    n|	| _        |
p	d| j        z   | _        | j        }|g| _        |dk    rIt          j        || j        z            }| j                            t%          |                     |dk    It'          | j                  | _        d S )Nr   rR   z
tree_node-)rM   r   r^   r   r   r  r  r  r  r  r   r   r  r  r  mathceilr   r   r3   r  )r   r^   r   r   r  r  r  r  r  r  r  r   r   rN   s                r   r   zDataFrameTreeReduction.__init__u  s    	[111	$!2&,*&" !( t~*++,,," 	
 -Hty0H &gaiiIed&6677EKs5zz*** aii $+&&r   r   splitc                    | j         r||fz   n|S r   )r  )r   r  
name_partss      r   	_make_keyz DataFrameTreeReduction._make_key  s     )-FzUH$$JFr   Fc                \    |r| j         r| j         }n| j        }t          j        || j        |fS r   )r  r  r   r   r  )r   
input_keys
final_task
outer_funcs       r   _define_taskz#DataFrameTreeReduction._define_task  s:     	-$, 	-+JJ,J
J(8*EEr   c                   	
 i } j         s|S  j        	 j        rU	dz  	 j         D ]H
t           j                  D ]1}t
          j         j        |f
f|                     	|
          <   2I j        dk    r3 j         D ])
t          d j                  D ]t           j	                           D ]} j	        dz
           } j
        |z  }t          | j
        z   |          }dk    r	
 fdt          ||          D             }n
 fdt          ||          D             } j        dz
  k    r6|dk    sJ d| d	                                 |d
          | j        
f<                        |d          |                      j        |
          <   +nD j         D ]<
                     	d
          g}                     |d
          | j        
f<   =|S )z%Construct graph for a tree reduction.z-splitr     rR   c                @    g | ]}                     |           S )r  )r  )r4   pname_input_userW   r   s     r   r   z;DataFrameTreeReduction._construct_graph.<locals>.<listcomp>  s;     * * *$% !%~q J J* * *r   c                R    g | ]#}                     j        |d z
            $S )rR   r  )r  r  )r4   r  r   rW   r   s     r   r   z;DataFrameTreeReduction._construct_graph.<locals>.<listcomp>  sN     * * * %& !%$($7EAIQ !/ !" !"* * *r   r   zgroup = z%, not 0 for final tree reduction taskT)r  F)r  r   r  r   r   r   r   r  r  r  r  minr  r^   r  )r   rn   r  groupp_maxlstartlstopr  r   r  rW   s   `       @@@r   rl   z'DataFrameTreeReduction._construct_graph  s    % 	J > 	h&N+  t566  A (!,GC~qBBCC ;!+ 'P 'P"1dk22 %P %PE!&t{5'9!:!: #P #P $EAI 6!%!1E!9 #FT-=$=u E E A::* * * * * *).vu)=)=* * *JJ* * * * * * */vu)=)=	* * *J !DK!O33 !&


V%VVV !+

262C2C *t 3D 3 3CA// !% 1 1* 1 O O	   $$($7Q !/ !" !" ?#P%P'PT + U U"nn^QanHHI
&*&7&7
t&7&T&TTYN##
r   c                N    d                     | j        | j        | j                  S )Nz>DataFrameTreeReduction<name='{}', input_name={}, split_out={}>)r   r^   r   r  re   s    r   rf   zDataFrameTreeReduction.__repr__  s'    OVVIt
 
 	
r   c                *      fd j         D             S )Nc                "    h | ]}j         |fS r(   rd   )r4   rW   r   s     r   r   z6DataFrameTreeReduction._output_keys.<locals>.<setcomp>  s    ???1A???r   )r  re   s   `r   _output_keysz#DataFrameTreeReduction._output_keys  s     ????(>????r   c                t    t          | d          r| j        S |                                 }|| _        | j        S )N_cached_output_keys)rk   r  r  )r   output_keyss     r   r   z&DataFrameTreeReduction.get_output_keys  s@    4.// 	3++++--K'2D$''r   c                "    t          | d          S r   r   re   s    r   r   z&DataFrameTreeReduction.is_materialized  r   r   c                t    t          | d          r| j        S |                                 }|| _        | j        S rh   rj   rm   s     r   ro   zDataFrameTreeReduction._dict  rp   r   c                    | j         |         S r   rr   rs   s     r   r<   z"DataFrameTreeReduction.__getitem__  ru   r   c                *    t          | j                  S r   rw   re   s    r   ry   zDataFrameTreeReduction.__iter__  rz   r   c                    t          | j        dd                    pd| j        pdz  }| j        r|| j        t	          | j                  z  z   S |S )NrR   )sumr  r  r   r3   r  )r   	tree_sizes     r   r}   zDataFrameTreeReduction.__len__  sY    QRR)).Q4>3FQG	> 	Tt5D<R8S8SSSSr   c                    t                      }|D ]:}	 |\  }}n# t          $ r Y w xY w|| j        k    r%|                    |           ;|S )z;Simple utility to convert keys to output partition indices.r   )r   r   splitsrt   r   _splits         r   _keys_to_output_partitionsz1DataFrameTreeReduction._keys_to_output_partitions  st     	 	C #vv   	!!JJvr   c                    t          | j        | j        | j        | j        | j        | j        | j        | j        || j	        | j
                  S )N)r  r  r  r  r  r   )r  r^   r   r   r  r  r  r  r  r  r   )r   r  s     r   r  zDataFrameTreeReduction._cull,  sU    %IO",(n/.(
 
 
 	
r   c                      j         df fdt           j                  D             i}                     |          }|t	           j                  k    r                     |          }||fS  |fS )z2Cull a DataFrameTreeReduction HighLevelGraph layerr   c                "    h | ]}j         |fS r(   r   r   s     r   r   z.DataFrameTreeReduction.cull.<locals>.<setcomp>>  s-       )*!$  r   )r^   r   r   r  r   r  r  )r   r   r  r   r  r
  s   `     r   r  zDataFrameTreeReduction.cull;  s     YN    .3D4J.K.K  

 !;;DAAD$: ; ;;;::&788L%%:r   )Nr  NNNN)r^   r  r   r  r   r   r  r   r  r   r  r  r  r   r  r  r  r  r  r  r   r  r   )r$   r%   r&   r'   r?   r   r  r  rl   rf   r  r   r   r   ro   r<   ry   r}   r  r  r  rY   rZ   s   @r   r  r  B  s        # #J IIIOOO""""NNN    KKK *. $.2%)-1%' %' %' %' %' %' %'N ,- G G G G GF F F FH H HT
 
 

@ @ @( ( (- - - ! ! X!           
 
 
      r   r  r!  ).
__future__r   r   r  r   collectionsr   collections.abcr   	itertoolsr   typingr   tlzr   tlz.curriedr	   	dask.baser
   dask.blockwiser   r   r   r   	dask.corer   dask.highlevelgraphr   
dask.utilsr   r   r   r   r   r*   rA   rG   r\   r   r   r   r   r#  r9  rh  r  r(   r   r   <module>r     sN   " " " " " "       # # # # # # $ $ $ $ $ $                             U U U U U U U U U U U U       % % % % % % = = = = = = = = = = = =@ @ @ @ @ @ @ @.K K K K K K K K&E E E E E* E E E3 3 3 3 3% 3 3 3w w w w w w w wt4 4 4 4n
R 
R 
R%2 %2 %2^R R R R R R R Rjo o o o o% o o odz z z z z z z zzt
 t
 t
 t
 t
y t
 t
 t
nE E E E EU E E E E Er   