
    +gd                        d Z ddlZddlZddlZddlZddlmZmZmZm	Z	m
Z
mZmZ ddlZddlZddlZddlmZ ddlmZ ddlmZmZmZ ddlmZmZmZmZm Z m!Z!m"Z"m#Z# ddl$m%Z% dd	l&m'Z' dd
l(m)Z)m*Z* ddl+m,Z,m-Z-m.Z.m/Z/m0Z0 ddl1m2Z2 ddl3m4Z4 ddl5m6Z6m7Z7  e2j8        e9          Z:e;Z< G d de=          Z> G d d          Z? G d de?          Z@ G d d          ZA G d deA          ZB G d d          ZCde	eD         fdZEde	eD         fdZFdS )z$To write records into Parquet files.    N)AnyDictIterableListOptionalTupleUnion   )config)FeaturesImageValue)FeatureType_ArrayXDExtensionTypecast_to_python_objectsgenerate_from_arrow_typeget_nested_type%list_of_np_array_to_pyarrow_listarraynumpy_to_pyarrow_listarrayto_pyarrow_listarray)is_remote_filesystem)DatasetInfo)DuplicatedKeysError	KeyHasher)
array_castarray_concatcast_array_to_featureembed_table_storage
table_cast)logging)hash_url_to_filename)asdictfirst_non_null_valuec                       e Zd ZdS )SchemaInferenceErrorN)__name__
__module____qualname__     5lib/python3.11/site-packages/datasets/arrow_writer.pyr%   r%   4   s        Dr*   r%   c            
           e Zd ZdZ	 	 	 ddedee         dee         dee         fdZdefd	Ze	dede
eee         f         fd
            Zddeej                 fdZdS )TypedSequencea  
    This data container generalizes the typing when instantiating pyarrow arrays, tables or batches.

    More specifically it adds several features:
    - Support extension types like ``datasets.features.Array2DExtensionType``:
        By default pyarrow arrays don't return extension arrays. One has to call
        ``pa.ExtensionArray.from_storage(type, pa.array(data, type.storage_type))``
        in order to get an extension array.
    - Support for ``try_type`` parameter that can be used instead of ``type``:
        When an array is transformed, we like to keep the same type as before if possible.
        For example when calling :func:`datasets.Dataset.map`, we don't want to change the type
        of each column by default.
    - Better error message when a pyarrow array overflows.

    Example::

        from datasets.features import Array2D, Array2DExtensionType, Value
        from datasets.arrow_writer import TypedSequence
        import pyarrow as pa

        arr = pa.array(TypedSequence([1, 2, 3], type=Value("int32")))
        assert arr.type == pa.int32()

        arr = pa.array(TypedSequence([1, 2, 3], try_type=Value("int32")))
        assert arr.type == pa.int32()

        arr = pa.array(TypedSequence(["foo", "bar"], try_type=Value("int32")))
        assert arr.type == pa.string()

        arr = pa.array(TypedSequence([[[1, 2, 3]]], type=Array2D((1, 3), "int64")))
        assert arr.type == Array2DExtensionType((1, 3), "int64")

        table = pa.Table.from_pydict({
            "image": TypedSequence([[[1, 2, 3]]], type=Array2D((1, 3), "int64"))
        })
        assert table["image"].type == Array2DExtensionType((1, 3), "int64")

    Ndatatypetry_typeoptimized_int_typec                     ||t          d          || _        || _        || _        || _        | j        d u| _        |d uo|d u o|d u | _        d | _        d S )Nz)You cannot specify both type and try_type)
