
    LgV                       d dl mZ d dlZd dlZd dlZd dlmZ d dlmZ d dl	m
Z
 d dlmZmZ d dlZd dlmZ d dlmZmZ d d	lmZ d d
lmZmZmZmZ d dlmZ d dlmZ d dl m!Z! d dl"m#Z#m$Z$m%Z%m&Z& erd dl'Z( G d d      Z) G d de      Z* G d de*      Z+ G d de*      Z, G d de*      Z- G d de*      Z. e!j^                  e,      d        Z0 e!j^                  e-      d        Z1 e!j^                  e.      d        Z2 G d de      Z3d-d Z4d! Z5d" Z6 G d# d$e      Z7 G d% d&e7      Z8 G d' d(e      Z9 G d) d*e      Z: G d+ d,e      Z;y).    annotationsN)defaultdict)Callable)product)TYPE_CHECKINGAny)map)TaskTaskRef)tokenize)	BlockwiseBlockwiseDepBlockwiseDepDictblockwise_token)flatten)Layer)normalize_token)applycached_cumsumconcreteinsertc                      e Zd ZdZd Zd Zy)CallableLazyImportzFunction Wrapper for Lazy Importing.

    This Class should only be used when materializing a graph
    on a distributed scheduler.
    c                    || _         y N)function_path)selfr   s     +lib/python3.12/site-packages/dask/layers.py__init__zCallableLazyImport.__init__'   s
    *    c                >    ddl m}   || j                        |i |S )Nr   )import_term)distributed.utilsr#   r   )r   argskwargsr#   s       r   __call__zCallableLazyImport.__call__*   s"    1.{4--.???r!   N)__name__
__module____qualname____doc__r    r'    r!   r   r   r       s    +@r!   r   c                  D    e Zd ZU dZded<   ded<   dZded<   dd	Zdd
Zy)ArrayBlockwiseDepzg
    Blockwise dep for array-likes, which only needs chunking
    information to compute its data.
    tuple[tuple[int, ...], ...]chunkstuple[int, ...]	numblocksFboolproduces_tasksc                N    || _         t        d |D              | _        d| _        y )Nc              3  2   K   | ]  }t        |        y wr   )len).0chunks     r   	<genexpr>z-ArrayBlockwiseDep.__init__.<locals>.<genexpr>C   s     >ves5zvs   F)r0   tupler2   r4   )r   r0   s     r   r    zArrayBlockwiseDep.__init__A   s#    >v>>#r!   c                    t        d      )Nz%Subclasses must implement __getitem__)NotImplementedErrorr   idxs     r   __getitem__zArrayBlockwiseDep.__getitem__F   s    !"IJJr!   Nr0   r/   r?   r1   )r(   r)   r*   r+   __annotations__r4   r    r@   r,   r!   r   r.   r.   7   s*    
 (' ND $
Kr!   r.   c                      e Zd ZdZddZy)ArrayChunkShapeDepz(Produce chunk shapes given a chunk indexc                N    t        d t        || j                        D              S )Nc              3  ,   K   | ]  \  }}||     y wr   r,   )r8   ir9   s      r   r:   z1ArrayChunkShapeDep.__getitem__.<locals>.<genexpr>N   s     D.C(!UU1X.Cs   )r;   zipr0   r>   s     r   r@   zArrayChunkShapeDep.__getitem__M   s    Dc#t{{.CDDDr!   NrB   )r(   r)   r*   r+   r@   r,   r!   r   rE   rE   J   s    2Er!   rE   c                  8     e Zd ZU dZded<   d fdZddZ xZS )ArraySliceDepz>Produce slice(s) into the full-sized array given a chunk indexr/   startsc                R    t         |   |       t        d |D              | _        y )Nc              3  6   K   | ]  }t        |d         yw)T)initial_zeroN)r   )r8   cs     r   r:   z)ArraySliceDep.__init__.<locals>.<genexpr>X   s     PAM!$??   )superr    r;   rL   )r   r0   	__class__s     r   r    zArraySliceDep.__init__V   s"     PPPr!   c                r    t        d t        || j                        D              }t        d |D              S )Nc              3  <   K   | ]  \  }}||   ||d z      f  yw)   Nr,   )r8   rH   starts      r   r:   z,ArraySliceDep.__getitem__.<locals>.<genexpr>[   s(     S=REU1XuQU|,=R   c              3  6   K   | ]  }t        g |d    y wr   slice)r8   ss     r   r:   z,ArraySliceDep.__getitem__.<locals>.<genexpr>\   s     2cU_A_t_crQ   )r;   rI   rL   )r   r?   locs      r   r@   zArraySliceDep.__getitem__Z   s.    SSdkk=RSS2c222r!   rA   r?   r;   )r(   r)   r*   r+   rC   r    r@   __classcell__rS   s   @r   rK   rK   Q   s    H''Q3r!   rK   c                      e Zd ZddZy)ArrayBlockIdDepc                <    t        |t              st        d      |S )Nz&ArrayBlockIdDep requires a tuple index)
isinstancer;   r=   r>   s     r   r@   zArrayBlockIdDep.__getitem__`   s    #u%%&NOO
r!   Nr^   )r(   r)   r*   r@   r,   r!   r   rb   rb   _   s    r!   rb   c                  (     e Zd Zd fdZddZ xZS )ArrayValuesDepc                2    t         |   |       || _        y r   )rR   r    values)r   r0   rh   rS   s      r   r    zArrayValuesDep.__init__h   s     r!   c                     | j                   |   S r   )rh   r>   s     r   r@   zArrayValuesDep.__getitem__l   s    {{3r!   )r0   r/   rh   znp.ndarray | dictr^   )r(   r)   r*   r    r@   r_   r`   s   @r   rf   rf   g   s     r!   rf   c                    d| j                   fS )NrK   r0   deps    r   normalize_array_slice_deprn   p   s    CJJ&&r!   c                    d| j                   fS )Nrb   rk   rl   s    r   normalize_array_block_id_deprp   u   s    cjj((r!   c                4    d| j                   | j                  fS )Nrf   )r0   rh   rl   s    r   normalize_array_values_deprr   z   s    SZZ33r!   c                  d     e Zd ZdZ fdZd Zed        Zd Zd Z	d Z
d Zd	 Zd
 ZddZ xZS )ArrayOverlapLayera\  Simple HighLevelGraph array overlap layer.

    Lazily computed High-level graph layer for a array overlap operations.

    Parameters
    ----------
    name : str
        Name of new output overlap array.
    array : Dask array
    axes: Mapping
        Axes dictionary indicating overlap in each dimension,
        e.g. ``{'0': 1, '1': 1}``
    c                v    t         |           || _        || _        || _        || _        || _        d | _        y r   )rR   r    nameaxesr0   r2   token_cached_keys)r   rv   rw   r0   r2   rx   rS   s         r   r    zArrayOverlapLayer.__init__   s;     			"
 r!   c                "    d| j                    dS )NzArrayOverlapLayer<name=''rv   r   s    r   __repr__zArrayOverlapLayer.__repr__   s    )$))A66r!   c                x    t        | d      r| j                  S | j                         }|| _        | j                  S z$Materialize full dict representation_cached_dicthasattrr   _construct_graphr   dsks     r   _dictzArrayOverlapLayer._dict   <     4($$$'')C #D   r!   c                     | j                   |   S r   r   r   keys     r   r@   zArrayOverlapLayer.__getitem__       zz#r!   c                ,    t        | j                        S r   iterr   r}   s    r   __iter__zArrayOverlapLayer.__iter__       DJJr!   c                ,    t        | j                        S r   r7   r   r}   s    r   __len__zArrayOverlapLayer.__len__       4::r!   c                    t        | d      S Nr   r   r}   s    r   is_materializedz!ArrayOverlapLayer.is_materialized       t^,,r!   c                "    | j                         S r   )keysr}   s    r   get_output_keysz!ArrayOverlapLayer.get_output_keys   s    yy{r!   c                    | j                   | j                   S | j                  | j                  | j                  cfd        x| _         }|S )Nc                     sfgS t        |       }|dz   t              k(  r%t        |         D cg c]  }f| z   |fz    }}|S t        |         D cg c]  } | |fz     }}|S c c}w c c}w NrV   )r7   range)r%   indrH   resultr0   r   rv   r2   s       r   r   z*ArrayOverlapLayer._dask_keys.<locals>.keys   s    y d)CQw#i.(9>y~9NO9NA4'D.A4/9NO M 9>in8MN8M1$.8MNM PNs   A-A2)ry   rv   r0   r2   )r   r   r0   r   rv   r2   s     @@@@r   
