U
    H$xe6>                  	   @   sv  d dl Z d dlZd dlZd dlZd dlZd dlZd dlmZmZ d dl	m
Z
 d dlmZ d dlmZmZmZmZmZmZmZmZ d dlZd dlmZmZmZmZ d dlmZ d dlmZ d d	l m!Z!m"Z" e# Z$e% Z&erej'Z(ne&j'Z(G d
d de(Z)G dd de(Z*G dd dZ+G dd deZ,G dd de,Z-G dd de,Z.deeee/ee0e!df ee/ edddZ1dS )    N)ABCabstractmethod)	ExitStack)
Connection)AnyListOptionalTupleSequenceIteratorTYPE_CHECKINGUnion)
InputFilesOutputFiles
InputPathsxopen_rb_raise_limit)Pipeline)
Statistics)ProgressDummyProgressc                       sf   e Zd ZdZeee eje	d fddZ
dd Zeeedf  dd	d
ZdddZdd Z  ZS )ReaderProcessaC  
    Read chunks of FASTA or FASTQ data (single-end or paired) and send them to a worker.

    The reader repeatedly

    - reads a chunk from the file(s)
    - reads a worker index from the Queue
    - sends the chunk to connections[index]

    and finally sends the stop token -1 ("poison pills") to all connections.
    )pathsconnectionsqueuebuffer_sizec                   sL   t    t|dkrtd|s*td|| _|| _|| _|| _|| _dS )a  
        Args:
            paths: path to input files
            connections: a list of Connection objects, one for each worker.
            queue: a Queue of worker indices. A worker writes its own index into this
                queue to notify the reader that it is ready to receive more data.
            buffer_size:
            stdin_fd:

        Note:
            This expects the paths to the input files as strings because these can be pickled
            while file-like objects such as BufferedReader cannot. When using multiprocessing with
            the "spawn" method, which is the default method on macOS, function arguments must be
            picklable.
           z8Reading from more than two files currently not supportedzMust provide at least one fileN)	super__init__len
ValueError_pathsr   r   r   stdin_fd)selfr   r   r   r!   r   	__class__ /lib/python3.8/site-packages/cutadapt/runners.pyr   +   s    
zReaderProcess.__init__c              
      s   | j dkr"tj  t| j t_zZt B  fdd| jD }t| j	| D ]\}}| j
|f|  qNW 5 Q R X |   W nJ tk
r } z,| jD ] }|d ||t f qW 5 d }~X Y nX d S )Nc                    s   g | ]}  t|qS r%   )enter_contextr   ).0pathstackr%   r&   
<listcomp>S   s   z%ReaderProcess.run.<locals>.<listcomp>)r!   sysstdincloseosfdopenr   r    	enumerate_read_chunkssend_to_workershutdown	Exceptionr   send	traceback
format_exc)r"   filesindexchunkse
connectionr%   r+   r&   runM   s    




zReaderProcess.run.returnc                 g   sh   t |dkr.t|d | jD ]}|fV  qn6t |dkr`t|d |d | jD ]
}|V  qRntd S )N   r   r   )r   dnaioZread_chunksr   Zread_paired_chunksNotImplementedError)r"   r<   chunkr>   r%   r%   r&   r5   `   s      
zReaderProcess._read_chunksNc                 C   s>   | j  }| j| }|| || |d k	r:|| d S N)r   getr   r9   
send_bytes)r"   chunk_indexZchunk1Zchunk2worker_indexr@   r%   r%   r&   r6   l   s    



zReaderProcess.send_to_workerc                 C   s2   t t| jD ]}| j }| j| d qd S Nr'   )ranger   r   r   rI   r9   )r"   _rL   r%   r%   r&   r7   t   s    
zReaderProcess.shutdown)N)__name__
__module____qualname____doc__strr
   r   multiprocessingQueueintr   rA   r   r	   
memoryviewr5   r6   r7   __classcell__r%   r%   r#   r&   r      s   "
r   c                	       sP   e Zd ZdZeeeeeeee	j
d fddZdd Zeeeddd	Z  ZS )
WorkerProcessa7  
    The worker repeatedly reads chunks of data from the read_pipe, runs the pipeline on it
    and sends the processed chunks to the write_pipe.

    To notify the reader process that it wants data, it puts its own identifier into the
    need_work_queue before attempting to read data from the read_pipe.
    )id_pipelinen_input_filesinterleaved_inputorig_outfiles	read_pipe
write_pipeneed_work_queuec	           	         sB   t    || _|| _|| _|| _|| _|| _|| _|	 | _
d S rH   )r   r   _id	_pipeline_n_input_files_interleaved_input
_read_pipe_write_pipe_need_work_queue
as_bytesio_original_outfiles)	r"   r[   r\   r]   r^   r_   r`   ra   rb   r#   r%   r&   r      s    
zWorkerProcess.__init__c              
      sr  z&t  } j j  j }|dkr.qn&|dkrT j \}}td| | fddt j	D }t