ValueErrorr.   r/   r0   r1   trying_typetrying_int_optimization_inferred_type)selfr.   r/   r0   r1   s        r+   __init__zTypedSequence.__init__`   s{      4HIII		 "4=4'9'E'k$RV,'k[cgk[k$"r*   returnc                 t    | j         +t          t          j        |           j                  | _         | j         S )a  Return the inferred feature type.
        This is done by converting the sequence to an Arrow array, and getting the corresponding
        feature type.

        Since building the Arrow array can be expensive, the value of the inferred type is cached
        as soon as pa.array is called on the typed sequence.

        Returns:
            FeatureType: inferred feature type of the sequence.
        )r6   r   paarrayr/   r7   s    r+   get_inferred_typezTypedSequence.get_inferred_typeu   s1     &":28D>>;N"O"OD""r*   c                     t           j        rXdt          j        v rJddl}t          |           \  }}t          ||j        j                  rd | D             t                      fS | dfS )a  Implement type inference for custom objects like PIL.Image.Image -> Image type.

        This function is only used for custom python objects that can't be direclty passed to build
        an Arrow array. In such cases is infers the feature type to use, and it encodes the data so
        that they can be passed to an Arrow array.

        Args:
            data (Iterable): array of data to infer the type, e.g. a list of PIL images.

        Returns:
            Tuple[Iterable, Optional[FeatureType]]: a tuple with:
                - the (possibly encoded) array, if the inferred feature type requires encoding
                - the inferred feature type if the array is made of supported custom objects like
                    PIL images, else None.
        PILr   Nc                 X    g | ]'}|!t                                          |          nd (S N)r   encode_example.0values     r+   
<listcomp>z?TypedSequence._infer_custom_type_and_encode.<locals>.<listcomp>   s6    gggY^9J..u555PTgggr*   )r   PIL_AVAILABLEsysmodules	PIL.Imager#   
isinstancer   )r.   r@   non_null_idxnon_null_values       r+   _infer_custom_type_and_encodez+TypedSequence._infer_custom_type_and_encode   s|    "  	qES[$8$8+?+E+E(L..#)/:: qggbfggginipipppTzr*   c           	         |t          d          ~| j        }| j        $| j        |                     |          \  }| _        | j        | j        r| j        n| j        }n| j        }|t          |          nd}| j        t          | j                  nd}d}	 t          |t                    r0t          ||          }t          j                            ||          S t          |t          j                  rt#          |          }nyt          |t$                    r?|r=t          t'          |          d         t          j                  rt)          |          }n%d}t          j        t-          |d                    }| j        r7t          j                            |j                  r|                    |          }nt          j                            |j                  rt          j                            |j        j                  r#t;          |t          j        |                    }nt          j                            |j        j                  rbt          j                            |j        j        j                  r4t;          |t          j        t          j        |                              }n|t?          ||| j                   }|S # t@          t          j!        j"        t          j!        j#        f$ r}| j        s t          |t          j!        j#                  r | j        r	 t          |t          j                  rt#          |          cY d}~S t          |t$                    r/|r-tI          d |D                       rt)          |          cY d}~S d}t          j        t-          |d                    cY d}~S # t          j!        j"        $ r}d	tK          |          v r$tM          d
tO          |           d| d          d| j        rfdtK          |          v rUt          j(        |)                                          j*        }	tV          ,                    d|	 d           |cY d}~cY d}~S |rUdtK          |          v rDt          j        t-          |dd                    }|t?          ||d          }|cY d}~cY d}~S  d}~ww xY wd	tK          |          v r$tM          d
tO          |           d| d          d| j        radtK          |          v rPt          j(        |)                                          j*        }	tV          ,                    d|	 d           |cY d}~S |rPdtK          |          v r?t          j        t-          |dd                    }|t?          ||d          }|cY d}~S  d}~ww xY w)z=This function is called when calling pa.array(typed_sequence)NzMTypedSequence is supposed to be used with pa.array(typed_sequence, type=None)Fr
   T)only_1d_for_numpy)allow_number_to_strc              3   J   K   | ]}t          |t          j                  V  d S rB   )rL   npndarrayrD   s     r+   	<genexpr>z0TypedSequence.__arrow_array__.<locals>.<genexpr>   s1      @q@qchESUS]A^A^@q@q@q@q@q@qr*   overflowz There was an overflow with type zE. Try to reduce writer_batch_size to have batches smaller than 2GB.
()znot in rangezFailed to cast a sequence to z. Falling back to int64.zCould not convert)rQ   optimize_list_casting)-r3   r.   r/   r0   rO   r6   r4   r   r1   rL   r   r   r;   ExtensionArrayfrom_storagerT   rU   r   listr#   r   r<   r   r5   typesis_int64castis_list
value_typer   list_r   	TypeErrorlibArrowInvalidArrowNotImplementedErroranystrOverflowErrortype_dtypeto_pandas_dtypenameloggerinfo)
r7   r/   r.   pa_typeoptimized_int_pa_typetrying_cast_to_python_objectsstorageouteoptimized_int_pa_type_strs
             r+   __arrow_array__zTypedSequence.__arrow_array__   su    lmmmy9!6(,(J(J4(P(P%D$%&$($4C4==$)DD&D+/+;/$'''8<8O8[OD3444ae 	 ).%R	'#899 H.tW==(55gwGGG $
++ U066D$'' UD UZ@TUY@Z@Z[\@]_a_i5j5j U;DAA04-h5ddSSSTT+ a8$$SX.. Y((#899CCX%%ch// Yx(()<== Y(bh7L.M.MNN))#(*=>> Y28CTCTUXU]UhUsCtCt Y(bhrx@U7V7V.W.WXX! ,CtO_K_```JFF+
 4	 4	 4	 # 
