
    CdW                       d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	m
Z
mZmZ d dlmZmZ d dlZd dlZd dlZd dlmZ d dlmZ d dlmZmZmZmZ d dlm Z  d d	l!m"Z"m#Z#m$Z$m%Z%m&Z& d d
l'm(Z(m)Z) d dl*m+Z+ d dl,m-Z- d dl.m/Z/m0Z0 d dl1m2Z2 d dl3m4Z4m5Z5m6Z6  ej7        e8          Z9	 	 	 d^d_d!Z:	 	 	 	 	 	 	 d`dad-Z;	 	 	 	 	 	 	 dbdcd7Z<	 	 	 	 ddded<Z=	 	 	 	 	 dfd=Z>	 	 	 	 	 dgd>Z?	 	 	 	 	 dhd?Z@ G d@ dA          ZAdidBZBdC ZC	 djdDZDdE ZEdF ZFdG ZGdH ZHdkdIZIdJ ZJdK ZKdL ZLejM        dM             ZNdN ZOdO ZPdP ZQdQ ZRdR ZSdS ZT	 dldmdWZUdndod[ZVdpd\ZW	 	 dqdrd]ZXdS )s    )annotationsN)CallableMappingSequence)AnyLiteral)is_numeric_dtype)config)computecompute_as_if_collectionis_dask_collectiontokenize)methods)	DataFrameSeries_Framemap_partitionsnew_dd_object)group_split_dispatchhash_object_dispatch)UNKNOWN_CATEGORIES)HighLevelGraph)ShuffleLayerSimpleShuffleLayer)sizeof)Mdigitget_default_shuffle_algorithm      ?    ATdfr   partition_colr   repartitionboolnpartitionsintupsamplefloatpartition_size	ascendingreturntuple[list, list, list, bool]c           
     f	    |r                      t                    ng }                    ||          }                     t          j                  }	                     t          j                  }
	 t          |||	|
          \  }}}	}
nu# t          $ rh}t          j	                  sMt           fd D                       rddj         dfnd\  }}t          d| dj         d	| d
          ||d}~ww xY wt          j        |                                          }|s|r(t!          |          }t          t#          j        ||z            d          }t	          | j                  }|j        }	 t+          j        t+          j        d|dz
  |dz             t+          j        d|dz
  |          |                                                                          }n# t          t2          f$ rV t+          j        d|dz
  |dz                                 t6                    }|j        |                                         }Y naw xY w|j        }t;          |j        d|dz
                                                     |j        |dz
  d                                         z   }|	                    d          }	|
                    d          }
tA          j	        t          j!                  r1j	        }|	                    |          }	|
                    |          }
|	                                
                                s&|
                                
                                rd}n	|	j        }|r|
j        d|dz
           n|
j        dd         "                    d          }|r|	j        dd         n|	j        d|dz
           "                    d          }|	                                |	#                    |                                          k    oU|
                                |
#                    |                                          k    o||k                                     }||	                                |
                                |fS )zO
    Utility function to calculate divisions for calls to `map_partitions`
    )r'   c              3  D   K   | ]}j         |         j         k    V  d S N_name).0cr!   r"   s     6lib/python3.11/site-packages/dask/dataframe/shuffle.py	<genexpr>z'_calculate_divisions.<locals>.<genexpr>;   s1      FFa}*bek9FFFFFF    columnz`.dropna(subset=['z'])`)seriesz`.loc[series[~series.isna()]]`z-Divisions calculation failed for non-numeric z 'z}'.
This is probably due to the presence of nulls, which Dask does not entirely support in the index.
We suggest you try with .N   r   )xxpfpbfillmethodFTdrop)r*   )$r   r   _repartition_quantilesr   minmaxr   	TypeErrorr	   dtypeanynameNotImplementedErrorpdisnaallsummathceilr%   sizenpinterplinspacetolist
ValueErrorastyper&   iloclistuniquefillna
isinstanceCategoricalDtypereset_indexsort_values)r!   r"   r#   r%   r'   r)   r*   sizes	divisionsminsmaxeseobjsuggested_methodempty_dataframe_detectedtotalnindexesrG   	presortedmaxes2mins2s   ``                    r4   _calculate_divisionsrn       s    *5<Bf%%%"E44[84TTI''..D((//E(/	5$(N(N%	5$   
   344 	 FFFFF2FFFFFBH0BHHHIIA "C!
 &? ? ?}GY ? ?+;? ? ?  	 G%(  "wy115577 
. 
E

$)EN$:;;Q??+r~66N	9	+aQa88;q!a%++##%%   fhh	 I
 :& 	9 	9 	9k!QUK!O<<CCCHHG!w/6688III	9
 N!a%(//1122Y^AEGG5L5S5S5U5UU 	 ;;g;&&DLLL((E-%r':;; $#{{5!!U##yy{{ 
EJJLL,,.. 
		I)2F%*Wq1uW%%
122SS T 
 
 #,C1227QU71CPP Q 
 
 KKMMT--	-BBIIKKK '%"3"3i"3"H"H"O"O"Q"QQ'%$$&& 	 dkkmmU\\^^Y>>s-   7B 
DA#C<<D A,G- -A$IIlastbystr | list[str]int | Literal['auto'] | Nonebool | list[bool]na_position"Literal['first'] | Literal['last']sort_function-Callable[[pd.DataFrame], pd.DataFrame] | Nonesort_function_kwargsMapping[str, Any] | Nonec	           	        |dvrt          d          t          |t                    s|g}t          d |D                       rt	          dt          |          z            |||d}
|t          j        }||
                    |           | j	        dk    r | j
        |fi |
S |dk    rd	}t          d
| j	                  }n|| j	        }d}| |d                  }t          |t                    skt          |t                    r7t          |          dk    r$t          |d         t                    r	|d         }nt	          dt          |                     t          | ||||||          \  }}}}t          |          dk    r" |                     d          j
        |fi |
