o
    L)j;                  !   @   s|  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	m
Z
mZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZmZ d dlZd	d
lmZ d	dlmZmZmZm Z  erpd dlm!Z! dZ"e#dZ$dddde d ddddd	ddddee%B de%e
def B de&edf de'e%ef dB ddde
e(e gdf dB de
ee%ge)f dB d e*d!e+d"e+d#e)dB d$e+d%e+d&e)d'e)d(e+f d)d*Z,dddde d dddddd+dee%B de%e
def B de&edf de'e%ef dB ddde
e(e gef dB de
ee%ge)f dB d e*d!e+d"e+d#e)dB d&e)d'e)d(e+fd,d-Z-ed.Z.d/e%d(e/e% fd0d1Z0	dLde%e
def B dd2de&edf de'e%ef dB d3e(e dB d(d4fd5d6Z1de%e
def B d(d2fd7d8Z2G d9d4 d4Z3d:e%d;e%dB de&edf de'e%ef d(df
d<d=Z4d>e%d(efd?d@Z5d(e%dB fdAdBZ6e j7d;e%dB d(edC fdDdEZ8dFe+dGed(dfdHdIZ9dMdJdKZ:dS )N    N)Callable	Generator)import_module)get_context)SpawnProcess)Path)sleep)TYPE_CHECKINGAny   )DefaultFilter)Change
FileChangeawatchwatch)Literal)run_processarun_processdetect_target_typeimport_stringzwatchfiles.main autoi@  2      TF)argskwargstarget_typecallbackwatch_filtergrace_perioddebouncestepdebugsigint_timeoutsigkill_timeout	recursiveignore_permission_deniedpathstarget.r   r   r   z&Literal['function', 'command', 'auto']r   r   r   r    r!   r"   r#   r$   r%   r&   returnc              
   G   s   |dkrt | }td| | t  t| |||}d}|r'td| t| z0t|||||	d||dD ]}|o<|| |j|