_dask_keyszArrayOverlapLayer._dask_keys   sR    ($$$"&))T[[$..fi	 &*V+Fr!   c                   | j                   }| j                  }| j                  }| j                         }d| j                  z   }d| j                  z   }|rt        d      }nddlm} t        t        t        |            }	t        j                  t        |	|      }
t        j                  |t         t        |
      t        t               t        j"                  t              }i }i }|D ]S  }t%        |f|z   |      }|du r|f|z   |k7  r
|||f|z   <   +|f|z   ||f|z   <   |t&         |
d|z   |	      ff||f|z   <   U t        j(                  ||      }|S )
z/Construct graph for a simple overlap operation.zgetitem-zoverlap-zdask.array.core.concatenate3r   )concatenate3)dimsrw   Fr   r|   )rw   r0   rv   r   rx   r   dask.array.corer   listr
   r7   	functoolspartial_expand_keys_around_centertoolzpiper   concatfractional_slicer   merge)r   deserializingrw   r0   rv   	dask_keysgetitem_nameoverlap_namer   r   expand_key2interior_keysinterior_slicesoverlap_blocksk
frac_slicer   s                    r   r   z"ArrayOverlapLayer._construct_graph   sU   yyyyOO%	!DJJ.!DJJ. ..LML 5CV$%''&T

 

wK 0#g,d
 A)4'A+t<JU"w{j(7A! 348<w{! 34 {7Q;\JK723  kk/>:
