
    ԋgW2                       d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
mZ d dlmZ d dlmZ d dlmZ d dlmZ d dlZd dlmZmZ erd d	lmZ  ej6                  e      Zd
 Zd Zd Z d Z! G d d      Z" G d d      Z#d Z$y)    )annotationsN)Callable)Queue)TYPE_CHECKING)Future)IOLoop)get_mp_contextwait_for)Selfc                    	  | j                   |g|  y# t        $ r*}t        j                  dt	        |            s Y d}~yd}~ww xY w)zQ
    Helper to silence "IOLoop is closing" exception on IOLoop.add_callback.
    zIOLoop is clos(ed|ing)N)add_callbackRuntimeErrorresearchstr)loopfuncargsexcs       3lib/python3.12/site-packages/distributed/process.py_loop_add_callbackr      sG    $&& yy13s8< =s    	A	 AA	c                H    | j                         s| j                  |       y y N)	cancelled
set_result)futurevalues     r   #_future_set_result_unless_cancelledr   )   s!    %      c                v    | j                         s| j                  |       y t        j                  d|       y )Nz$Exception after Future was cancelled)exc_info)r   set_exceptionloggererror)r   r   s     r   &_future_set_exception_unless_cancelledr%   .   s-    S!;cJr   c                    	  ||i |}t        | t        ||       y # t        $ r}t        | t        ||       Y d }~y d }~ww xY wr   )r   r   	Exceptionr%   )r   r   r   r   kwargsresr   s          r   _call_and_set_futurer*   5   sQ    SD#F# 	4!DfcR  V 	4!GQTUUVs    	A=Ac                      e Zd ZdZdZdZy)_ProcessStateFN)__name__
__module____qualname__is_alivepidexitcode r   r   r,   r,   @   s    H
CHr   r,   c                     e Zd ZU dZded<   ddZd Zd Zd Zdd	Z	e
d
        Ze
d        Ze
	 	 dd       Ze
d        Zd ZddZddZddZd ZddZd Zed        Zed        Zed        Zed        Zej6                  d        Zy) AsyncProcessz
    A coroutine-compatible multiprocessing.Process-alike.
    All normally blocking methods are wrapped in Tornado coroutines.
    multiprocessing.Process_processNc           
        |xs i }t        |      st        dt        |            t               | _        |xs t        j                  d      | _        t               j                  d      \  }| _
        t               j                  | j                  |||||| j                  t        j                  j                  f      | _        | j                   j"                  | _        t'        j(                  | t*        | j                         | _        t/               | _        t3               | _        d | _        d| _        | j;                          y )Nz#`target` needs to be callable, not F)instance)duplextargetnamer   )callable	TypeErrortyper,   _stater   current_loopr	   Pipe_keep_child_aliveProcess_rundaskconfigglobal_configr7   r=   _nameweakreffinalize_asyncprocess_finalizer_proc_finalizerPyQueue_watch_qr   _exit_future_exit_callback_closed_start_threads)selfr   r<   r=   r   r(   parent_alive_pipes          r   __init__zAsyncProcess.__init__N   s   2A$v,AQRSS#o;V^^U;
 5C4D4I4IQV4I4W141&(0099!&&)) 1 
 ]]''
&//)4== 
  	"H"r   c                P    d| j                   j                   d| j                   dS )N< >)	__class__r-   rK   rV   s    r   __repr__zAsyncProcess.__repr__x   s&    4>>**+1TZZL::r   c                2    | j                   rt        d      y )Nz(invalid operation on closed AsyncProcess)rT   
ValueErrorr^   s    r   _check_closedzAsyncProcess._check_closed{   s    <<GHH r   c           
        t        j                  | j                  d| j                  z  t	        j
                  |       | j                  | j                  | j                  | j                  | j                  f      | _        d| j                  _        | j                  j                          d }t	        j                  | || j                        | _        d| j                  _        y )Nz#AsyncProcess %s watch message queuer;   Tc                *    | j                  ddi       y )Nopstop)
