
    \d@5                         d Z ddlZddlZddlZddlZddlZddlmZ ddlm	Z	m
Z
mZmZ ddlmZ ddlmZ ddlmZ ddlmZmZ  G d	 d
e          Z G d de          Z G d de          ZdS )zD
Test running processes with the APIs in L{twisted.internet.utils}.
    N)skipIf)error
interfacesreactorutils)Deferred)platform)SuppressedWarningsTests)SynchronousTestCaseTestCasec                       e Zd ZdZ ej        ed          dZdZdZ	e
j        Zd Zd Zd Zd Zd Zd	 Z e ej                    d
          d             Zd Zd Zd Zd Zd Zd Zd Zd Zd ZdS )ProcessUtilsTestszt
    Test running a process using L{getProcessOutput}, L{getProcessValue}, and
    L{getProcessOutputAndValue}.
    Nz)reactor doesn't implement IReactorProcessc                 6   |                                  }t          |d          5 }|                    t          j                            |          t          j        z              ddd           n# 1 swxY w Y   t          j                            |          S )zj
        Write the given list of lines to a text file and return the absolute
        path to it.
        wtN)mktempopenwriteoslinesepjoinpathabspath)selfsourceLinesscript
scriptFiles       8lib/python3.11/site-packages/twisted/test/test_iutils.pymakeSourceFilez ProcessUtilsTests.makeSourceFile$   s    
 &$ 	H:RZ__[99BJFGGG	H 	H 	H 	H 	H 	H 	H 	H 	H 	H 	H 	H 	H 	H 	Hwv&&&s   A A11A58A5c                     |                      g d          }t          j        | j        d|g          }|                    | j        d          S )z
        L{getProcessOutput} returns a L{Deferred} which fires with the complete
        output of the process it runs after that process exits.
        )
import syszfor s in b'hello world\n':z    s = bytes([s])z    sys.stdout.buffer.write(s)z    sys.stdout.flush()-us   hello world
r   r   getProcessOutputexeaddCallbackassertEqualr   r   ds      r   test_outputzProcessUtilsTests.test_output.   s[    
 ((  
 

 "48dJ-?@@}}T-/?@@@    c                                            ddg          }t          j         j        d|g          }                     |t
                    } fd}|                    |           |S )z
        The L{Deferred} returned by L{getProcessOutput} is fired with an
        L{IOError} L{Failure} if the child process writes to stderr.
        r    z!sys.stderr.write("hello world\n")r!   c                 N                         | j        t          j                  S N)assertFailureprocessEndedr   ProcessDone)errr   s    r   cbFailedz?ProcessUtilsTests.test_outputWithErrorIgnored.<locals>.cbFailedL   s    %%c&68IJJJr*   )r   r   r#   r$   r.   IOErrorr%   )r   r   r(   r2   s   `   r   test_outputWithErrorIgnoredz-ProcessUtilsTests.test_outputWithErrorIgnored?   s     ((?@
 

 "48dJ-?@@q'**	K 	K 	K 	K 	K 	
hr*   c                     |                      g d          }t          j        | j        d|gd          }|                    | j        d          S )z
        If a C{True} value is supplied for the C{errortoo} parameter to
        L{getProcessOutput}, the returned L{Deferred} fires with the child's
        stderr output as well as its stdout output.
        )r    zsys.stdout.write("foo")sys.stdout.flush()zsys.stderr.write("foo")sys.stderr.flush()r!   T)errortoos   foofoor"   r'   s      r   test_outputWithErrorCollectedz/ProcessUtilsTests.test_outputWithErrorCollectedR   s_     ((  

 


 "48dJ-?$OOO}}T-y999r*   c                     |                      dg          }t          j        | j        d|g          }|                    | j        d          S )z|
        The L{Deferred} returned by L{getProcessValue} is fired with the exit
        status of the child process.
        zraise SystemExit(1)r!      )r   r   getProcessValuer$   r%   r&   r'   s      r   