r!   F)r(   r)   r*   r+   r    r~   propertyr   r@   r   r   r   r   r   r   r_   r`   s   @r   rt   rt      sI    ! 7 ! ! -&+r!   rt   c                t   d fd}g }t        | dd       D ]Z  \  }} |j                  |d            }d}	|dkD  r|d   dk7  r|	dz  }	||   dz
  k  r|d   dk7  r|	dz  }	|j                  |	       \ d }
t        | dd       D cg c]:  \  }} |
|j                  |d            r ||||j                  |d            n|g< }}}||gg|z   }t        t	        |       }t        |      D cg c]!  \  }} |
|j                  |d            r|nd# }}}t        ||      }|S c c}}w c c}}w )a  Get all neighboring keys around center

    Parameters
    ----------
    k: Key
        The key around which to generate new keys
    dims: Sequence[int]
        The number of chunks in each dimension
    name: Option[str]
        The name to include in the output keys, or none to include no name
    axes: Dict[int, int]
        The axes active in the expansion.  We don't expand on non-active axes

    Examples
    --------
    >>> _expand_keys_around_center(('x', 2, 3), dims=[5, 5], name='y', axes={0: 1, 1: 1})  # noqa: E501 # doctest: +NORMALIZE_WHITESPACE
    [[('y', 1.1, 2.1), ('y', 1.1, 3), ('y', 1.1, 3.9)],
     [('y',   2, 2.1), ('y',   2, 3), ('y',   2, 3.9)],
     [('y', 2.9, 2.1), ('y', 2.9, 3), ('y', 2.9, 3.9)]]

    >>> _expand_keys_around_center(('x', 0, 4), dims=[5, 5], name='y', axes={0: 1, 1: 1})  # noqa: E501 # doctest: +NORMALIZE_WHITESPACE
    [[('y',   0, 3.1), ('y',   0,   4)],
     [('y', 0.9, 3.1), ('y', 0.9,   4)]]
    c                .    t        | t              s| | f} | S r   )rd   r;   depths    r   convert_depthz1_expand_keys_around_center.<locals>.convert_depth  s    %'ENEr!   c                     |      }g }|dz
  dkD  r|d   dk7  r|j                  |dz
         |j                  |       |dz   |    dz
  k  r|d   dk7  r|j                  |dz          |S )Ng?r   rV   )append)rH   r   r   rvr   r   s       r   indsz(_expand_keys_around_center.<locals>.inds  su    e$9q=U1X]IIcCi 
		#9tAw{"uQx1}IIcCi 	r!   rV   Nr   c                P    t        | t              rt        d | D              S | dk7  S )Nc              3  &   K   | ]	  }|d k7    yw)r   Nr,   )r8   xs     r   r:   zC_expand_keys_around_center.<locals>._valid_depth.<locals>.<genexpr>0  s     -u!qAvus   r   )rd   r;   anyr   s    r   _valid_depthz0_expand_keys_around_center.<locals>._valid_depth.  s'    eU#-u---A:r!   )	enumerategetr   r   r   reshapelist)r   r   rv   rw   r   shaperH   r   r   numr   r%   seqdshape2r   r   s    `              @r   r   r      sd   4
 EAabE"3dhhq!n-7uQx1}1HCa1qQ1HCS #  !"&&FAs )5TXXa^(DQTXXa^$3%O& 	  x$
w~
CCLUCSTCS41a<A/aQ6CSFT%FM Us   ?D.7&D4c                    t        |       dk(  rt        |      S t        t        |      | d   z        }t        j                  ||      D cg c]  }t        | dd |       c}S c c}w )zgReshape iterator to nested shape

    >>> reshapelist((2, 3), range(6))
    [[0, 1, 2], [3, 4, 5]]
    rV   r   N)r7   r   intr   	partitionr   )r   r   nparts       r   r   r   @  sb     5zQCyC58#$9>C9PQ9PE!"It,9PQQQs   A%c                $   | d   ft        d | dd D              z   }g }t        t        | dd |dd             D ]  \  }\  }}|j                  |d      }t	        |t               r|d   }|d   }	n|}|}	||k(  r|j                  t        ddd             \||k  r|	r|j                  t        d|	             ||kD  r|r|j                  t        | d              y t        |      }t        d |D              r| S t        j                  ||fS )a  

    >>> fractional_slice(('x', 5.1), {0: 2})
    (<built-in function getitem>, ('x', 5), (slice(-2, None, None),))

    >>> fractional_slice(('x', 3, 5.1), {0: 2, 1: 3})
    (<built-in function getitem>, ('x', 3, 5), (slice(None, None, None), slice(-3, None, None)))

    >>> fractional_slice(('x', 2.9, 5.1), {0: 2, 1: 3})
    (<built-in function getitem>, ('x', 3, 5), (slice(0, 2, None), slice(-3, None, None)))
    r   c              3  D   K   | ]  }t        t        |              y wr   )r   round)r8   rH   s     r   r:   z#fractional_slice.<locals>.<genexpr>Y  s      A1U1Xs    rV   NFc              3  <   K   | ]  }|t        d d d       k(    y wr   rZ   )r8   r   s     r   r:   z#fractional_slice.<locals>.<genexpr>o  s     
;Uc3%dD))UrX   )
r;   r   rI   r   rd   r   r[   alloperatorgetitem)
taskrw   roundedindexrH   trr   
left_depthright_depths
             r   r   r   M  s    Awj5 AQR AAAGEs48WQR[9:	6AqAeU#qJ(KJK6LLtT401U{LLq+./UzLL
{D12! ;" %LE

;U
;;  '511r!   c                       e Zd ZdZ	 	 d fd	Zd Zd Zd Zd Ze	d        Z
d Zd	 Zd
 Zd ZddZd Zd ZddZ xZS )SimpleShuffleLayera  Simple HighLevelGraph Shuffle layer

    High-level graph layer for a simple shuffle operation in which
    each output partition depends on all input partitions.

    Parameters
    ----------
    name : str
        Name of new shuffled output collection.
    column : str or list of str
        Column(s) to be used to map rows to output partitions (by hashing).
    npartitions : int
        Number of output partitions.
    npartitions_input : int
        Number of partitions in the original (un-shuffled) DataFrame.
    ignore_index: bool, default False
        Ignore index during shuffle.  If ``True``, performance may improve,
        but index values will not be preserved.
    name_input : str
        Name of input collection.
    meta_input : pd.DataFrame-like object
        Empty metadata of input collection.
    parts_out : list of int (optional)
        List of required output-partition indices.
    annotations : dict (optional)
        Layer annotations
    c
                   || _         || _        || _        || _        || _        || _        || _        |xs t        |      | _        d| j                   z   | _	        |	xs i }	d | _
        d|	vr| j                  |	d<   t        