|d ji} j } j||\}}	}
 j  t  ||	|
g  jj}||7 } |||  j  q
 jj}t  dd jjrdnd |g }||7 } jd  j| W nD tk
rl } z$ jd  j|t f W 5 d }~X Y nX d S )Nr'   r.   %sc                    s   g | ]}t  j qS r%   )ioBytesIOrg   
recv_bytesr)   rO   r"   r%   r&   r-      s   z%WorkerProcess.run.<locals>.<listcomp>interleavedr   )r   ri   Zputrc   rg   recvloggererrorrN   re   r   rf   rk   rj   rd   process_readsflushcollect_steps_send_outfilesr1   
_modifiersZpairedrh   r9   r8   r:   r;   )r"   statsrK   r?   tb_strr<   infilesoutfilesnZbp1Zbp2	cur_statsmZmodifier_statsr%   rq   r&   rA      sF    



    zWorkerProcess.run)r   rK   n_readsc                 C   sR   | j | | j | |D ]0}|  t|tjs8t| }| j | qd S rH   )	rh   r9   rw   
isinstancerm   rn   AssertionErrorgetvaluerJ   )r"   r   rK   r   fZprocessed_chunkr%   r%   r&   rz      s    zWorkerProcess._send_outfiles)rP   rQ   rR   rS   rW   r   boolr   r   rU   rV   r   rA   rz   rY   r%   r%   r#   r&   rZ   {   s   
(rZ   c                   @   s(   e Zd ZdZdd Zdd Zdd ZdS )	OrderedChunkWriterz
    We may receive chunks of processed data from worker processes
    in any order. This class writes them to an output file in
    the correct order.
    c                 C   s   t  | _d| _|| _d S )Nr   )dict_chunks_current_index_outfile)r"   Zoutfiler%   r%   r&   r      s    zOrderedChunkWriter.__init__c                 C   sH   || j |< | j| j krD| j| j | j  | j | j= |  jd7  _q
dS ) rD   N)r   r   r   write)r"   datar=   r%   r%   r&   r      s
    

zOrderedChunkWriter.writec                 C   s   | j  S rH   )r   rq   r%   r%   r&   wrote_everything   s    z#OrderedChunkWriter.wrote_everythingN)rP   rQ   rR   rS   r   r   r   r%   r%   r%   r&   r      s   r   c                   @   sN   e Zd ZdZeedddZeedddZ	edd	 Z
d
d Zdd ZdS )PipelineRunnerz$
    A read processing pipeline
    )r\   progressc                 C   s   || _ || _d S rH   )rd   	_progress)r"   r\   r   r%   r%   r&   r      s    zPipelineRunner.__init__rB   c                 C   s   d S rH   r%   rq   r%   r%   r&   rA      s    zPipelineRunner.runc                 C   s   d S rH   r%   rq   r%   r%   r&   r1      s    zPipelineRunner.closec                 C   s   | S rH   r%   rq   r%   r%   r&   	__enter__   s    zPipelineRunner.__enter__c                 G   s   |    d S rH   )r1   )r"   argsr%   r%   r&   __exit__   s    zPipelineRunner.__exit__N)rP   rQ   rR   rS   r   r   r   r   r   rA   r1   r   r   r%   r%   r%   r&   r      s   
r   c                       s   e Zd ZdZdeeeeee	e d fddZ
ddeeddd	d
Zeee ee f dddZedddZedd ZddddZ  ZS )ParallelPipelineRunnera  
    Run a Pipeline in parallel

    - When connect_io() is called, a reader process is spawned.
    - When run() is called, as many worker processes as requested are spawned.
    - In the main process, results are written to the output files in the correct
      order, and statistics are aggregated.

    If a worker needs work, it puts its own index into a Queue() (_need_work_queue).
    The reader process listens on this queue and sends the raw data to the
    worker that has requested work. For sending the data from reader to worker,
    a Connection() is used. There is one such connection for each worker (self._pipes).

    For sending the processed data from the worker to the main process, there
    is a second set of connections, again one for each worker.

    When the reader is finished, it sends 'poison pills' to all workers.
    When a worker receives this, it sends a poison pill to the main process,
    followed by a Statistics object that contains statistics about all the reads
    processed by that worker.
    N)r\   inpathsr   r   	n_workersr   c                    sN   t  || || _t | _|d kr*dn|| _|| _| j|j	d|j