S |r|| j	        k    r | j
        |fi |
S t!          | |d         |||d          }  | j
        |fi |
} | S )z'See DataFrame.sort_values for docstring)firstro   z,na_position must be either 'first' or 'last'c              3  B   K   | ]}t          |t                     V  d S r/   )r\   str)r2   bs     r4   r5   zsort_values.<locals>.<genexpr>   s/      
.
.az!S!!!
.
.
.
.
.
.r6   zuDataframes only support sorting by named columns which must be passed as a string or a list of strings.
You passed %s)rp   r*   rt   Nr:   autoTd   Fr   zHDask currently only supports a single boolean for ascending. You passed    r%   )r*   rt   
duplicates)rV   r\   rY   rH   rJ   r}   r   r_   updater%   r   rE   r$   lenrn   r#   rearrange_by_divisions)r!   rp   r%   r*   rt   r'   r)   rv   rx   kwargssort_kwargsr#   sort_by_colra   rb   rc   rk   s                    r4   r_   r_   w   s    +++GHHHb$ T

.
.2
.
.
... 
!!"gg&
 
 	
 " K
 '/000	~ r >>+>>>f#r~...KRU)Ki&&  y$''		I!##9Q<.. $ "!II%k[^_h[i[ikk   )=
Kk8^Y) )%ItUI 9~~;r~~!~,,;
 
(
 
 	
  ?[BN22 r >>+>>>	

1
 
 
B 
	=	8	8K	8	8BIr6   Findexstr | Seriesshuffle
str | Noner   rB   ra   Sequence | Nonec	                   |dk    rd}
t          d| j                  }n|| j        }d}
t          |t                    s	| |         }n|}|dt	          | ||
|||          \  }}}}|rI|| j        k    r>||d         gz   }t          | |||          }|                    t          j                  S t          | ||f|||d|	S )	z"See _Frame.set_index for docstringr   Tr   NF)rB   ra   )r   rB   r   )
rE   r%   r\   r   rn   set_sorted_indexr   r   
sort_indexset_partition)r!   r   r%   r   r   rB   r'   ra   r)   r   r#   index2rb   rc   rk   results                   r4   	set_indexr      s
    f#r~...KeV$$ E,@[(N-
 -
)	4	  	766b	{*I%b%diPPPF((666
E9&-D' MS  r6       r   
max_branchbool | Nonec           	     &   | j                             dg          }t          |t                    rt	          |          }t          |t
                    s| |         j        }n|j        }t          j        |          	                                r?t          j
        j                            |          r| j                             |          }n_t          |t          j                  r)t          |j        v r| j                             |          }n| j                             ||          }t          |t
                    s:| |                             t"          ||          }	|                     |	          }
n4|                    t"          ||          }	|                     |	|          }
t'          |
d|t)          |          dz
  ||d	          }t          |t
                    s)|                    t*          ||| j        j        
          }n-|                    t.          |j        || j        j        
          }t3          j        |          }t          d |D                       |_        |                    t8          j                  S )a'  Group DataFrame by index

    Sets a new index and partitions data along that index according to
    divisions.  Divisions are often found by computing approximate quantiles.
    The function ``set_index`` will do both of these steps.

    Parameters
    ----------
    df: DataFrame/Series
        Data that we want to re-partition
    index: string or Series
        Column to become the new index
    divisions: list
        Values to form new divisions between partitions
    drop: bool, default True
        Whether to delete columns to be used as the new index
    shuffle: str (optional)
        Either 'disk' for an on-disk shuffle or 'tasks' to use the task
        scheduling framework.  Use 'disk' if you are on a single machine
        and 'tasks' if you are on a distributed cluster.
    max_branch: int (optional)
        If using the task-based shuffle, the amount of splitting each
        partition undergoes.  Increase this for fewer copies but more
        scheduler overhead.

    See Also
    --------
    set_index
    shuffle
    partd
    r   rG   )ra   meta_partitions)r   _indexr   r:   T)r   r%   r   r   ignore_index)