| 5  |	       y )Nsplit-priorityr   )rv   columnnpartitionsnpartitions_inputignore_index
name_input
meta_inputr   	parts_out
split_name_split_keys_key_priorityrR   r    )r   rv   r   r   r   r   r   r   r   r   rS   s             r   r    zSimpleShuffleLayer.__init__  s     	&!2($$"8eK&8"TYY. "'R[(&*&8&8K
#[1r!   c                N    t        |t              sJ |d   | j                  k(  ryy)Nr   rV   )rd   r;   r   r   s     r   r   z SimpleShuffleLayer._key_priority  s(    #u%%%q6T__$r!   c                X    | j                   D ch c]  }| j                  |f c}S c c}w r   r   rv   r   r   s     r   r   z"SimpleShuffleLayer.get_output_keys  '    .2nn=ndD!n===   'c                N    dj                  | j                  | j                        S )Nz-SimpleShuffleLayer<name='{}', npartitions={}>)formatrv   r   r}   s    r   r~   zSimpleShuffleLayer.__repr__  s$    >EEIIt''
 	
r!   c                    t        | d      S r   r   r}   s    r   r   z"SimpleShuffleLayer.is_materialized  r   r!   c                x    t        | d      r| j                  S | j                         }|| _        | j                  S r   r   r   s     r   r   zSimpleShuffleLayer._dict  r   r!   c                     | j                   |   S r   r   r   s     r   r@   zSimpleShuffleLayer.__getitem__  r   r!   c                ,    t        | j                        S r   r   r}   s    r   r   zSimpleShuffleLayer.__iter__  r   r!   c                ,    t        | j                        S r   r   r}   s    r   r   zSimpleShuffleLayer.__len__  r   r!   c                    t               }|D ])  }	 |\  }}|| j                  k7  r|j                  |       + |S # t        $ r Y 9w xY wz4Simple utility to convert keys to partition indices.set
ValueErrorrv   addr   r   partsr   _name_parts         r   _keys_to_partsz!SimpleShuffleLayer._keys_to_parts  Y    C"u 		!IIe       ;	AAc           	         t        t              }|xs | j                  |      }|D ]H  }|| j                  |fxx   t	        | j
                        D ch c]  }| j                  |f c}z  cc<   J |S c c}w )zDetermine the necessary dependencies to produce `keys`.

        For a simple shuffle, output partitions always depend on
        all input partitions. This method does not require graph
        materialization.
        )r   r  r  rv   r   r   r   )r   r   r   depsr   rH   s         r   _cull_dependenciesz%SimpleShuffleLayer._cull_dependencies  s~     3:!4!4T!:	D$))T"#.3D4J4J.K(.K!$.K( #  (s   A4c           
         t        | j                  | j                  | j                  | j                  | j
                  | j                  | j                  |      S Nr   )r   rv   r   r   r   r   r   r   r   r   s     r   _cullzSimpleShuffleLayer._cull  sI    !IIKK""OOOO	
 		