|d t| ||||}|d7 }q5W |  |S |  w )	u  
    Run a process and restart it upon file changes.

    `run_process` can work in two ways:

    * Using `multiprocessing.Process` † to run a python function
    * Or, using `subprocess.Popen` to run a command

    !!! note

        **†** technically `multiprocessing.get_context('spawn').Process` to avoid forking and improve
        code reload/import.

    Internally, `run_process` uses [`watch`][watchfiles.watch] with `raise_interrupt=False` so the function
    exits cleanly upon `Ctrl+C`.

    Args:
        *paths: matches the same argument of [`watch`][watchfiles.watch]
        target: function or command to run
        args: arguments to pass to `target`, only used if `target` is a function
        kwargs: keyword arguments to pass to `target`, only used if `target` is a function
        target_type: type of target. Can be `'function'`, `'command'`, or `'auto'` in which case
            [`detect_target_type`][watchfiles.run.detect_target_type] is used to determine the type.
        callback: function to call on each reload, the function should accept a set of changes as the sole argument
        watch_filter: matches the same argument of [`watch`][watchfiles.watch]
        grace_period: number of seconds after the process is started before watching for changes
        debounce: matches the same argument of [`watch`][watchfiles.watch]
        step: matches the same argument of [`watch`][watchfiles.watch]
        debug: matches the same argument of [`watch`][watchfiles.watch]
        sigint_timeout: the number of seconds to wait after sending sigint before sending sigkill
        sigkill_timeout: the number of seconds to wait after sending sigkill before raising an exception
        recursive: matches the same argument of [`watch`][watchfiles.watch]

    Returns:
        number of times the function was reloaded.

    ```py title="Example of run_process running a function"
    from watchfiles import run_process

    def callback(changes):
        print('changes detected:', changes)

    def foobar(a, b):
        print('foobar called with:', a, b)

    if __name__ == '__main__':
        run_process('./path/to/dir', target=foobar, args=(1, 2), callback=callback)
    ```

    As well as using a `callback` function, changes can be accessed from within the target function,
    using the `WATCHFILES_CHANGES` environment variable.

    ```py title="Example of run_process accessing changes"
    from watchfiles import run_process

    def foobar(a, b, c):
        # changes will be an empty list "[]" the first time the function is called
        changes = os.getenv('WATCHFILES_CHANGES')
        changes = json.loads(changes)
        print('foobar called due to changes:', changes)

    if __name__ == '__main__':
        run_process('./path/to/dir', target=foobar, args=(1, 2, 3))
    ```

    Again with the target as `command`, `WATCHFILES_CHANGES` can be used
    to access changes.

    ```bash title="example.sh"
    echo "changers: ${WATCHFILES_CHANGES}"
    ```

    ```py title="Example of run_process running a command"
    from watchfiles import run_process

    if __name__ == '__main__':
        run_process('.', target='./example.sh')
    ```
    r   running "%s" as %sr   3sleeping for %s seconds before watching for changesF)r   r    r!   r"   raise_interruptr%   r&   )r#   r$   r   )r   loggerr"   catch_sigtermstart_processr   r   stop)r(   r   r   r   r   r   r   r    r!   r"   r#   r$   r%   r&   r'   processreloadschangesr   r   s/var/www/html/finance-dev.cargoinsureonline.com/_shared/backend-venv/lib/python3.10/site-packages/watchfiles/run.pyr      s:   `



r   )r   r   r   r   r   r   r    r!   r"   r%   r&   c              	      s  ddl }|dkrt| }td| | t  tjt| |||I dH }d}|r6td| t	|I dH  t
|||||	|
|d2 z43 dH W }|durZ||}||rZ|I dH  tj|jI dH  tjt| ||||I dH }|d7 }qB6 tj|jI dH  |S )a  
    Async equivalent of [`run_process`][watchfiles.run_process], all arguments match those of `run_process` except
    `callback` which can be a coroutine.

    Starting and stopping the process and watching for changes is done in a separate thread.

    As with `run_process`, internally `arun_process` uses [`awatch`][watchfiles.awatch], however `KeyboardInterrupt`
    cannot be caught and suppressed in `awatch` so these errors need to be caught separately, see below.

    ```py title="Example of arun_process usage"
    import asyncio
    from watchfiles import arun_process

    async def callback(changes):
        await asyncio.sleep(0.1)
        print('changes detected:', changes)

    def foobar(a, b):
        print('foobar called with:', a, b)

    async def main():
        await arun_process('.', target=foobar, args=(1, 2), callback=callback)

    if __name__ == '__main__':
        try:
            asyncio.run(main())
        except KeyboardInterrupt:
            print('stopped via KeyboardInterrupt')
    ```
    r   Nr   r*   r+   )r   r    r!   r"   r%   r&   r   )inspectr   r-   r"   r.   anyio	to_threadrun_syncr/   r   r   isawaitabler0   )r(   r   r   r   r   r   r   r    r!   r"   r%   r&   r'   r5   r1   r2   r3   rr   r   r4   r      s<   -	


r   spawncmdc                 C   s(   dd l }| j dk}tj| |dS )Nr   windows)posix)platformunamesystemlowershlexsplit)r<   r?   r>   r   r   r4   	split_cmd   s   rE   zLiteral['function', 'command']r3   CombinedProcessc           	      C   s   |d u rd}n
t dd |D }|tjd< |dkrA|pi }t| tr/| t ||f}t}i }n| }tj	|||d}|
  t|S |sE|rJtd t| tsSJ dt| }t|}t|S )	Nz[]c                 S   s   g | ]
\}}|  |gqS r   )raw_str).0cpr   r   r4   
<listcomp>  s    z!start_process.<locals>.<listcomp>WATCHFILES_CHANGESfunction)r(   r   r   z-ignoring args and kwargs for "command" targetz+target must be a string to run as a command)jsondumpsosenviron
isinstancestrget_tty_pathrun_functionspawn_contextProcessstartr-   warningrE   
subprocessPopenrF   )	r(   r   r   r   r3   changes_env_vartarget_r1   
popen_argsr   r   r4   r/      s(   



r/   c                 C   s0   t | tsdS | drdS td| rdS dS )a^  
    Used by [`run_process`][watchfiles.run_process], [`arun_process`][watchfiles.arun_process]
    and indirectly the CLI to determine the target type with `target_type` is `auto`.

    Detects the target type - either `function` or `command`. This method is only called with `target_type='auto'`.

    The following logic is employed:

    * If `target` is not a string, it is assumed to be a function
    * If `target` ends with `.py` or `.sh`, it is assumed to be a command
    * Otherwise, the target is assumed to be a function if it matches the regex `[a-zA-Z0-9_]+(\.[a-zA-Z0-9_]+)+`

    If this logic does not work for you, specify the target type explicitly using the `target_type` function argument
    or `--target-type` command line argument.

    Args:
        target: The target value

    Returns:
        either `'function'` or `'command'`
    rM   )z.pyz.shcommandz[a-zA-Z0-9_]+(\.[a-zA-Z0-9_]+)+)rR   rS   endswithre	fullmatch)r(   r   r   r4   r     s   

r   c                   @   sv   e Zd ZdddZddeded	d
fddZd	efddZed	efddZ	ded	d
fddZ
ed	ed
B fddZd
S )rF   rJ   &SpawnProcess | subprocess.Popen[bytes]c                 C   s   || _ | jd usJ dd S )Nzprocess not yet spawned_ppid)selfrJ   r   r   r4   __init__?  s   zCombinedProcess.__init__r   r   r#   r$   r)   Nc                 C   s   t jdd  |  rPtd t | jtj	 z| 
| W n tjy/   td| Y nw | jd u rItd t | jtj | 
| d S td d S td| j d S )NrL   zstopping process...z!SIGINT timed out after %r secondsz+process has not terminated, sending SIGKILLzprocess stoppedz#process already dead, exit code: %d)rP   rQ   popis_aliver-   r"   killrf   signalSIGINTjoinrZ   TimeoutExpiredrY   exitcodeSIGKILL)rg   r#   r$   r   r   r4   r0   C  s    