test_valuezProcessUtilsTests.test_valueg   sK    
 ((*?)@AA
!$(T:,>??}}T-q111r*   c                                            g d          } fd}t          j         j        d|g          }|                    |          S )a  
        The L{Deferred} returned by L{getProcessOutputAndValue} fires with a
        three-tuple, the elements of which give the data written to the child's
        stdout, the data written to the child's stderr, and the exit status of
        the child.
        )r    z*sys.stdout.buffer.write(b'hello world!\n')z,sys.stderr.buffer.write(b'goodbye world!\n')zsys.exit(1)c                     | \  }}}                     |d                                |d                                |d           d S )Ns   hello world!
s   goodbye world!
r;   r&   )out_err_codeoutr1   coder   s       r   gotOutputAndValuez@ProcessUtilsTests.test_outputAndValue.<locals>.gotOutputAndValue   sX    )NCdS"3444S"5666T1%%%%%r*   r!   r   r   getProcessOutputAndValuer$   r%   r   r   rD   r(   s   `   r   test_outputAndValuez%ProcessUtilsTests.test_outputAndValueq   sp     ((  
 

	& 	& 	& 	& 	& *48dJ5GHH}}.///r*   z"Windows doesn't have real signals.c                                            g d          } fd}t          j         j        d|g          }                     |t
                    }|                    |          S )z
        If the child process exits because of a signal, the L{Deferred}
        returned by L{getProcessOutputAndValue} fires a L{Failure} of a tuple
        containing the child's stdout, stderr, and the signal which caused
        it to exit.
        )zimport sys, os, signalz"sys.stdout.write('stdout bytes\n')z"sys.stderr.write('stderr bytes\n')r6   r7   z$os.kill(os.getpid(), signal.SIGKILL)c                     | \  }}}                     |d                                |d                                |t          j                   d S )Ns   stdout bytes
s   stderr bytes
)r&   signalSIGKILL)out_err_sigrB   r1   sigr   s       r   rD   z>ProcessUtilsTests.test_outputSignal.<locals>.gotOutputAndValue   sZ    'MCcS"3444S"3444S&.11111r*   r!   )r   r   rF   r$   r.   tupler%   rG   s   `   r   test_outputSignalz#ProcessUtilsTests.test_outputSignal   s     ((  	
 	

	2 	2 	2 	2 	2 *48dJ5GHHq%((}}.///r*   c                 \   t           j                            |                                           }t          j        |           |                     ddg          } || j        d|g|          }|                    ||                    t          j
                                         |S )Nimport os, syszsys.stdout.write(os.getcwd())r!   )r   )r   r   r   r   makedirsr   r$   r%   encodesysgetfilesystemencoding)r   utilFunccheckdirr   r(   s         r   	_pathTestzProcessUtilsTests._pathTest   s    goodkkmm,,
C((>?
 

 HTXj1<<<	eSZZ(A(C(CDDEEEr*   c                 L    |                      t          j        | j                  S )z
        L{getProcessOutput} runs the given command with the working directory
        given by the C{path} parameter.
        )rZ   r   r#   r&   r   s    r   test_getProcessOutputPathz+ProcessUtilsTests.test_getProcessOutputPath   s    
 ~~e4d6FGGGr*   c                 N      fd}                      t          j        |          S )z~
        L{getProcessValue} runs the given command with the working directory
        given by the C{path} parameter.
        c                 4                         | d           d S Nr   r@   resultignoredr   s     r   rX   z9ProcessUtilsTests.test_getProcessValuePath.<locals>.check       VQ'''''r*   )rZ   r   r<   r   rX   s   ` r   test_getProcessValuePathz*ProcessUtilsTests.test_getProcessValuePath   s5    	( 	( 	( 	( 	( ~~e3U;;;r*   c                 N      fd}                      t          j        |          S )z
        L{getProcessOutputAndValue} runs the given command with the working
        directory given by the C{path} parameter.
        c                 l    | \  }}}                     ||                                |d           d S r`   r@   out_err_statusrY   rB   r1   statusr   s        r   rX   zBProcessUtilsTests.test_getProcessOutputAndValuePath.<locals>.check   A    -CfS#&&&VQ'''''r*   )rZ   r   rF   re   s   ` r   !test_getProcessOutputAndValuePathz3ProcessUtilsTests.test_getProcessOutputAndValuePath   s5    	( 	( 	( 	( 	(
 ~~e<eDDDr*   c                    t           j                            |                                           }t          j        |           |                     g d          }|                     t           j        t          j                               t          j        |           t          j
        t          j	        d          j                  }t          j        |t          j        t          j        z             |                     t           j        ||            || j        d|g          }|                    ||                    t%          j                                         |S )N)rR   zcdir = os.getcwd()zsys.stdout.write(cdir).r!   )r   r   r   r   rS   r   
addCleanupchdirgetcwdstatS_IMODEst_modechmodS_IXUSRS_IRUSRr$   r%   rT   rU   rV   )r   rW   rX   rY   r   originalModer(   s          r   _defaultPathTestz"ProcessUtilsTests._defaultPathTest   s   goodkkmm,,
C((NNN
 


 	")++...
 |BGCLL$899
 	dlT\1222
 	#|444HTXj122	eSZZ(A(C(CDDEEEr*   c                 L    |                      t          j        | j                  S )a  
        If no value is supplied for the C{path} parameter, L{getProcessOutput}
        runs the given command in the same working directory as the parent
        process and succeeds even if the current working directory is not
        accessible.
        )rz   r   r#   r&   r\   s    r    test_getProcessOutputDefaultPathz2ProcessUtilsTests.test_getProcessOutputDefaultPath   s      $$U%;T=MNNNr*   c                 N      fd}                      t          j        |          S )a   
        If no value is supplied for the C{path} parameter, L{getProcessValue}
        runs the given command in the same working directory as the parent
        process and succeeds even if the current working directory is not
        accessible.
        c                 4                         | d           d S r`   r@   ra   s     r   rX   z@ProcessUtilsTests.test_getProcessValueDefaultPath.<locals>.check  rd   r*   )rz   r   r<   re   s   ` r   test_getProcessValueDefaultPathz1ProcessUtilsTests.test_getProcessValueDefaultPath   s7    	( 	( 	( 	( 	( $$U%:EBBBr*   c                 N      fd}                      t          j        |          S )a	  
        If no value is supplied for the C{path} parameter,
        L{getProcessOutputAndValue} runs the given command in the same working
        directory as the parent process and succeeds even if the current
        working directory is not accessible.
        c                 l    | \  }}}                     ||                                |d           d S r`   r@   ri   s        r   rX   zIProcessUtilsTests.test_getProcessOutputAndValueDefaultPath.<locals>.check  rl   r*   )rz   r   rF   re   s   ` r   (test_getProcessOutputAndValueDefaultPathz:ProcessUtilsTests.test_getProcessOutputAndValueDefaultPath  s7    	( 	( 	( 	( 	(
 $$U%CUKKKr*   c                                            ddg          }dt          j         j        d|g          } fd}|                    |           |S )z
        Standard input can be made available to the child process by passing
        bytes for the `stdinBytes` parameter.
        r    z"sys.stdout.write(sys.stdin.read())s   These are the bytes to see.r!   )
stdinBytesc                 l    | \  }}}                     |                               d|           d S r`   )assertInr&   )rA   rB   r1   rC   r   r   s       r   rD   zPProcessUtilsTests.test_get_processOutputAndValueStdin.<locals>.gotOutputAndValue(  s@    )NCd MM*c***Q%%%%%r*   rE   )r   r   r(   rD   r   s   `   @r   #test_get_processOutputAndValueStdinz5ProcessUtilsTests.test_get_processOutputAndValueStdin  s    
 ((4
 

 4
*H:!
 
 
	& 	& 	& 	& 	& 	& 	
'(((r*   ) __name__
__module____qualname____doc__r   IReactorProcessr   skipoutputvaluerU   
executabler$   r   r)   r4   r9   r=   rH   r   r	   	isWindowsrP   rZ   r]   rf   rm   rz   r|   r   r   r    r*   r   r   r      st        
 "z!'4008:FE
.C' ' 'A A A"  &: : :*2 2 20 0 02 VH  "FGG0 0 HG0>  H H H	< 	< 	<E E E  <O O OC C CL L L    r*   r   c                       e Zd ZdZd ZdS )SuppressWarningsTestsz.
    Tests for L{utils.suppressWarnings}.
    c                    g fd}|                      t          d|           d }t          j        |dt	          d          f          } |d           |                     t                    d            |d           |                     t                    d            |d	           |                     t                    d
           dS )zs
        L{utils.suppressWarnings} decorates a function so that the given
        warnings are suppressed.
        c                 6                         ||f           d S r-   )append)r   akwrb   s      r   showwarningz@SuppressWarningsTests.test_suppressWarnings.<locals>.showwarning@  s    MM1b'"""""r*   r   c                 .    t          j        |            d S r-   )warningswarn)msgs    r   fz6SuppressWarningsTests.test_suppressWarnings.<locals>.fE  s    M#r*   )ignorezThis is messagemessagezSanity check messager;   zUnignored message   N)patchr   r   suppressWarningsdictr&   len)r   r   r   grb   s       @r   test_suppressWarningsz+SuppressWarningsTests.test_suppressWarnings9  s   
 	# 	# 	# 	# 	# 	

8]K888	 	 	 "1{DAR4S4S4S&TUU 	

 !!!Va((( 	

Va((( 	

Va(((((r*   N)r   r   r   r   r   r   r*   r   r   r   4  s-         ) ) ) ) )r*   r   c                   >    e Zd ZdZ eej                  Zd Zd ZdS )DeferredSuppressedWarningsTestsz`
    Tests for L{utils.runWithWarningsSuppressed}, the version that supports
    Deferreds.
    c                 F   di fdi fg}t                      |                     |fd           t          j        d                               d           t          j        d           |                     dgd |                                 D                        dS )	z
        If the function called by L{utils.runWithWarningsSuppressed} returns a
        C{Deferred}, the warning filters aren't removed until the Deferred
        fires.
        r   z.*foo.*r   z.*bar.*c                       S r-   r   rb   s   r   <lambda>zGDeferredSuppressedWarningsTests.test_deferredCallback.<locals>.<lambda>l  s     r*   