r!   c                    | j                  |      }| j                  ||      }|t        | j                        k7  r| j	                  |      }||fS | |fS )a  Cull a SimpleShuffleLayer HighLevelGraph layer.

        The underlying graph will only include the necessary
        tasks to produce the keys (indices) included in `parts_out`.
        Therefore, "culling" the layer only requires us to reset this
        parameter.
        r  r  r  r  r   r  r   r   all_keysr   culled_depsculled_layers         r   cullzSimpleShuffleLayer.cull  ^     ''-	--di-HDNN++::i0L,,$$r!   c           
     @   d| j                   z   }|rt        d      }t        d      }nddlm} ddlm} i }| j                  D ]  }t        | j                        D cg c]  }| j                  ||f }}||| j                  f|| j                   |f<   |D ]  \  }	}
}t        j                  ||f|
f|| j                  |
|f<   ||f|vs3|| j                  |f| j                  d| j                  | j                  | j                  | j                  f|||f<     |S c c}w )z/Construct graph for a simple shuffle operation.group-dask.dataframe.core._concat$dask.dataframe.shuffle.shuffle_groupr   _concatshuffle_group)rv   r   dask.dataframe.corer&  dask.dataframe.shuffler(  r   r   r   r   r   r   r   r   r   r   )r   r   shuffle_group_nameconcat_funcshuffle_group_funcr   part_outpart_in_concat_list_	_part_out_part_ins               r   r   z#SimpleShuffleLayer._construct_graph   sS    &		1 --JKK!36"
 CRH  %T%;%;<<G (G4<  
 !!*CH%&
 +7&9h$$'2?T__i:;
 '1<*(3(((())((	;C+X67 +7 '8 
7s   DNNr   r   )r(   r)   r*   r+   r    r   r   r~   r   r   r   r@   r   r   r  r  r  r  r   r_   r`   s   @r   r   r   |  sg    J (2T>

- ! ! 

% .r!   r   c                  D     e Zd ZdZ	 	 d fd	Zd ZddZd Zd	dZ xZ	S )
ShuffleLayera"  Shuffle-stage HighLevelGraph layer

    High-level graph layer corresponding to a single stage of
    a multi-stage inter-partition shuffle operation.

    Stage: (shuffle-group) -> (shuffle-split) -> (shuffle-join)

    Parameters
    ----------
    name : str
        Name of new (partially) shuffled collection.
    column : str or list of str
        Column(s) to be used to map rows to output partitions (by hashing).
    inputs : list of tuples
        Each tuple dictates the data movement for a specific partition.
    stage : int
        Index of the current shuffle stage.
    npartitions : int
        Number of output partitions for the full (multi-stage) shuffle.
    npartitions_input : int
        Number of partitions in the original (un-shuffled) DataFrame.
    k : int
        A partition is split into this many groups during each stage.
    ignore_index: bool, default False
        Ignore index during shuffle.  If ``True``, performance may improve,
        but index values will not be preserved.
    name_input : str
        Name of input collection.
    meta_input : pd.DataFrame-like object
        Empty metadata of input collection.
    parts_out : list of int (optional)
        List of required output-partition indices.
    annotations : dict (optional)
        Layer annotations
    c                    || _         || _        || _        t        |   ||||||	|
|xs t        t        |            |	       y )N)r   r   )inputsstagensplitsrR   r    r   r7   )r   rv   r   r8  r9  r   r   r:  r   r   r   r   r   rS   s                r   r    zShuffleLayer.__init__v  sU     
55V#5# 	 
	
r!   c                z    dj                  | j                  | j                  | j                  | j                        S )Nz=ShuffleLayer<name='{}', stage={}, nsplits={}, npartitions={}>)r   rv   r9  r:  r   r}   s    r   r~   zShuffleLayer.__repr__  s0    NUUIItzz4<<1A1A
 	
r!   c                8   t        t              }|xs | j                  |      }t        | j                        D ci c]  \  }}||
 }}}|D ]  }| j                  |   }t        | j                        D ]  }	t        || j                  |	      }
||
   }| j                  dk(  r@|| j                  k\  r1|| j                  |f   j                  d| j                  z   |
df       n|| j                  |f   j                  | j                  |f         |S c c}}w )zqDetermine the necessary dependencies to produce `keys`.

        Does not require graph materialization.
        r   r"  empty)r   r  r  r   r8  r   r:  r   r9  r   rv   r	  r   )r   r   r   r  rH   inpinp_part_mapr   outr   _inpr  s               r   r  zShuffleLayer._cull_dependencies  s   
 3:!4!4T!:	-6t{{-CD-C61cQ-CDD++d#C4<<(c4::q1$T*::?u0F0F'F$))T*+//DII1EtW0UV$))T*+//%0HI )   Es   Dc                    t        | j                  | j                  | j                  | j                  | j
                  | j                  | j                  | j                  | j                  | j                  |      S r  )r6  rv   r   r8  r9  r   r   r:  r   r   r   r  s     r   r  zShuffleLayer._cull  s^    IIKKKKJJ""LLOOOO
 	
r!   c           
        d| j                   z   }|rt        d      }t        d      }nddlm} ddlm} i }t        | j                        D ci c]  \  }}||
 }}}| j                  D ]g  }	| j                  |	   }
g }t        | j                        D ]F  }t        |
| j                  |      }|
| j                     }|j                  | j                  ||f       H ||| j                  f|| j                   |	f<   |D ]  \  }}}t         j"                  ||f|f|| j                  ||f<   ||f|vs3||   }| j                  dk(  r3|| j$                  k  r| j&                  |f}n#||df}| j(                  ||<   n| j&                  |f}||| j*                  | j                  | j                  | j$                  | j                  | j,                  f|||f<    j |S c c}}w )z2Construct graph for a "rearrange-by-column" stage.r"  r#  r$  r   r%  r'  r=  )rv   r   r)  r&  r*  r(  r   r8  r   r   r:  r   r9  r   r   r   r   r   r   r   r   r   r   )r   r   r+  r,  r-  r   rH   r>  r?  r   r@  r0  rA  _idxr1  r  	input_keys                    r   r   zShuffleLayer._construct_graph  s    &		1 --JKK!36"
 CR-6t{{-CD-C61cQ-CDNND++d#CL4<<(c4::q14::##T__dD$AB	 ) !!&CD!" ".4$$'.6T__dD12 '-S8(.EzzQ 4#9#99)-%(@I *<T7(KI-1__C	N%)__e$<	 +!

..))((	7C+T23- ".# #d 
g Es   Gr4  r   r   )
r(   r)   r*   r+   r    r~   r  r  r   r_   r`   s   @r   r6  r6  Q  s+    "` 
<

&
Er!   r6  c                       e Zd ZdZ	 	 	 	 d fd	Zd Zd Zd Zed        Z	d Z
d Zd	 Zd
 Zed        ZddZd Zd ZddZ xZS )BroadcastJoinLayera;  Broadcast-based Join Layer

    High-level graph layer for a join operation requiring the
    smaller collection to be broadcasted to every partition of
    the larger collection.

    Parameters
    ----------
    name : str
        Name of new (joined) output collection.
    lhs_name: string
        "Left" DataFrame collection to join.
    lhs_npartitions: int
        Number of partitions in "left" DataFrame collection.
    rhs_name: string
        "Right" DataFrame collection to join.
    rhs_npartitions: int
        Number of partitions in "right" DataFrame collection.
    parts_out : list of int (optional)
        List of required output-partition indices.
    annotations : dict (optional)
        Layer annotations.
    **merge_kwargs : **dict
        Keyword arguments to be passed to chunkwise merge func.
    c                   t         |   |       || _        || _        || _        || _        || _        || _        |xs t        t        | j                              | _
        t        |	t              rt        |	      n|	| _        t        |
t              rt        |
      n|
| _        || _        | j                   j#                  d      | _        | j                  | j                   d<   | j                  | j                   d<   y )Nr   howleft_onright_on)rR   r    rv   r   lhs_namelhs_npartitionsrhs_namerhs_npartitionsr  r   r   rd   r   r;   rJ  rK  merge_kwargsr   rI  )r   rv   r   rL  rM  rN  rO  r   r   rJ  rK  rP  rS   s               r   r    zBroadcastJoinLayer.__init__  s     	[1	& . ."Bc%0@0@*A&B)3GT)BuW~+5h+Eh8($$((/'+||)$(,*%r!   c                X    | j                   D ch c]  }| j                  |f c}S c c}w r   r   r   s     r   r   z"BroadcastJoinLayer.get_output_keys;  r   r   c                z    dj                  | j                  | j                  | j                  | j                        S )Nz5BroadcastJoinLayer<name='{}', how={}, lhs={}, rhs={}>)r   rv   rI  rL  rN  r}   s    r   r~   zBroadcastJoinLayer.__repr__>  s.    FMMIItxx
 	