zCombinedProcess.stopc                 C   s$   t | jtr| j S | j d u S N)rR   re   r   rj   pollrg   r   r   r4   rj   [  s   
zCombinedProcess.is_alivec                 C   s   | j jS rr   rd   rt   r   r   r4   rf   a  s   zCombinedProcess.pidtimeoutc                 C   s,   t | jtr| j| d S | j| d S rr   )rR   re   r   rn   wait)rg   ru   r   r   r4   rn   f  s   zCombinedProcess.joinc                 C   s   t | jtr
| jjS | jjS rr   )rR   re   r   rp   
returncodert   r   r   r4   rp   l  s   zCombinedProcess.exitcode)rJ   rc   )r   r   )__name__
__module____qualname__rh   intr0   boolrj   propertyrf   rn   rp   r   r   r   r4   rF   >  s    
rM   tty_pathc                 C   sD   t | t| }||i | W d    d S 1 sw   Y  d S rr   )set_ttyr   )rM   r~   r   r   funcr   r   r4   rU   t  s   
"rU   dotted_pathc              
   C   s   z|  ddd\}}W n ty" } z	td|  d|d}~ww t|}zt||W S  tyE } ztd| d| d	|d}~ww )
z
    Stolen approximately from django. Import a dotted module path and return the attribute/class designated by the
    last name in the path. Raise ImportError if the import fails.
     .r   "z!" doesn't look like a module pathNzModule "z" does not define a "z" attribute)striprsplit
ValueErrorImportErrorr   getattrAttributeError)r   module_path
class_nameemoduler   r   r4   r   z  s   r   c                   C   s:   z	t tj W S  ty   Y dS  ty   Y dS w )zr
    Return the path to the current TTY, if any.

    Virtually impossible to test in pytest, hence no cover.
    z/dev/ttyN)rP   ttynamesysstdinfilenoOSErrorr   r   r   r   r4   rT     s   rT   )NNNc                 c   sj    | r0zt | }|t_d V  W d    W d S 1 sw   Y  W d S  ty/   d V  Y d S w d V  d S rr   )openr   r   r   )r~   ttyr   r   r4   r     s   
&
r   signum_framec                 C   s   t dt|  t)Nz-received signal %s, raising KeyboardInterrupt)r-   rY   rl   SignalsKeyboardInterrupt)r   r   r   r   r4   raise_keyboard_interrupt  s   r   c                   C   s"   t dt  ttjt dS )a  
    Catch SIGTERM and raise KeyboardInterrupt instead. This means watchfiles will stop quickly
    on `docker compose stop` and other cases where SIGTERM is sent.

    Without this the watchfiles process will be killed while a running process will continue uninterrupted.
    z8registering handler for SIGTERM on watchfiles process %dN)r-   r"   rP   getpidrl   SIGTERMr   r   r   r   r4   r.     s   r.   rr   )r)   N);
contextlibrN   loggingrP   ra   rC   rl   rZ   r   collections.abcr   r   	importlibr   multiprocessingr   multiprocessing.contextr   pathlibr   timer   typingr	   r
   r6   filtersr   mainr   r   r   r   r   __all__	getLoggerr-   rS   tupledictsetr|   floatr{   r   r   rV   listrE   r/   r   rF   rU   r   rT   contextmanagerr   r   r.   r   r   r   r4   <module>   s   

	

 
	

R


$ 26