index_namerB   column_dtypec              3  Z   K   | ]&}t          j        |          s|nt          j        V  'd S r/   )rK   rL   rR   nan)r2   is     r4   r5   z set_partition.<locals>.<genexpr>Y  s5      MMa271::9!!26MMMMMMr6   )_meta_constructor_slicedr\   tuplerY   r   rG   rK   rL   rH   apitypesis_integer_dtyper]   r   
categoriesr   set_partitions_preassignrearrange_by_columnr   set_index_post_scalarcolumnsset_index_post_seriesrI   r   rU   ra   r   r   )r!   r   ra   r   rB   r   r   r   rG   
partitionsdf2df3df4s                r4   r   r      su   P 8'',,D)U## $OO	eV$$ 5		wy 
IBFL$A$A%$H$H 
IH00;;		5"-..I%"222 H00;;		H00%0HH	eV$$ 	>Y--)$ . 
 

 iiJi//)))$ * 
 

 iiJui==
	NNQ&  C eV$$ 
  !)	 ! 
 
   !z)	 ! 
 
 y))IMM9MMMMMCMal+++r6   c           	     r   t           j        j                            |          ot	          |           }|pt                      }|dk    rt          |t                    s|rht          |t                    r|g}nt          |          }t          |          }|t          | j
                  z  |k    rt          | ||||||          S t          |t                    s'|rt          |          }|                     |          }n$t          |d          r|                                }|                    t"          |p| j        | j                            dg          d          }	|                     |	          }