r!   c                    t        | d      S r   r   r}   s    r   r   z"BroadcastJoinLayer.is_materializedC  r   r!   c                x    t        | d      r| j                  S | j                         }|| _        | j                  S r   r   r   s     r   r   zBroadcastJoinLayer._dictF  r   r!   c                     | j                   |   S r   r   r   s     r   r@   zBroadcastJoinLayer.__getitem__P  r   r!   c                ,    t        | j                        S r   r   r}   s    r   r   zBroadcastJoinLayer.__iter__S  r   r!   c                ,    t        | j                        S r   r   r}   s    r   r   zBroadcastJoinLayer.__len__V  r   r!   c                    t               }|D ])  }	 |\  }}|| j                  k7  r|j                  |       + |S # t        $ r Y 9w xY wr  r  r
  s         r   r  z!BroadcastJoinLayer._keys_to_partsY  r  r  c                    | j                   | j                  k  r.| j                  | j                   | j                  | j                  fS | j                  | j                  | j                  | j
                  fS r   )rM  rO  rL  rN  rK  rJ  r}   s    r   _broadcast_planz"BroadcastJoinLayer._broadcast_planf  sl     $"6"66 $$	  $$	 r!   c           	     (   | j                   dd \  }}}t        t              }|xs | j                  |      }|D ]P  }|| j                  |fxx   t        |      D ch c]  }||f c}z  cc<   || j                  |fxx   ||fhz  cc<   R |S c c}w )zDetermine the necessary dependencies to produce `keys`.

        For a broadcast join, output partitions always depend on
        all partitions of the broadcasted collection, but only one
        partition of the "other" collection.
        N   )rZ  r   r  r  rv   r   )	r   r   r   
bcast_name
bcast_size
other_namer  r   rH   s	            r   r  z%BroadcastJoinLayer._cull_dependencies  s     .2-A-A"1-E*
J
3:!4!4T!:	D$))T"#zAR'SARAQAR'SS#$))T"#T"( # 
 	 (Ts   Bc                    t        | j                  | j                  | j                  | j                  | j
                  | j                  f| j                  |d| j                  S )N)r   r   )	rG  rv   r   rL  rM  rN  rO  r   rP  r  s     r   r  zBroadcastJoinLayer._cull  sb    !IIMM  MM  

 ((

 

 
	
r!   c                    | j                  |      }| j                  ||      }|t        | j                        k7  r| j	                  |      }||fS | |fS )a  Cull a BroadcastJoinLayer HighLevelGraph layer.

        The underlying graph will only include the necessary
        tasks to produce the keys (indices) included in `parts_out`.
        Therefore, "culling" the layer only requires us to reset this
        parameter.
        r  r  r  s         r   r  zBroadcastJoinLayer.cull  r   r!   c                   d| j                   z   }d| j                   z   }|r"t        d      }t        d      }t        d      }nddlm} ddlm} dd	lm} | j                  \  }}}	}
| j                  | j                  k  rd
nd}i }| j                  D ]  }| j                  dk7  r||	|f|
|f|||f<   g }t        |      D ]p  }| j                  dk7  rt        j                  ||f|fn|	|f||fg}|d
k(  r|j                          |||f}t        ||| j                   f||<   |j#                  |       r ||f|| j                   |f<    |S )z/Construct graph for a broadcast join operation.zinter-r   z%dask.dataframe.multi._split_partitionz$dask.dataframe.multi._concat_wrapperz)dask.dataframe.multi._merge_chunk_wrapperr   )_concat_wrapper)_merge_chunk_wrapper)_split_partitionleftrightinner)rv   r   dask.dataframe.multirc  rd  re  rZ  rM  rO  r   rI  r   r   r   reverser   rP  r   )r   r   
inter_namer   split_partition_funcr,  merge_chunk_funcr]  r^  r_  other_on
bcast_sider   rH   r0  j_merge_args	inter_keys                     r   r   z#BroadcastJoinLayer._construct_graph  s    		)
		)
 $67$  --STK1; 
 LUU 8<7K7K4
J
H#33d6J6JJVPW
 Axx7"(O	(ZO$ L:&  88w.	 %,,'O )!_O '  '')'A.	$%%	"I ##I.3 '8 $/"=CA]  ` 