ignore foo   ignore foo 2c                     g | ]
}|d          S r   r   .0ws     r   
<listcomp>zIDeferredSuppressedWarningsTests.test_deferredCallback.<locals>.<listcomp>p      +W+W+WQAiL+W+W+Wr*   N)r   runWithWarningsSuppressedr   r   callbackr&   flushWarnings)r   filtersrb   s     @r   test_deferredCallbackz5DeferredSuppressedWarningsTests.test_deferredCallbackd  s     *2.1F0KL&&w???l###n%%%.)+W+W$BTBTBVBV+W+W+WXXXXXr*   c                    di fdi fg}t                      |                     |fd          }t          j        d                               t                                 |                    d            t          j        d           |                     dgd |                                 D                        dS )	z
        If the function called by L{utils.runWithWarningsSuppressed} returns a
        C{Deferred}, the warning filters aren't removed until the Deferred
        fires with an errback.
        r   r   c                       S r-   r   r   s   r   r   zFDeferredSuppressedWarningsTests.test_deferredErrback.<locals>.<lambda>z  s    F r*   r   c                 6    |                      t                    S r-   )trapZeroDivisionError)r   s    r   r   zFDeferredSuppressedWarningsTests.test_deferredErrback.<locals>.<lambda>}  s    qvv&788 r*   r   c                     g | ]
}|d          S r   r   r   s     r   r   zHDeferredSuppressedWarningsTests.test_deferredErrback.<locals>.<listcomp>  r   r*   N)	r   r   r   r   errbackr   
addErrbackr&   r   )r   r   r(   rb   s      @r   test_deferredErrbackz4DeferredSuppressedWarningsTests.test_deferredErrbackr  s     *2.1F0KL**7NNNNCCl###(**+++	88999n%%%.)+W+W$BTBTBVBV+W+W+WXXXXXr*   N)	r   r   r   r   staticmethodr   r   r   r   r   r*   r   r   r   Z  sY          !-U-L M MY Y YY Y Y Y Yr*   r   )r   r   rK   rs   rU   r   unittestr   twisted.internetr   r   r   r   twisted.internet.deferr   twisted.python.runtimer	   twisted.python.test.test_utilr
   twisted.trial.unittestr   r   r   r   r   r   r*   r   <module>r      s`   
 
			   



        > > > > > > > > > > > > + + + + + + + + + + + + A A A A A A @ @ @ @ @ @ @ @Z Z Z Z Z Z Z Zz#) #) #) #) #)/ #) #) #)L%Y %Y %Y %Y %Y&= %Y %Y %Y %Y %Yr*   