| j        j        j        |
j        j        _        t          |
d|||||	          }|d= |S )
a  Group DataFrame by index

    Hash grouping of elements. After this operation all elements that have
    the same index will be in the same partition. Note that this requires
    full dataset read, serialization and shuffle. This is expensive. If
    possible you should avoid shuffles.

    This does not preserve a meaningful index/partitioning scheme. This is not
    deterministic if done in parallel.

    See Also
    --------
    set_index
    set_partition
    shuffle_disk
    tasks)r%   r   r   r   r   to_framer   F)r%   r   transform_divisionsr   r   )r%   r   r   r   r   )rK   r   r   is_list_liker   r   r\   r}   rY   setr   r   r   _select_columns_or_indexhasattrr   r   partitioning_indexr%   r   r   r   r   rI   )r!   r   r   r%   r   r   r   	list_likensetr   r   r   s               r4   r   r   ^  s   2 ))%00R9KE9R9R5RI8688G'z%55 eS!! 	 GEEKKE5zz#bj//!T))&'%)    eV$$ 	! 	 KKE++E22	
	#	# !   %%12>X))1#..!	 &  J ))
)
+
+C8>.CIO
!  C 	MJr6   c                `   | j                             |          }|s|                                }| j                             dg          }| |                             t          ||||          }	|                     |	          }
t          |
d|t          |          dz
  |          }|d= |S )z:Shuffle dataframe so that column separates along divisionsr   )ra   r*   rt   r   r   r   r:   )r   r%   r   )r   r   drop_duplicatesr   r   r   r   r   )r!   r7   ra   r   r   r*   rt   r   r   r   r   r   s               r4   r   r     s     ,,Y77I 0--//	8'',,DF** +  J ))
)
+
+C 	NNQ&  C 	MJr6   c                l   |pt                      }|!|| j        k     r|                     |          } |dk    rt          | |||          S |dk    r8t	          | ||||          }|r |j                            d          |_        |S |dk    rd	d
lm}  || ||          S t          d|z            )Nr   disk)r   r   r   TrA   p2pr   )rearrange_by_column_p2pzUnknown shuffle method %s)
r   r%   r#   rearrange_by_column_diskrearrange_by_column_tasksr   r^   distributed.shuffler   rJ   )	r!   colr%   r   r   r   r   r   r   s	            r4   r   r     s     8688G ;#?#?^^^44&'CgNNNN	G		'Z<
 
 
  	9	--4-88CI
	E		??????&&r3<<<!"="GHHHr6   c                  &    e Zd ZdZddZd Zd ZdS )maybe_buffered_partdz[
    If serialized, will return non-buffered partd. Otherwise returns a buffered partd
    TNc                    |pt          j        dd           | _        || _        t          j        dd           | _        d S )Ntemporary_directoryzdataframe.shuffle.compression)r
   gettempdirbuffercompression)selfr   r   s      r4   __init__zmaybe_buffered_partd.__init__  s=    I&*-BD"I"I!:&EtLLr6   c                B    | j         rt          d| j         ffS t          dfS )NFF)r   r   )r   s    r4   
__reduce__zmaybe_buffered_partd.__reduce__   s)    < 	4(5$,*?@@((33r6   c                .   dd l }t          j        d| j                  }	 | j        rt          |j        | j                  nd }n:# t          $ r-}t          d	                    | j                            |d }~ww xY w|
                    |          }|j        j                            |           |r ||          }| j        r;|                    |                    |                                |                    S |                    |          S )Nr   z.partd)suffixdirzxNot able to import and load {} as compression algorithm.Please check if the library is installed and supported by Partd.)partdtempfilemkdtempr   r   getattr
compressedAttributeErrorImportErrorformatFilefilecleanup_filesappendr   PandasBlocksBufferDict)r   argsr   r   pathpartd_compressionrd   r   s           r4   __call__zmaybe_buffered_partd.__call__  s6   xT\BBB	 #($*:;;; 
  	 	 	SSYSY$T T 
 	 zz$
 ''--- 	+$$T**D; 	,%%ell5::<<&F&FGGG%%d+++s   #A 
A<(A77A<TN)__name__
__module____qualname____doc__r   r   r    r6   r4   r   r     sT         M M M M
4 4 4, , , , ,r6   r   c                n     j         t                     }t          j                    j        }d|z   ft                      fi}d|z   fdt                                                     D             }g }|rt          j	         j
        ||          }	t          j        |	 g          }	t          |          g}
t          t          |	|
          \  }}|i}t          t!          t          |          |                    }n|                                d|z   t$          t'          |          fi}d|z    fdt)                    D             }d	d
z   z  }t+          j	        ||||          }t          j        ||          }	t-          |	 j        |          S )zShuffle using local disk

    See Also
    --------
    rearrange_by_column_tasks:
        Same function, but using tasks rather than partd
        Has a more informative docstring
    Nzzpartd-zshuffle-partition-c                4    i | ]\  }}|ft           |fS r   )shuffle_group_3)r2   r   keyr7   rI   r%   ps      r4   
<dictcomp>z,rearrange_by_column_disk.<locals>.<dictcomp>7  s>       As 
q	OS&+qA  r6   dependencieszbarrier-zshuffle-collect-c                8    i | ]}|ft           |j        fS r   )collectr   )r2   r   barrier_tokenr!   rI   r   s     r4   r   z,rearrange_by_column_disk.<locals>.<dictcomp>M  s9       @Aq	GQ28];  r6   r/   r:   )r%   r   uuiduuid1hexr   	enumerate__dask_keys__r   mergedaskfrom_collectionssortedr   r   dictzipr   barrierrY   rangetoolzr   r   )r!   r7   r%   r   tokenalways_new_tokendsk1dsk2r   graphkeysppvaluesdsk3dsk4ra   layerr   rI   r   s   ```              @@@r4   r   r   #  s    nR--Ez||'	%	%'A$&&()D  "22D       0 0 2 233  D
 L  $RWdD99/e2$OOO6$<< -iEE
F2wCtf--..B !11MGT$ZZ01D %D      EJ;EWEW  D ;?+IKdD$//E+D%lSSSEbh	:::r6   c                    | S )z#
    A task that does nothing.
    r   )r;   cleanup_tokens     r4   _noopr  X  s	     Hr6   c                   |pd}p| j         |k    ryt          |           }d| }p| j         t          || j         | j        | j                  }t          j        ||| g          }t          ||| j        dgdz   z            S | j         }	t          t          j
        t          j        |	          t          j        |          z                      dk    r(t          t          j
        |	dz  z                      n|	fdt          z            D             }