1bf6U(V(V  +!$
33 ^9$????????#D$// ^D ^S@q@qlp@q@q@q=q=q ^DTJJJJJJJJ8<5!x(>tW[(\(\(\]]]]]]]]v*   !SVV+++ WuT{{  W  W  ST  W  W  W #$ 5 .CPQFF:R:R46H=R=b=b=d=d4e4e4j1o<Uooo    #










6 ;NRUVWRXRX;X;X h244glmmm   +"7TW["\"\"\C"










'( s1vv%%# OuT{{  O  O  KL  O  O  O  - 
.CFF2J2J,.H5J5Z5Z5\5\,],],b)o<Uoooppp





. 3F#a&&3P3Ph5ddjopppqq#/TtTTTC





i4	s   AK !G=K ,W"/W;(O#W")>O'W"-$OW"S*+BS%S*WW"AS%S*WW"$S%%S**BWW"
AWW"WW")NNNrB   )r&   r'   r(   __doc__r   r   r   r8   r>   staticmethodr   rO   r;   DataTyperw   r)   r*   r+   r-   r-   8   s        % %T '+*.48# ## {## ;'	#
 %[1# # # #*#; # # # # H xR]I^?^9_    \0e eHR[$9 e e e e e er*   r-   c            
       j     e Zd Z	 	 	 	 ddee         dee         dee         dee         f fdZ xZS )OptimizedTypedSequenceNr/   r0   colr1   c                     t          d          t          d          t          d          t          d          d}|||                    |d           }t                                          ||||           d S )Nint8int32)attention_maskspecial_tokens_mask	input_idstoken_type_ids)r/   r0   r1   )r   getsuperr8   )r7   r.   r/   r0   r}   r1   optimized_int_type_by_col	__class__s          r+   r8   zOptimizedTypedSequence.__init__  s     $Fmm#(==w# 	%
 %
! <H,!:!>!>sD!I!ID8Pbcccccr*   )NNNN)r&   r'   r(   r   r   rh   r8   __classcell__)r   s   @r+   r|   r|     s         '+*.!48d d {#d ;'	d
 c]d %[1d d d d d d d d d dr*   r|   c                   r   e Zd ZdZej        Z	 	 	 	 	 	 	 	 	 	 	 	 	 	 d,deej                 dee	         dee
         d	eej                 d
ee
         dee         dee
         dee         dededede
dedee         fdZd Zd Zd Zd Zdej        fdZed             Zed-ded
ee
         dee
e
f         fd            Zd Zd  Z	 	 d.d!ee
ef         d"eee
eef                  dee         fd#Z d$ Z!d-d%ej"        dee         fd&Z#	 d-d'ee
e$f         dee         fd(Z%d-d)ej"        dee         fd*Z&d/d+Z'dS )0ArrowWriterz,Shuffles and writes Examples to Arrow files.NFTexamplesschemafeaturespathstreamfingerprintwriter_batch_size	hash_saltcheck_duplicatesdisable_nullableupdate_featureswith_metadataunitembed_local_filesstorage_optionsc                 ~   ||t          d          ||| _        d | _        n6|&|| _        t          j        | j                  | _        nd | _        d | _        |t          |          | _        nt          d          | _        || _        |	| _        |t          j
        ||          }|d         | _        t          | j                  s|d         d         n%| j                            |d         d                   | _        | j                            |d         d         d          | _        d| _        nd | _        d | _        || _        d| _        || _        |	| _        |pt(          j        | _        |
| _        || _        || _        || _        d| _        d| _        g | _        g | _        d | _        g | _         d S )	Nz1At least one of path and stream must be provided. )r   r      wbTF)!r3   	_features_schemar   from_arrow_schemar   _hasher_check_duplicates_disable_nullablefsspecget_fs_token_paths_fsr   unstrip_protocol_pathopenr   _closable_streamr   r   r   DEFAULT_MAX_BATCH_SIZEr   r   r   r   r   _num_examples
_num_bytescurrent_examplescurrent_rows	pa_writerhkey_record)r7   r   r   r   r   r   r   r   r   r   r   r   r   r   r   fs_token_pathss                   r+   r8   zArrowWriter.__init__   s   " <FNPQQQ%DNDLL&,DL%7EEDNN!DNDL $Y//DLL$R==DL!1!1>#6t_]]]N2@2CDH ,DH55Eq!!$$X..~a/@/CDD J
 (--q(9!(<dCCDK$(D!!DHDJ DK$)D!& 0!2!Sf6S.*	!2BD,.?Cr*   c                 d    | j         t          | j                  z   t          | j                  z   S )z/Return the number of writed and staged examples)r   lenr   r   r=   s    r+   __len__zArrowWriter.__len__e  s+    !C(=$>$>>TEVAWAWWWr*   c                     | S rB   r)   r=   s    r+   	__enter__zArrowWriter.__enter__i  s    r*   c                 .    |                                   d S rB   )close)r7   exc_typeexc_valexc_tbs       r+   __exit__zArrowWriter.__exit__l  s    