i d S )Ni  @ rr   )r   r   
_n_workersmpctxrV   ri   _buffer_size	_outfiles_assign_inputr   rr   )r"   r\   r   r   r   r   r   r#   r%   r&   r     s    	
zParallelPipelineRunner.__init__F)rr   )r   rr   rC   c                G   s   t || _|| _dd t| jD }t| \| _}ztj	 }W n t
jk
rZ   d}Y nX t||| j| j|d| _d| j_| j  d S )Nc                 S   s   g | ]}t jd dqS )FZduplex)r   Piperp   r%   r%   r&   r-   .  s     z8ParallelPipelineRunner._assign_input.<locals>.<listcomp>r'   )r   r   r   r!   T)r   re   rf   rN   r   zip_connectionsr/   r0   filenorm   UnsupportedOperationr   ri   r   _reader_processdaemonstart)r"   rr   r   r   Zconnwr   r%   r%   r&   r   &  s"    


z$ParallelPipelineRunner._assign_inputrB   c              
   C   sx   g }g }t | jD ]\}tjdd\}}|| t|| j| j| j| j	| j
| || j}d|_|  || q||fS )NFr   T)rN   r   r   r   appendrZ   rd   re   rf   r   r   ri   r   r   )r"   workersr   r=   Zconn_rZconn_wZworkerr%   r%   r&   _start_workers@  s&    

z%ParallelPipelineRunner._start_workersc                 C   s   |   \}}g }| jD ]}|t| qt }|rtj|}|D ]h}| |}|dkrx| |}	||	7 }|	| qD| |}
| j
|
 |D ]}| }||| qqDq0|D ]}| stq|D ]}|  q| j  | j
  |S rM   )r   r   r   r   r   rU   r@   wait_try_receiveremover   updatero   r   r   r   joinr   r1   )r"   r   r   Zwritersr   r|   Zready_connectionsr@   rK   r   Znumber_of_readswriterr   wr%   r%   r&   rA   U  s4    







zParallelPipelineRunner.runc                 C   sF   |   }|dkrB|   \}}td| t D ]}|  q0||S )z{
        Try to receive data over `self.connection` and return it.
        If an exception was received, raise it.
        r.   rl   )rs   rt   debugrU   Zactive_childrenZ	terminate)r@   resultr?   r}   Zchildr%   r%   r&   r   s  s    
z#ParallelPipelineRunner._try_receivec                 C   s   | j   d S rH   )r   r1   rq   r%   r%   r&   r1     s    zParallelPipelineRunner.close)N)rP   rQ   rR   rS   r   r   r   r   rW   r   r   rT   r   r   r	   r   rZ   r   r   r   rA   staticmethodr   r1   rY   r%   r%   r#   r&   r      s*    
r   c                       sH   e Zd ZdZeeeed fddZe	dddZ
ddd	d
Z  ZS )SerialPipelineRunnerz)
    Run a Pipeline on a single core
    )r\   r~   r   r   c                    s   t  || || _|| _d S rH   )r   r   _infilesr   )r"   r\   r~   r   r   r#   r%   r&   r     s    zSerialPipelineRunner.__init__rB   c                 C   sd   | j j| j| j| jd\}}}| jd k	r2| j  t| j dd }|d k	sLtt 	||||| j j
S )N)r   r{   )rd   rv   r   r   r   r1   getattrr   r   rx   ry   )r"   r   Z	total1_bpZ	total2_bpZ	modifiersr%   r%   r&   rA     s       

    zSerialPipelineRunner.runNc                 C   s   | j   d S rH   )rd   r1   rq   r%   r%   r&   r1     s    zSerialPipelineRunner.close)rP   rQ   rR   rS   r   r   r   r   r   r   rA   r1   rY   r%   r%   r#   r&   r     s   r   )r\   r   r   coresr   r   rC   c              	   C   sr   |dks|dkrt  }n|dkr&t }|dkrDt| |||||d}nt| | ||}| | }W 5 Q R X |S )u  
    Run a pipeline.

    This uses a SerialPipelineRunner if cores is 1 and a ParallelPipelineRunner otherwise.

    Args:
        inpaths:
        outfiles:
        cores: number of cores to run the pipeline on (this is actually the number of worker
            processes, there will be one extra process for reading the input file(s))
        progress: Set to False for no progress bar, True for Cutadapt’s default progress bar,
            or use an object that supports .update() and .close() (e.g. a tqdm instance)
        buffer_size: Forwarded to `ParallelPipelineRunner()`. Ignored if cores is 1.

    Returns:
        A Statistics object
    NFTrD   )r   r   )r   r   r   r   openrA   )r\   r   r   r   r   r   ZrunnerZ
statisticsr%   r%   r&   run_pipeline  s"    	r   )NN)2rm   ZloggingrU   r2   r/   r:   abcr   r   
contextlibr   Zmultiprocessing.connectionr   typingr   r   r   r	   r
   r   r   r   rE   Zcutadapt.filesr   r   r   r   Zcutadapt.pipeliner   Zcutadapt.reportr   Zcutadapt.utilsr   r   Z	getLoggerrt   Zget_contextr   ZProcessZmpctx_Processr   rZ   r   r   r   r   rW   r   r   r%   r%   r%   r&   <module>   sJ   (]S &  