| j         }t          | |	          }t                    D ]`}d| d| }t          ||
||	| j        | j        
  
        }t          j        ||| g          }t          ||| j        | j                  } a|k    rt          |           }d	|z   fd
t          |                                           D             }d|z   }t                    D ]}t"          ||z  f|f|||f<   t          j        ||| g          }t          ||| j        dgdz   z            }n| }d|dz   z  |_        |S )ar  Order divisions of DataFrame so that all values within column(s) align

    This enacts a task-based shuffle.  It contains most of the tricky logic
    around the complex network of tasks.  Typically before this function is
    called a new column, ``"_partitions"`` has been added to the dataframe,
    containing the output partition number of every row.  This function
    produces a new dataframe where every row is in the proper partition.  It
    accomplishes this by splitting each input partition into several pieces,
    and then concatenating pieces from different input partitions into output
    partitions.  If there are enough partitions then it does this work in
    stages to avoid scheduling overhead.

    Lets explain the motivation for this further.  Imagine that we have 1000
    input partitions and 1000 output partitions. In theory we could split each
    input into 1000 pieces, and then move the 1 000 000 resulting pieces
    around, and then concatenate them all into 1000 output groups.  This would
    be fine, but the central scheduling overhead of 1 000 000 tasks would
    become a bottleneck.  Instead we do this in stages so that we split each of
    the 1000 inputs into 30 pieces (we now have 30 000 pieces) move those
    around, concatenate back down to 1000, and then do the same process again.
    This has the same result as the full transfer, but now we've moved data
    twice (expensive) but done so with only 60 000 tasks (cheap).

    Note that the `column` input may correspond to a list of columns (rather
    than just a single column name).  In this case, the `shuffle_group` and
    `shuffle_group_2` functions will use hashing to map each row to an output
    partition. This approach may require the same rows to be hased multiple
    times, but avoids the need to assign a new "_partitions" column.

    Parameters
    ----------
    df: dask.dataframe.DataFrame
    column: str or list
        A column name on which we want to split, commonly ``"_partitions"``
        which is assigned by functions upstream.  This could also be a list of
        columns (in which case shuffle_group will create a hash array/column).
    max_branch: int
        The maximum number of splits per input partition.  Defaults to 32.
        If there are more partitions than this then the shuffling will occur in
        stages in order to avoid creating npartitions**2 tasks
        Increasing this number increases scheduling overhead but decreases the
        number of full-dataset transfers that we have to make.
    npartitions: Optional[int]
        The desired number of output partitions

    Returns
    -------
    df3: dask.dataframe.DataFrame

    See also
    --------
    rearrange_by_column_disk: same operation, but uses partd
    rearrange_by_column: parent function that calls this or rearrange_by_column_disk
    shuffle_group: does the actual splitting per-partition
    r   zsimple-shuffle-r   Nr:   c                d    g | ]+t          fd t                    D                       ,S )c              3  :   K   | ]}t          |          V  d S r/   )r   )r2   jr   ks     r4   r5   z7rearrange_by_column_tasks.<locals>.<listcomp>.<genexpr>  s-      ::qE!QNN::::::r6   )r   r  )r2   r   r!  stagess    @r4   
<listcomp>z-rearrange_by_column_tasks.<locals>.<listcomp>  s@    UUUqe:::::E&MM:::::UUUr6   zshuffle--zrepartition-group-c                4    i | ]\  }}|ft           |fS r   )shuffle_group_2)r2   r   r!  r7   r   r%   repartition_group_tokens      r4   r   z-rearrange_by_column_tasks.<locals>.<dictcomp>  sH     	
 	
 	
 1 %a(+	
 	
 	
r6   zrepartition-get-r/   )r%   r   r   r1   r   r   r  r   r&   rO   rP   logr  r   ra   r  r  shuffle_group_get)r!   r7   r   r%   r   r  shuffle_nameshuffle_layerr  ri   inputsnpartitions_origstage
stage_namestage_layerdskrepartition_get_namer   graph2r   r!  r'  r"  s    ` ``               @@@r4   r   r   _  s6   v !rJ%r~*44 V[11000!3R^*NHH
 
 /-rd
 
 
 UL"(TFkTUo<VWWW
A48A;;*)=)==>>??Fzz	!F
+,,--UUUUUE!V)DTDTUUUF~RA..Ev F F/////
"HH
 
 /2$
 
 
 5*bhEE;2B#B#B[))"6">	
 	
 	
 	
 	
 	
 	
 """2"2"4"455	
 	
 	
  2E9{## 	 	A!(!.>*>?.C%q)**  0 #RD
 
 
 ("(TFkAo4N
 
 #3a#78Jr6   c                D    t          | d          t          |          z  S )a~  
    Computes a deterministic index mapping each record to a partition.

    Identical rows are mapped to the same partition.

    Parameters
    ----------
    df : DataFrame/Series/Index
    npartitions : int
        The number of partitions to group into.

    Returns
    -------
    partitions : ndarray
        An array of int64 values mapping each record to a partition.
    Fr   )r   r&   )r!   r%   s     r4   r   r     s$    "  %0003{3C3CCCr6   c                $    t          |            dS )Nr   )rY   )r   s    r4   r  r    s    JJJ1r6   c                    ddl }t          | |j                  r| j         }nd}t          ||j                  r|j        }nd}|rt          j        |d           dS dS )z
    Cleanup the files in a partd.File dataset.

    Parameters
    ----------
    p : partd.Interface
        File or Encode wrapping a file should be OK.
    keys: List
        Just for scheduling purposes, not actually used.
    r   NT)ignore_errors)r   r\   Encoder   r   shutilrmtree)r   r  r   
maybe_filer   s        r4   cleanup_partd_filesr=    s~     LLL!U\"" W


*ej))  0d$//////0 0r6   c                    t          |           5  |                     |          }t          |          dk    r|n|cddd           S # 1 swxY w Y   dS )z/Collect partitions from partd, yield dataframesr   N)ensure_cleanup_on_exceptionr   r   )r   partr   r   ress        r4   r   r   -  s    	$Q	'	' - -eeDkk#hhllss- - - - - - - - - - - - - - - - - -s   ,A		AAc                   	 |r|                     | d          dz
  }n*t          |          |                     | d          z
  dz
  }n# t          t          f$ r t	          j        t          |           d          }|                                 }||                                         }|r$|                     | |         d          dz
  ||<   n3t          |          |                     | |         d          z
  dz
  ||<   Y nw xY w|rt          |          dz
  nd||dk     |t          |          dz
  k    z  <   |dk    rt          |          dz
  nd||                                 j        <   |S )	Nright)sider:   int32r   r   r   ro   )	searchsortedr   rF   rV   rR   emptynotnarL   r  )sra   r*   rt   r   not_nulldivisions_notnas          r4   r   r   4  s    	V"///@@1DJJY)*@*@*@*Q*QQTUUJz"   Xc!ffG444
7799#IOO$5$56 		,,Qx[w,GG!K x  
 I!..q{.IIJ x   (.IQ 
Q:Y!1C#CDE 9Dv8M8M#i..1"4"4STJqvvxxs   AA
 
B;DDc                   t          |           si | fS t          |t                    r|g}|r8|d         dk    r,| |d                                      t          j                  }nHt          |r| |         n| d          t          |          z                      t          j                  }|                                dz   }t          | |||          }|| j
        d d         fS )Nr   r   Fr5  r:   r   )r   r\   r}   rW   rR   rE  r   r&   rE   r   rX   )r!   colsr   npartsindri   result2s          r4   r&  r&  P  s    r77 2v$ v Q=((ak  ** !T!9DrGGG#f++U
&

 	 			AA"2sALIIIGBGBQBKr6   c                (    | \  }}||v r||         S |S r/   r   )g_headr   gheads       r4   r)  r)  c  s"    GAtAvvtr6   c                p   t          |t                    r|g}|r|d         dk    r| |d                  }n5t          |r| |         n| d          }|r||k    r|t          |          z  }t	          j        |dz            }||z                      |d          ||z  z  |z  }t          | |||          S )a  Splits dataframe into groups

    The group is determined by their final partition, and which stage we are in
    in the shuffle

    Parameters
    ----------
    df: DataFrame
    cols: str or list
        Column name(s) on which to split the dataframe. If ``cols`` is not
        "_partitions", hashing will be used to determine target partition
    stage: int
        We shuffle dataframes with many partitions we in a few stages to avoid
        a quadratic number of tasks.  This number corresponds to which stage
        we're in, starting from zero up to some small integer
    k: int
        Desired number of splits from this dataframe
    npartition: int
        Total number of output partitions for the full dataframe
    nfinal: int
        Total number of output partitions after repartitioning

    Returns
    -------
    out: Dict[int, DataFrame]
        A dictionary mapping integers in {0..k} to dataframes such that the
        hash values of ``df[col]`` are well partitioned.
    r   r   Fr5  r   )copyr   )r\   r}   r   r&   rR   min_scalar_typerW   r   )	r!   rM  r.  r!  r%   r   nfinalrO  typs	            r4   shuffle_grouprZ  k  s    : $ v $Q=((ak"t#;2d885III 	$f++F#C

[1_
-
-C 
$
$Su
$
5
5E
AA
ECCFFFFr6   c              #     K   	 dV  dS # t           $ rB 	 |                                  n*# t           $ r t                              d           Y nw xY w w xY w)zEnsure a partd.File is cleaned up.

    We have several tasks referring to a `partd.File` instance. We want to
    ensure that the file is cleaned up if and only if there's an exception
    in the tasks using the `partd.File`.
    Nz1ignoring exception in ensure_cleanup_on_exception)	ExceptionrB   logger	exception)r   s    r4   r?  r?    s      	   	RFFHHHH 	R 	R 	RPQQQQQ	Rs)   
 
A*A$AAAAc                    t          |          5  |                     |          fdj        D             }|                    |d           d d d            d S # 1 swxY w Y   d S )Nc                <    i | ]}|                     |          S r   )	get_group)r2   r   rS  s     r4   r   z#shuffle_group_3.<locals>.<dictcomp>  s%    1111QA111r6   T)fsync)r?  groupbygroupsr   )r!   r   r%   r   drS  s        @r4   r   r     s    	$Q	'	'    JJsOO1111111	$                                   s   A AA"%A"c                    |                      dd                              ||          }|j                            |          |_        |S )Nr   r:   axisrA   )rB   r   r   rW   r!   r   rB   r   r   s        r4   r   r     sF    
''-a'
(
(
2
2:D
2
I
IC+$$\22CKJr6   c                    |                      dd                              dd          }||j        _        |j                            |          |_        |S )Nr   r:   rg  r   TrA   )rB   r   r   rI   r   rW   ri  s        r4   r   r     sO    
''-a'
(
(
2
28$
2
G
GCCIN+$$\22CKJr6   c                B    || j         v r|                     |          n| S r/   )r   rB   r!   r   s     r4   drop_overlaprm    s"    "bh..2775>>>B6r6   c                X    || j         v r| j        |g         n|                                 S r/   )r   loc_constructorrl  s     r4   get_overlaprq    s*    #rx//265'??R__5F5FFr6   c                    dt           |          z   d t          |          D             }t          |          dk    r>d}df j        dfi}t	          j        | g          }t          | j        |          S  fdt          |          D             }t          |	                                          }t                    d         fz   }fdt          d	t                              D             }	g }
|	D ]}|
                    t          ||d	z
           ||         f           t          ||d	z
  f         ||         f||d	z
  f<   ||         ||d	z            k    r|d	z   |	v rn|
                    ||                    t          j        |
f||f<   g }
t	          j        | g          }t          | j        |          S )
a  Ensures that the upper bound on each partition of ddf (except the last) is exclusive

    This is accomplished by first removing empty partitions, then altering existing
    partitions as needed to include all the values for a particular index value in
    one partition.
    zfix-overlap-c                $    g | ]\  }}|d k    |S r   r   )r2   r   lengths      r4   r#  zfix_overlap.<locals>.<listcomp>  s!    EEEF11r6   r   )NNr   c                .    i | ]\  }}|fj         |fS r   r0   )r2   r   divddfrI   s      r4   r   zfix_overlap.<locals>.<dictcomp>  s*    
L
L
L61cD!9sy#&
L
L
Lr6   r   c                >    g | ]}|         |d z
           k    |S )r:   r   r2   r   rc   rb   s     r4   r#  zfix_overlap.<locals>.<listcomp>  s/    IIIQaE!a%L1H1Hq1H1H1Hr6   r:   )r   r  r   r1   r   r  r   r   rY   r  r   r  r   rq  rm  r   concat)rx  rb   rc   lensnon_emptiesra   r1  r  ddf_keysoverlapframesr   rI   s   ```         @r4   fix_overlapr    s    HS$t<<<DEEiooEEEK
;1 	ay39a.)/cNNNUD#)Y??? M
L
L
L
LY{5K5K
L
L
LCCJJLL!!HduRyl*IIIIII%3t99--IIIGF   	{HQUOYq\BCCC +Cq1u,>	!MT1q5M Q<9QU+++A0@0@hqk"""!.&1T1I+D#SEJJJEci;;;r6   r7   allow_overlaptuple[list, list, list[int]]c                   |                      t          j        |           }|                      t          j        |           }|                      t          |           }t          |||fi |\  }}}|                    d                                          }|                    d                                          }d t          ||          D             }d t          ||          D             }t          |          |k    st          |          |k    rCt          d| j        pd d| j        pd d	t          t          |||                               |szt          d
 t          |dd         |dd                   D                       rCt          j        d| j        pd d	t          t          |||                     t                      t#          j        |          }|s|||fS |||fS )zFor a given column, compute the min, max, and len of each partition.

    And make sure that the partitions are sorted relative to each other.
    NOTE: this does not guarantee that every partition is internally sorted.
    )r   r>   r?   c                $    g | ]\  }}|d k    |S rt  r   r2   mru  s      r4   r#  z,_compute_partition_stats.<locals>.<listcomp>
  s!    HHHIAvFaKKaKKKr6   c                $    g | ]\  }}|d k    |S rt  r   r  s      r4   r#  z,_compute_partition_stats.<locals>.<listcomp>  s!    JJJYQfkkqkkkr6   z'Partitions are not sorted ascending by z	the indexz.In your dataset the (min, max, len) values of z for each partition are : c              3  (   K   | ]\  }}||k    V  d S r/   r   )r2   ar~   s      r4   r5   z+_compute_partition_stats.<locals>.<genexpr>  s;       ! !1aQ! ! ! ! ! !r6   r:   Nr   zPartitions have overlapping values, so divisions are non-unique.Use `set_index(sorted=True)` with no `divisions` to allow dask to fix the overlap. In your dataset the (min, max, len) values of )r   r   rD   rE   r   r   r[   rU   r  r	  rV   rI   rY   rH   warningswarnUserWarningr   )r7   r  r   rb   rc   r|  non_empty_minsnon_empty_maxess           r4   _compute_partition_statsr    sP      V 44D!!!%f!55E  6 22DeT<<V<<D%;;g;&&--//DLLL((//11EHHT4HHHNJJ#eT*:*:JJJO~.00/""o55Rfk6P[RRGV[=WK G G(,Sud-C-C(D(DG G
 
 	

  	