r!   )NNNNr   r   )r(   r)   r*   r+   r    r   r~   r   r   r   r@   r   r   r  rZ  r  r  r  r   r_   r`   s   @r   rG  rG    s|    D 6:>

- ! !   2&
% Or!   rG  c                  H     e Zd ZdZ	 	 	 	 d fd	Zed        Zd Zd Z xZ	S )DataFrameIOLayera  DataFrame-based Blockwise Layer with IO

    Parameters
    ----------
    name : str
        Name to use for the constructed layer.
    columns : str, list or None
        Field name(s) to read in as columns in the output.
    inputs : list or BlockwiseDep
        List of arguments to be passed to ``io_func`` so
        that the materialized task to produce partition ``i``
        will be: ``(<io_func>, inputs[i])``.  Note that each
        element of ``inputs`` is typically a tuple of arguments.
    io_func : callable
        A callable function that takes in a single tuple
        of arguments, and outputs a DataFrame partition.
        Column projection will be supported for functions
        that satisfy the ``DataFrameIOFunction`` protocol.
    label : str (optional)
        String to use as a prefix in the place-holder collection
        name. If nothing is specified (default), "subset-" will
        be used.
    produces_tasks : bool (optional)
        Whether one or more elements of `inputs` is expected to
        contain a nested task. This argument in only used for
        serialization purposes, and will be deprecated in the
        future. Default is False.
    creation_info: dict (optional)
        Dictionary containing the callable function ('func'),
        positional arguments ('args'), and key-word arguments
        ('kwargs') used to produce the dask collection with
        this underlying ``DataFrameIOLayer``.
    annotations: dict (optional)
        Layer annotations to pass through to Blockwise.
    c	           	        || _         || _        || _        || _        || _        || _        || _        || _        t        |t              s@t        t        | j                        D 	
ci c]	  \  }	}
|	f|
 c}
}	| j
                        }n|}t        | j                   |t        t        d                  }t        | A  | j                   d||dfgi |       y c c}
}	w )N)r4   r   rH   )outputoutput_indicesr   indicesr2   r   )rv   _columnsr8  io_funclabelr4   r   creation_inford   r   r   r   r   r   r   rR   r    )r   rv   columnsr8  rz  r{  r4   r|  r   rH   r>  
io_arg_mapr   rS   s                r   r    zDataFrameIOLayer.__init__'  s     	
,&*&,/))24;;)?@)?vq#!s)?@#22J
  J DIIw0B(CD99 #&'# 	 	
 As   'C
c                    | j                   S )z(Current column projection for this layer)ry  r}   s    r   r}  zDataFrameIOLayer.columnsO  s     }}r!   c           	        ddl m} t        |      }| j                  $t	        | j                        j                  |      rt        | j                  |      r| j                  j                  |      }n| j                  }t        | j                  xs ddz   t        | j                  |      z   || j                  || j                  | j                  | j                        }|S | S )zProduce a column projection for this IO layer.
        Given a list of required output columns, this method
        returns the projected layer.
        r   )DataFrameIOFunctionsubset-)r{  r4   r   )dask.dataframe.io.utilsr  r   r}  r  
issupersetrd   rz  project_columnsrt  r{  r   rv   r8  r4   r   )r   r}  r  rz  layers        r   r  z DataFrameIOLayer.project_columnsT  s    
 	@w-<<3t||#4#?#?#H $,,(;<,,66w?,,$'x3.$))W1MMjj#22 ,,E L Kr!   c                v    dj                  | j                  t        | j                        | j                        S )Nz3DataFrameIOLayer<name='{}', n_parts={}, columns={}>)r   rv   r7   r8  r}  r}   s    r   r~   zDataFrameIOLayer.__repr__s  s-    DKKIIs4;;'
 	
r!   )NFNN)
r(   r)   r*   r+   r    r   r}  r  r~   r_   r`   s   @r   rt  rt    s;    "T &
P  >
r!   rt  c                  <    e Zd ZU dZded<   ded<   ded<   ded<   ded	<   d
ed<   ded<   ded<   ded<   ded<   ded<   ded<   	 	 	 	 	 	 d$	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 d% fdZdddZd&dZd Zd Z	d Z
d Zd Zed        Zd Zd Zd  Zd! Zd" Zd# Z xZS )'DataFrameTreeReductionag  DataFrame Tree-Reduction Layer

    Parameters
    ----------
    name : str
        Name to use for the constructed layer.
    name_input : str
        Name of the input layer that is being reduced.
    npartitions_input : str
        Number of partitions in the input layer.
    concat_func : callable
        Function used by each tree node to reduce a list of inputs
        into a single output value. This function must accept only
        a list as its first positional argument.
    tree_node_func : callable
        Function used on the output of ``concat_func`` in each tree
        node. This function must accept the output of ``concat_func``
        as its first positional argument.
    finalize_func : callable, optional
        Function used in place of ``tree_node_func`` on the final tree
        node(s) to produce the final output for each split. By default,
        ``tree_node_func`` will be used.
    split_every : int, optional
        This argument specifies the maximum number of input nodes
        to be handled by any one task in the tree. Defaults to 32.
    split_out : int, optional
        This argument specifies the number of output nodes in the
        reduction tree. If ``split_out`` is set to an integer >=1, the
        input tasks must contain data that can be indexed by a ``getitem``
        operation with a key in the range ``[0, split_out)``.
    output_partitions : list, optional
        List of required output partitions. This parameter is used
        internally by Dask for high-level culling.
    tree_node_name : str, optional
        Name to use for intermediate tree-node tasks.
    strrv   r   r   r   r   r,  tree_node_funcCallable | Nonefinalize_funcsplit_every	split_outz	list[int]output_partitionstree_node_namewidthsheightc                   t         |   |       || _        || _        || _        || _        || _        || _        || _        || _	        |	"t        t        | j                  xs d            n|	| _        |
xs d| j                  z   | _        | j                  }|g| _        |dkD  rLt        j                   || j                  z        }| j                  j#                  t%        |             |dkD  rLt'        | j                        | _        y )Nr   rV   z
tree_node-)rR   r    rv   r   r   r,  r  r  r  r  r   r   r  r  r  mathceilr   r   r7   r  )r   rv   r   r   r,  r  r  r  r  r  r  r   r  rS   s                r   r    zDataFrameTreeReduction.__init__  s     	[1	$!2&,*&" !( t~~*+," 	
 -Htyy0H &&gaiIIed&6&667EKKs5z* ai $++&r!   r   splitc               *    | j                   r||fz   S |S r   )r  )r   r  
name_partss      r   	_make_keyz DataFrameTreeReduction._make_key  s     )-zUH$FJFr!   c                    |r| j                   r| j                   }n| j                  }t        j                  || j                  |fS r   )r  r  r   r   r,  )r   
input_keys
final_task
outer_funcs       r   _define_taskz#DataFrameTreeReduction._define_task  s<    $,,++J,,J