r*   c                     | j         r+	 | j                                          n# t          $ r Y nw xY w| j        r'| j        j        s| j                                         d S d S d S rB   )r   r   	Exceptionr   r   closedr=   s    r+   r   zArrowWriter.closeo  s    > 	$$&&&&     	 ); 	 K	  	  	  	 s   # 
00inferred_schemac                 4   | j         }t          j        |          }| j        X| j        rPd | j        j        D             }|j        D ])}|j        }||v r|||         k    r| j        |         ||<   *|| _        |}n	|| _        |}| j        rt          j         d |D                       }| j	        rA|	                    | 
                    t          | j                  | j                            }|| _        |                     | j        |          | _        d S )Nc                     i | ]
}|j         |S r)   )rm   rE   fields     r+   
<dictcomp>z-ArrowWriter._build_writer.<locals>.<dictcomp>~  s    MMM%*eMMMr*   c              3   X   K   | ]%}t          j        |j        |j        d           V  &dS F)nullableNr;   r   rm   r/   r   s     r+   rV   z,ArrowWriter._build_writer.<locals>.<genexpr>  s8      ddTYrx
EJOOOddddddr*   )r   )r   r   r   r   r   r/   rm   r   r;   r   _build_metadatar   r   r   _WRITER_CLASSr   r   )r7   r   r   inferred_featuresfieldsinferred_fieldrm   s          r+   _build_writerzArrowWriter._build_writery  s6   $6GG>%# 4MM9LMMM&7&< K KN).Dv~~)VD\996:nT6J-d3!2$3.DN /F  	eYdd]cdddddF 	x))$*>*>{TXTb?c?c?ceieu*v*vwwF++DK@@r*   c                     | j         | j         n&| j        t          j        | j        j                  nd }| j        r |t          j        d |D                       }||ng S )Nc              3   X   K   | ]%}t          j        |j        |j        d           V  &dS r   r   r   s     r+   rV   z%ArrowWriter.schema.<locals>.<genexpr>  s8      ffUZUZ% P P Pffffffr*   )r   r   r;   r   r/   r   )r7   r   s     r+   r   zArrowWriter.schema  sz     |' LL48N4N")DN/000TX 	
 ! 	gg&9iff^efffffG!-ww25r*   ro   r9   c                     dg}t          |           i }fd|D             |d<   |||d<   dt          j        |          iS )Nr   c                 "    i | ]}||         S r)   r)   )rE   keyinfo_as_dicts     r+   r   z/ArrowWriter._build_metadata.<locals>.<dictcomp>  s     HHHsCc!2HHHr*   ro   r   huggingface)r"   jsondumps)ro   r   	info_keysmetadatar   s       @r+   r   zArrowWriter._build_metadata  s]    L	d||HHHHiHHH"&1H]#tz(3344r*   c                      j         sdS  j        rJ fd j        j        D              fd j         d         d                                         D             z   n$ j         d         d                                         }i }|D ]^t	          fd j         D                       r&fd j         D             }t          |          |<   Hfd j         D             |<   _                     |           g  _         dS )	ziWrite stored examples from the write-pool of examples. It makes a table out of the examples and write it.Nc                 >    g | ]}|j         d          d          v |S r   )r   rE   r}   r7   s     r+   rG   z6ArrowWriter.write_examples_on_file.<locals>.<listcomp>  s1    TTTS8Ma8PQR8S1S1SS1S1S1Sr*   c                 0    g | ]}|j         j        v|S r)   r   namesr   s     r+   rG   z6ArrowWriter.write_examples_on_file.<locals>.<listcomp>  s(    aaasCt{O`D`D`sD`D`D`r*   r   c              3   |   K   | ]6}t          |d                   t          j        t          j        f          V  7dS )r   N)rL   r;   ArrayChunkedArrayrE   rowr}   s     r+   rV   z5ArrowWriter.write_examples_on_file.<locals>.<genexpr>  s>      iiC:c!fSkBHbo+FGGiiiiiir*   c                 ,    g | ]}|d                   S r   r)   r   s     r+   rG   z6ArrowWriter.write_examples_on_file.<locals>.<listcomp>  s!    GGG##a&+GGGr*   c                     g | ]h}t          |d                   t          j        t          j        f          r&|d                                                   d          n|d                   iS r   )rL   r;   r   r   	to_pylistr   s     r+   rG   z6ArrowWriter.write_examples_on_file.<locals>.<listcomp>  sv     ' ' ' 3=SVC[28UWUdJe2f2fwCF3K))++A..lopqlrsvlw' ' 'r*   )batch_examples)r   r   r   keysallr   write_batch)r7   colsr   arraysr}   s   `   @r+   write_examples_on_filez"ArrowWriter.write_examples_on_file  sg   $ 	F {4TTTTDK-TTTaaaad3A6q9>>@@aaab b &q)!,1133	 	  	 	C iiiiSWShiiiii GGGG1FGGG&26&:&:s##' ' ' '#4' ' 's## 	777 "r*   c                     | j         sdS t          j        | j                   }|                     |           g | _         dS )zwWrite stored rows from the write-pool of rows. It concatenates the single-row tables and it writes the resulting table.N)r   r;   concat_tableswrite_table)r7   tables     r+   write_rows_on_filezArrowWriter.write_rows_on_file  sG      	F !233r*   exampler   c                    | j         rS| j                            |          }| j                            ||f           | j                            ||f           n| j                            |df           || j        }|Pt          | j                  |k    r:| j         r|                                  g | _        | 	                                 dS dS dS )zAdd a given (Example,Key) pair to the write-pool of examples which is written to file.

        Args:
            example: the Example to add.
            key: Optional, a unique identifier(str, int or bytes) associated with each example
        r   N)