S ! !~abb1?3B33GHH! ! !   	
 	G=C[=WKG G )-Sud-C-C(D(DG G 	
 	
 	
 >$D 7eT""66r6   r   
Any | Noner   c                |    || j         n| |         }t          |fddi|\  }}}t          |          |d         fz   S )Nr  Fr   )r   r  r   )r!   r   r   r7   rb   rc   _s          r4   compute_divisionsr  &  sL    RXX"S'F-fTTETVTTND%;;%)%%r6   c           	     X   t          | j        fddi|\  }t                    t          | j                  dz
  k    rWt	                    d         fz   | _        t          fdt          dt                              D                       s| S t          | |          S )Nr  Tr:   r   c              3  B   K   | ]}|         |d z
           k    V  dS )r:   Nr   rz  s     r4   r5   z,compute_and_set_divisions.<locals>.<genexpr>1  s4      HHq47eAEl*HHHHHHr6   )	r  r   r   ra   r   
_divisionsrH   r  r  )r!   r   r|  rc   rb   s      @@r4   compute_and_set_divisionsr  -  s    0XXXQWXXD%
4yyC%%)))duRyl2HHHHHE!SYY4G4GHHHHH 	Ir4---r6   c           	        t          |t                    r"| j                            |j        |          }n| j                            ||          }t	          t
          j        | |||dd          }|st          |fi |S t          |          t          | j                  k    rd}t          |          t          |          |_        |S )NrA   F)rB   r   align_dataframesr   ae  When doing `df.set_index(col, sorted=True, divisions=...)`, divisions indicates known splits in the index column. In this case divisions must be the same length as the existing divisions in `df`