J(8(8*EEr!   c                   i }| j                   s|S | j                  }| j                  rd|dz  }| j                   D ]P  }t        | j                        D ]6  }t
        j                  | j                  |f|f|| j                  |||      <   8 R | j                  dk\  rm| j                   D ][  }t        d| j                        D ]>  }t        | j                  |         D ]  }| j                  |dz
     }| j                  |z  }t        || j                  z   |      }	|dk(  r,t        ||	      D cg c]  }| j                  |||       }
}n9t        ||	      D cg c]$  }| j                  | j                  ||dz
  |      & }
}|| j                  dz
  k(  r3|dk(  sJ d| d       | j                  |
d	      || j                  |f<   | j                  |
d
	      || j                  | j                  |||      <   " A ^ |S | j                   D ]9  }| j                  |d|      g}
| j                  |
d	      || j                  |f<   ; |S c c}w c c}w )z%Construct graph for a tree reduction.z-splitr     rV   r   zgroup = z%, not 0 for final tree reduction taskT)r  F)r  r   r  r   r   r   r   r  r  r  r  minr  r  rv   )r   r   name_input_user\   pr   groupp_maxlstartlstopr  s              r   r   z'DataFrameTreeReduction._construct_graph  s    %%J >>h&N++t556A ((!,GC~qBC 7 , ;;!++"1dkk2E!&t{{5'9!: $EAI 6!%!1!1E!9 #FT-=-=$=u E A: */vu)=*)=A !%~q J)= ' * */vu)=	* *>A !%$($7$7EAIQ !/ !" *>	 ' * !DKK!O3 !&
W!)%0UVW *262C2C *t 3D 3CA/ !% 1 1* 1 O	   $$($7$7Q !/ !"? "; 3 ,\ 
	 ++"nn^QanHI
&*&7&7
t&7&TTYYN# , 
G**s   0I
)I
c                d    dj                  | j                  | j                  | j                        S )Nz>DataFrameTreeReduction<name='{}', input_name={}, split_out={}>)r   rv   r   r  r}   s    r   r~   zDataFrameTreeReduction.__repr__+  s(    OVVIIt
 	
r!   c                X    | j                   D ch c]  }| j                  |f c}S c c}w r   )r  rv   )r   r\   s     r   _output_keysz#DataFrameTreeReduction._output_keys0  s*    (,(>(>?(>1A(>???r   c                x    t        | d      r| j                  S | j                         }|| _        | j                  S )N_cached_output_keys)r   r  r  )r   output_keyss     r   r   z&DataFrameTreeReduction.get_output_keys3  s;    4./+++++-K'2D$'''r!   c                    t        | d      S r   r   r}   s    r   r   z&DataFrameTreeReduction.is_materialized;  r   r!   c                x    t        | d      r| j                  S | j                         }|| _        | j                  S r   r   r   s     r   r   zDataFrameTreeReduction._dict>  r   r!   c                     | j                   |   S r   r   r   s     r   r@   z"DataFrameTreeReduction.__getitem__H  r   r!   c                ,    t        | j                        S r   r   r}   s    r   r   zDataFrameTreeReduction.__iter__K  r   r!   c                    t        | j                  dd        xs d| j                  xs dz  }| j                  r%|| j                  t	        | j
                        z  z   S |S r   )sumr  r  r   r7   r  )r   	tree_sizes     r   r   zDataFrameTreeReduction.__len__N  sW    QR).Q4>>3FQG	>>t55D<R<R8SSSSr!   c                    t               }|D ])  }	 |\  }}|| j                  k7  r|j                  |       + |S # t        $ r Y 9w xY w)z;Simple utility to convert keys to output partition indices.r  )r   r   splitsr   r  _splits         r   _keys_to_output_partitionsz1DataFrameTreeReduction._keys_to_output_partitionsV  sY    C #v 		!JJv    r  c                    t        | j                  | j                  | j                  | j                  | j
                  | j                  | j                  | j                  || j                  | j                        S )N)r  r  r  r  r  r   )r  rv   r   r   r,  r  r  r  r  r  r   )r   r  s     r   r  zDataFrameTreeReduction._cullc  sf    %IIOO"",,((nn/..((
 	
r!   c                
   | j                   dft        | j                        D ch c]  }| j                  |f c}i}| j	                  |      }|t        | j                        k7  r| j                  |      }||fS | |fS c c}w )z2Cull a DataFrameTreeReduction HighLevelGraph layerr   )rv   r   r   r   r  r  r  r  )r   r   r  rH   r  r  r  s          r   r  zDataFrameTreeReduction.cullr  s     YYN.3D4J4J.K.K!$.K

 !;;DAD$:$: ;;::&78L%%:s   B )N    NNNN)rv   r  r   r  r   r   r,  r   r  r   r  r  r  r   r  z
int | Noner  zlist[int] | Noner  z
str | Noner   zdict[str, Any] | Noner   )r(   r)   r*   r+   rC   r    r  r  r   r~   r  r   r   r   r   r@   r   r   r  r  r  r_   r`   s   @r   r  r  y  s4   #J IO""N  K *. $.2%)-1%'%' %' 	%'
 %' !%' '%' %' %' ,%' #%' +%'N ,- GFHT

@(- ! ! 
r!   r  r4  )<
__future__r   r   r  r   collectionsr   collections.abcr   	itertoolsr   typingr   r	   tlzr   tlz.curriedr
   dask._task_specr   r   	dask.baser   dask.blockwiser   r   r   r   	dask.corer   dask.highlevelgraphr   dask.tokenizer   
dask.utilsr   r   r   r   numpynpr   r.   rE   rK   rb   rf   registerrn   rp   rr   rt   r   r   r   r   r6  rG  rt  r  r,   r!   r   <module>r     sd   "    # $  %   )  U U  % ) = =@ @.K K&E* E3% 3'  &   -(' )' /*) +) .)4 *4y yxBJ
R%2^R Rjo% od| |~t
y t
nEU Er!   