r   r   hashr   appendr   r   r   check_duplicate_keysr   )r7   r   r   r   r   s        r+   writezArrowWriter.write  s     ! 	8<$$S))D!(('4999##T3K0000 !(('2777$ $ 6(S1F-G-GK\-\-\% &))+++#% ''))))) )(-\-\r*   c                      t                      } j        D ]O\  }|v r1 fdt           j                  D             }t          ||          |                               PdS )z+Raises error if duplicates found in a batchc                 V    g | ]%\  }\  }}|k    t          j        |z             &S r)   )rh   r   )rE   indexduplicate_hash_r   r7   s       r+   rG   z4ArrowWriter.check_duplicate_keys.<locals>.<listcomp>  sG     ) ) )22%-- *U233---r*   N)setr   	enumerater   add)r7   
tmp_recordr   duplicate_key_indicesr   s   `   @r+   r   z ArrowWriter.check_duplicate_keys  s    UU
) 
	% 
	%ID#z!!) ) ) ) )6?@P6Q6Q) ) )% *#/DEEEt$$$$
	% 
	%r*   r   c                     | j                             |           || j        }|.t          | j                   |k    r|                                  dS dS dS )zAdd a given single-row Table to the write-pool of rows which is written to file.

        Args:
            row: the row to add.
        N)r   r   r   r   r   )r7   r   r   s      r+   	write_rowzArrowWriter.write_row  sl     	  %%%$ $ 6(S1B-C-CGX-X-X##%%%%% )(-X-Xr*   r   c                     rAt          t          t                                                                        dk    rdS  j        	 j        rdn j        } j         j        r j        nd}g }t                      } j        r9fd j        j	        D              fd
                                D             z   n
                                }|D ]}|         }	|r||         nd}
t          |	t          j        t          j        f          rA|
t          |	|
          n|	}|                    |           t#          |	j                  ||<   }|||v r||         nd}t'          |	|
||          }|                    t          j        |                     |                                ||<   ߉ j        |j        n j        }t          j                            ||          }                     ||           dS )zWrite a batch of Example to file.
        Ignores the batch if it appears to be empty,
        preventing a potential schema update of unknown types.

        Args:
            batch_examples: the batch of examples to add.
        r   Nc                     g | ]}|v |	S r)   r)   )rE   r}   r   s     r+   rG   z+ArrowWriter.write_batch.<locals>.<listcomp>  s#    GGGS1F1FS1F1F1Fr*   c                 0    g | ]}|j         j        v|S r)   r   r   s     r+   rG   z+ArrowWriter.write_batch.<locals>.<listcomp>  s(    TTTss$+BS7S7Ss7S7S7Sr*   )r/   r0   r}   )r   )r   nextitervaluesr   r   r   r   r   r   r   rL   r;   r   r   r   r   r   r/   r|   r<   r>   arrow_schemaTablefrom_arraysr   )r7   r   r   r   try_featuresr   r   r   r}   
col_valuescol_typer<   col_try_typetyped_sequencer   pa_tables   ``              r+   r   zArrowWriter.write_batch  sA     	c$tN,A,A,C,C'D'D"E"EFF!KKF>1d6J144PTP^)-)?DDX)?t~~^b$JJ {'GGGGDK-GGGTTTTn1133TTTU U  $$&&	 	  	L 	LC',J(0:x}}dH*rx&ABB LGOG[-j(CCCake$$$)A*/)R)R!#&&4@4LQTXdQdQd|C00jn!7
\hnq!r!r!rbh~66777)7)I)I)K)K!#&&37>3I"//t{8''v'>>#455555r*   r  c                 d   || j         }| j        |                     |j                   |                                }t          || j                  }| j        rt          |          }| xj	        |j
        z  c_	        | xj        |j        z  c_        | j                            ||           dS )zUWrite a Table to file.

        Args:
            example: the Table to add.
        N)r   )r   r   r   r   combine_chunksr   r   r   r   r   nbytesr   num_rowsr   )r7   r  r   s      r+   r   zArrowWriter.write_table-  s     $ $ 6>!x???**,,h55! 	5*844H8?*h//""8->?????r*   c                 d   |                                   | j        r|                                  g | _        |                                  | j        !| j        r|                     | j                   | j        <| j                                         d | _        |r| j	                                         n*|r| j	                                         t          d          t                              d| j         d| j         d| j         d| j        r| j        nd d	           | j        | j        fS )Nz@Please pass `features` or at least one example when writing datazDone writing  z in z bytes r   .)r   r   r   r   r   r   r   r   r   r   r%   rn   debugr   r   r   r   )r7   close_streams     r+   finalizezArrowWriter.finalize?  sC   !!!! 	"%%'''!D##%%%>!dk!t{+++>%N  """!DN $!!### $!!###&'ijjj{D.{{{{{{fjfpXxX\XbXbvx{{{	
 	
 	
 !4?22r*   )NNNNNNNFFFTr   FNrB   )NN)T)(r&   r'   r(   rx   r;   RecordBatchStreamWriterr   r   Schemar   rh   
NativeFileintbooldictr8   r   r   r   r   r   propertyr   ry   r   r   r   r   r   r   r	   bytesr   r   r  r	  r   r   r   r"  r)   r*   r+   r   r     s0       66.M '+'+"*.%)+/#'+0!& %""'*.C C#C 8$C sm	C
 'C c]C $C=C C=C #4.C C C C C  C "$C C C CJX X X         ARY A A A A. 6 6 X6 5 5k 5 5QUVY[^V^Q_ 5 5 5 \5# # #8   15+/	* *c3h* eCeO,-* $C=	* * * *B% % %
& 
&RX 
&(3- 
& 
& 
& 
& ,0&6 &6S$Y&6 $C=&6 &6 &6 &6P@ @BH @# @ @ @ @$3 3 3 3 3 3r*   r   c                       e Zd Zej        ZdS )ParquetWriterN)r&   r'   r(   pqr,  r   r)   r*   r+   r,  r,  Y  s        $MMMr*   r,  c                       e Zd ZdZ	 	 	 	 	 ddee         deej                 dee         dee         dee         f
dZ	d	 Z
d
efdZdS )
BeamWriterz
    Shuffles and writes Examples to Arrow files.
    The Arrow files are converted from Parquet files that are the output of Apache Beam pipelines.
    Nr   r   r   	namespace	cache_dirc                 b   ||t          d          |t          d          ||| _        |j        | _        n || _        t	          j        |          | _        || _        t          j        	                    |          d         | _
        |pd| _        d | _        |pt          j        | _        d S )Nz5At least one of features and schema must be provided.zPath must be provided.r   default)r3   r   r  r   r   r   r   osr   splitext_parquet_path
_namespacer   r   HF_DATASETS_CACHE
_cache_dir)r7   r   r   r   r0  r1  s         r+   r8   zBeamWriter.__init__c  s     TUUU<5666'/DN&.&;DLL&,DL'/'A&'I'IDN
W--d33A6#0y!#?v'?r*   c                      ddl  fd}|d                    |          z	  z  }|d                                z	  z  dj        j                             j         j        d          z	  z  S )	zAAdd the final steps of the beam pipeline: write to parquet files.r   Nc                 v    j         j                            j        d                                           d S )Nnum_examples)metricsMetricscounterr7  inc)r   beamr7   s    r+   inc_num_examplesz;BeamWriter.write_from_pcollection.<locals>.inc_num_examples  s2    L ((.IIMMOOOOOr*   zCount N. Examplesz
Get valueszSave to parquetz-SSSSS-of-NNNNN.parquet)shard_name_template)apache_beamMapValuesio	parquetioWriteToParquetr6  r   )r7   pcoll_examplesrB  r  rA  s   `   @r+   write_from_pcollectionz!BeamWriter.write_from_pcollection}  s    """"	P 	P 	P 	P 	P 	P 0DHH=M4N4NNN dkkmm+,w //"DLF_ 0  	
r*   metrics_query_resultc                    ddl }ddlm} t          |j        j        j                            | j        dz   g          d         j	                  }d |D             }t          d |D                       }t          |          }| j                            d          rt                              d	| j         d
| j                    d |j        j        j                            | j        dz   g          d         j	        D             }	 t!          j                     }d}t!          j        |d|          D ]}	|j        j        j                            |	          5 }
|j        j        j                            |	                    dd                    5 }t-          |
|          \  }}||z  }ddd           n# 1 swxY w Y   ddd           n# 1 swxY w Y   nS# t.          $ rE}|j        t0          j        k    r t                              d           t6          j                            | j        d          }t7          j        |d           t!          j                     }d}t!          j        |d|          D ]}	t6          j                            |tA          |	          dz             }|!                    |	|           |                    dd          }t-          ||          \  }}||z  }|	                    dd          }|"                    ||           Y d}~nd}~ww xY wd |d         D             }|d         | _#        || _$        || _%        | j#        | j$        fS )a  
        Run after the pipeline has finished.
        It converts the resulting parquet files to arrow and it completes the info from the pipeline metrics.

        Args:
            metrics_query_result: `dict` obtained from pipeline_results.metrics().query(m_filter). Make sure
                that the filter keeps only the metrics for the considered split, under the namespace `split_name`.
        r   Nr
   )
beam_utilsz	*.parquetc                     g | ]	}|j         
S r)   r   rE   r   s     r+   rG   z'BeamWriter.finalize.<locals>.<listcomp>  s    @@@H(-@@@r*   c                     g | ]	}|j         
S r)   )size_in_bytesrQ  s     r+   rG   z'BeamWriter.finalize.<locals>.<listcomp>  s    PPPH/PPPr*   z.arrowzConverting parquet files z
 to arrow c                     g | ]	}|j         
S r)   rP  rQ  s     r+   rG   z'BeamWriter.finalize.<locals>.<listcomp>  s*          r*   shardsr   disablez.parquetzWBroken Pipe during stream conversion from parquet to arrow. Using local convert insteadbeam_convertT)exist_okc                 <    i | ]}|j         j        j        |j        S r)   )r   metricrm   result)rE   r[  s     r+   r   z'BeamWriter.finalize.<locals>.<dictcomp>  s$    nnn6*/nnnr*   countersr<  )&rD  utilsrN  r\   rG  filesystemsFileSystemsmatchr6  metadata_listsumget_parquet_lengthsr   endswithrn   ro   r    is_progress_bar_enabledtqdmr   createreplaceparquet_to_arrowOSErrorerrnoEPIPEwarningr4  r   joinr9  makedirsr!   download_remote_to_localupload_local_to_remoter   r   _shard_lengths)r7   rL  rA  rN  shards_metadatarU  	num_bytesshard_lengthsrW  shardsourcedestinationshard_num_bytesr  ru   local_convert_dirlocal_parquet_pathlocal_arrow_pathremote_arrow_pathcounters_dicts                       r+   r"  zBeamWriter.finalize  s?    	#"""%%%%%%G+1143E3S2TUUVWXf
 
 A@@@@PPPPPQQ	+F33 :x(( #	[KK^D4F^^RVR\^^___  $ 3 ? E EtGY\gGgFh i i!	  F[%=???	$\&xQQQ 9 9E,8==eDD 9!W0<CC!MM*h??  9(1A&+1V1V.OQ%8I	9 9 9 9 9 9 9 9 9 9 9 9 9 9 99 9 9 9 9 9 9 9 9 9 9 9 9 9 99  [ [ [7ek))m   %'GLL.$Q$Q!-====%=???	$\&xQQQ [ [E)+6GI]^cIdIdgqIq)r)r&77?QRRR'9'A'A*h'W'W$)9:LN^)_)_&OQ0I(-j((K(K%556FHYZZZZ[ [ [ [ [[( onMablMmnnn*>:#+!4?22sb   AG" 9GF=1G=GGGGG" G	G" G	G" "L2-D;L--L2)NNNNN)r&   r'   r(   rx   r   r   r;   r$  rh   r8   rK  r(  r"  r)   r*   r+   r/  r/  ]  s          (,&*"#'#'@ @8$@ #@ sm	@
 C=@ C=@ @ @ @4
 
 
(?3T ?3 ?3 ?3 ?3 ?3 ?3r*   r/  r9   c                     g }t          j                     }t          j        | d|          D ]@}t          j                            |          }|                    |j        j                   A|S )Nzparquet filesrV  )	r    rf  rg  r;   parquetParquetFiler   r   r  )sourcesrv  rW  rx  parquet_files        r+   rd  rd    sp    M1333G,w_gNNN = =z--f55\2;<<<<r*   c                    t          |t                    rdn|}t          ||          5 }t          j                            |           }|                                D ]7}t          j                            |g          }|	                    |           8|
                                \  }}ddd           n# 1 swxY w Y   ||fS )zPConvert parquet file to arrow file. Inputs can be str paths or file-like objectsN)r   r   )rL   rh   r   r;   r  r  iter_batchesr  from_batchesr   r"  )	rx  ry  r   writerr  record_batchr  ru  r<  s	            r+   rj  rj    s    S11BTT{F	+f	5	5	5 4z--f55(5577 	) 	)Lx,,l^<<Hx(((("(//"3"3	<4 4 4 4 4 4 4 4 4 4 4 4 4 4 4 l""s   BB::B>B>)Grx   rl  r   r4  rI   typingr   r   r   r   r   r   r	   r   numpyrT   pyarrowr;   pyarrow.parquetr  r-  r   r   r   r   r   r   features.featuresr   r   r   r   r   r   r   r   r_  r   ro   r   keyhashr   r   r   r   r   r   r   r   r^  r    utils.file_utilsr!   utils.py_utilsr"   r#   
get_loggerr&   rn   r/   rj   r3   r%   r-   r|   r   r,  r/  r&  rd  rj  r)   r*   r+   <module>r     sL   + *   				 



 D D D D D D D D D D D D D D D D D D                      , , , , , , , , , ,	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 . - - - - -       3 3 3 3 3 3 3 3 c c c c c c c c c c c c c c       2 2 2 2 2 2 8 8 8 8 8 8 8 8 
	H	%	%	 	 	 	 	: 	 	 	J J J J J J J JZd d d d d] d d d,{3 {3 {3 {3 {3 {3 {3 {3|	% % % % %K % % %s3 s3 s3 s3 s3 s3 s3 s3lDI    	#T#Y 	# 	# 	# 	# 	# 	#r*   