If the intent is to repartition into new divisions after setting the index, you probably want:

`df.set_index(col, sorted=True).repartition(divisions=divisions)`)r\   r   r   r   r   r   r  r   ra   rV   r   )r!   r   rB   ra   r   r   r   msgs           r4   r   r   7  s     %   4x!!%+D!99x!!%d!33	
!  F  (::6:::	Y3r|,,	,	,P 	 ooY''FMr6   )r   r    T)r!   r   r"   r   r#   r$   r%   r&   r'   r(   r)   r(   r*   r$   r+   r,   )NTro   r   r    NN)r!   r   rp   rq   r%   rr   r*   rs   rt   ru   r'   r(   r)   r(   rv   rw   rx   ry   r+   r   )NNFTr   Nr    )r!   r   r   r   r%   rr   r   r   r   r$   rB   r$   r'   r(   ra   r   r)   r(   r+   r   )r   TNN)r!   r   r   r   ra   r   r   r&   rB   r$   r   r   r   r   r+   r   )NNr   FN)NNTro   T)NNNNF)NF)r   NF)Tro   r   )r7   r   r  r$   r+   r  r/   )r!   r   r   r  r+   r   )r!   r   r+   r   r   )
r!   r   r   r   rB   r$   ra   r   r+   r   )Y
__future__r   
contextlibloggingrO   r:  r   r  r  collections.abcr   r   r   typingr   r   numpyrR   pandasrK   tlzr  pandas.api.typesr	   r  r
   	dask.baser   r   r   r   dask.dataframer   dask.dataframe.corer   r   r   r   r   dask.dataframe.dispatchr   r   dask.dataframe.utilsr   dask.highlevelgraphr   dask.layersr   r   dask.sizeofr   
dask.utilsr   r   r   	getLoggerr   r]  rn   r_   r   r   r   r   r   r   r   r  r   r   r  r=  r   r   r&  r)  rZ  contextmanagerr?  r   r   r   rm  rq  r  r  r  r  r   r   r6   r4   <module>r     s   " " " " " "           7 7 7 7 7 7 7 7 7 7                     - - - - - -       U U U U U U U U U U U U " " " " " " X X X X X X X X X X X X X X N N N N N N N N 3 3 3 3 3 3 . . . . . . 8 8 8 8 8 8 8 8       > > > > > > > > > >		8	$	$ !T? T? T? T? T?t 15#'6<!CG59R R R R Rp 15!%!& & & & &Z f, f, f, f, f,X L L L Lf # # # #R  I  I  I  IF*, *, *, *, *, *, *, *,Z2; 2; 2; 2;j   ?DR R R RtD D D(  
0 0 06- - -   8     &  +G +G +G\   &         7 7 7G G G0< 0< 0<h +0'7 '7 '7 '7 '7T& & & & &. . . . !%	% % % % % % %r6   