put_nowaitqs    r   stop_threadz0AsyncProcess._start_threads.<locals>.stop_thread   s    LL$(r   rh   F)	threadingThread_watch_message_queuer=   rL   refr7   rC   rA   rQ   rR   _watch_message_threaddaemonstartrM   _thread_finalizeratexit)rV   rj   s     r   rU   zAsyncProcess._start_threads   s    %.%5%5,,6BD!

!!&
" -1"")""((*	)
 ")!1!1$t}}!U(-%r   c                    d | _         | j                  | j                  |        | j                  j                  |       y r   )r7   rS   rR   r   )rV   r2   s     r   _on_exitzAsyncProcess._on_exit   s7    *%$$X.r   c                j    fd}t        j                  |      }d|_        |j                          y)zP
        Immediately exit the process when parent_alive_pipe is closed.
        c                     	  j                          t        d      # t        $ r t        j                  d       Y y w xY w)Nz'unexpected state: should be unreachable)recvr   EOFErroros_exit)rW   s   r   monitor_parentz@AsyncProcess._immediate_exit_when_closed.<locals>.monitor_parent   sC    N "&&( ##LMM  
 
s    ??)r<   TN)rk   rl   rp   rq   )clsrW   r}   ts    `  r   _immediate_exit_when_closedz(AsyncProcess._immediate_exit_when_closed   s+    	N, N3		r   c                    |j                          | j                  |       dt        j                         _        t
        j                  j                  t
        j                  j                  |d        ||i | y )N
MainThreadold)priority)	closer   rk   current_threadr=   rH   rI   updaterJ   )r~   r<   r   r(   rW   rE   inherit_configs          r   rG   zAsyncProcess._run   sb     	! 	''(9:*6	  "'4;;44nuUr   c                  
 t                              j                  

fd}	 j                         }t        j	                  d d|       |d   }	|	dk(  rt        ||d   |       nM|	dk(  rt        ||d   j                         n-|	dk(  rt        ||d   j                         n|	d	k(  ry J |       )
Nc                 (   j                          t        j                  t        j                  dz  f      } d| _        | j                          d_        j                  _        t        j                  d dj                         y )Nz"AsyncProcess %s watch process joinr;   T[z] created process with pid )
rq   rk   rl   r5   _watch_processrp   r0   r1   r#   debug)threadr=   processri   rselfrefstates    r   _startz1AsyncProcess._watch_message_queue.<locals>._start   s{    MMO%%#229D@wq1F
 !FMLLN!ENEILL1QC:599-HIr   r   z] got message re   rq   r   	terminatekillrf   )reprr=   getr#   r   r*   r   r   )r~   r   r   r   r   ri   exit_futurer   msgre   r=   r   s    `` ``    @@r   rm   z!AsyncProcess._watch_message_queue   s     Oy~~	J 	J %%'CLL1QC~cW56TBW}$T3x=&A{"$T3x=':K:KLv$T3x=',,Gv#q! r   c                x   t         |             }|j                          |j                  x}}|d}d|_        ||_         |       }	 |!t	        |j
                  |j                  |       d }|"t        j                  d||j                         y t        j                  d||j                  |       y # d }w xY w)N   FzE[%s] process %r exit status was already read will report exitcode 255z#[%s] process %r exited with code %r)r   joinr2   r0   r   rC   ru   r#   warningr1   r   )	r~   r   r   r   ri   r   r2   original_exit_coderV   s	            r   r   zAsyncProcess._watch_process   s    O(/(8(88% H! y	"4::t}}hGD %NNW		 LL>599hW Ds   	#B5 5B9c                v    | j                          t               }| j                  j                  d|d       |S )zQ
        Start the child process.

        This method returns a future.
        rq   re   r   rb   r   rQ   rg   rV   futs     r   rq   zAsyncProcess.start  s3     	h  3!?@
r   c                v    | j                          t               }| j                  j                  d|d       |S )zTerminate the child process.

        This method returns a future.

        See also
        --------
        multiprocessing.Process.terminate
        r   r   r   r   s     r   r   zAsyncProcess.terminate!  s3     	"H  s!CD
r   c                v    | j                          t               }| j                  j                  d|d       |S )zSend SIGKILL to the child process.
        On Windows, this is the same as terminate().

        This method returns a future.

        See also
        --------
        multiprocessing.Process.kill
        r   r   r   r   s     r   r   zAsyncProcess.kill/  s3     	"H  #!>?
r   c                   K   | j                          | j                  j                  J d       | j                  j                  yt	        t        j                  | j                        |       d{    y7 w)z_
        Wait for the child process to exit.

        This method returns a coroutine.
        Nzcan only join a started process)rb   rA   r1   r2   r
   asyncioshieldrR   )rV   timeouts     r   r   zAsyncProcess.join>  sb      	{{*M,MM*;;+ w~~d&7&78'BBBs   A1A;3A94A;c                Z    | j                   s| j                          d| _        d| _         yy)z
        Stop helper thread and release resources.  This method returns
        immediately and does not ensure the child process has exited.
        NT)rT   rr   r7   r^   s    r   r   zAsyncProcess.closeL  s*    
 ||""$ DMDL r   c                    t        j                  |      rJ d       t        |      sJ d       | j                  j                  J d       || _        y)z
        Set a function to be called by the event loop when the process exits.
        The function is called with the AsyncProcess as sole argument.

        The function may not be a coroutine function.
        z-exit callback may not be a coroutine functionz exit callback should be callableNz5cannot set exit callback when process already started)inspectiscoroutinefunctionr>   rA   r1   rS   )rV   r   s     r   set_exit_callbackzAsyncProcess.set_exit_callbackV  sc     ..
 	;:	; 
 ~AAA~KKOO#	CB	C#"r   c                .    | j                   j                  S r   )rA   r0   r^   s    r   r0   zAsyncProcess.is_aliveg  s    {{###r   c                .    | j                   j                  S r   )rA   r1   r^   s    r   r1   zAsyncProcess.pidj  s    {{r   c                .    | j                   j                  S r   )rA   r2   r^   s    r   r2   zAsyncProcess.exitcoden  s    {{###r   c                    | j                   S r   )rK   r^   s    r   r=   zAsyncProcess.namer  s    zzr   c                .    | j                   j                  S r   r7   rp   r^   s    r   rp   zAsyncProcess.daemonv  s    }}###r   c                &    || j                   _        y r   r   )rV   r   s     r   rp   zAsyncProcess.daemonz  s    $r   )NNNr3   N)r2   intreturnNone)r   r6   )r   zasyncio.Future[None]r   )rV   r   r   zCallable[[Self], None]r   r   )r-   r.   r/   __doc____annotations__rX   r_   rb   rU   ru   classmethodr   rG   rm   r   rq   r   r   r   r   r   r0   propertyr1   r2   r=   rp   setterr3   r   r   r5   r5   F   s   
 &%(T;I.0/  >     (6( (T X X<	C #"$   $ $   $ $ ]]% %r   r5   c                    | j                         r*	 t        j                  d|         | j                          y y # t        $ r Y y w xY w)Nzreaping stray process )r0   r#   infor   OSError)procs    r   rN   rN     sG    }}	KK078NN   		s   (< 	AA)%
__future__r   r   r   loggingmultiprocessingr{   r   rk   rL   collections.abcr   queuer   rP   typingr   tornado.concurrentr   tornado.ioloopr   rH   distributed.utilsr	   r
   typing_extensionsr   	getLoggerr-   r#   r   r   r%   r*   r,   r5   rN   r3   r   r   <module>r      s    "     	 	   $ "   % !  6& 
		8	$!
KS v% v%r	r   