Using subprocesses

AnyIO allows you to run arbitrary executables in subprocesses, either as a one-shot call or by opening a process handle that gives you more control over the subprocess.

You can give the command either as a string, in which case it is passed to your default shell (equivalent to shell=True in subprocess.run()), or as a sequence of strings (shell=False), in which case the first item in the sequence is the executable and the rest are arguments passed to it.

Running one-shot commands

To run an external command with one call, use run_process():

from anyio import run_process, run


async def main():
    result = await run_process('ps')
    print(result.stdout.decode())

run(main)

The snippet above runs the ps command within a shell. To run it directly:

from anyio import run_process, run


async def main():
    result = await run_process(['ps'])
    print(result.stdout.decode())

run(main)
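
Since run_process() returns a completed process object, you can also pass data to the command's standard input and inspect the exit status yourself. A minimal sketch, assuming the input= and check= parameters of run_process() and a POSIX grep executable:

from anyio import run_process, run


async def main():
    # Feed bytes to grep's stdin; check=False means a nonzero exit
    # status will not raise CalledProcessError
    result = await run_process(['grep', 'hello'], input=b'hello world\n', check=False)
    print(result.returncode)
    print(result.stdout.decode())

run(main)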

Working with processes

When you have more complex requirements for your interaction with subprocesses, you can launch one with open_process():

from anyio import open_process, run
from anyio.streams.text import TextReceiveStream


async def main():
    async with await open_process(['ps']) as process:
        async for text in TextReceiveStream(process.stdout):
            print(text)

run(main)
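
You can also talk to the process through its standard input. A minimal sketch, assuming a POSIX cat executable (which echoes back whatever it reads from stdin):

from anyio import open_process, run
from anyio.streams.text import TextReceiveStream


async def main():
    async with await open_process(['cat']) as process:
        await process.stdin.send(b'hello, world\n')
        await process.stdin.aclose()  # close stdin so cat sees EOF and exits
        async for text in TextReceiveStream(process.stdout):
            print(text, end='')
        await process.wait()

run(main)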

See the API documentation of Process for more information.

Running functions in worker processes

When you need to run CPU intensive code, worker processes are better than threads because current implementations of Python cannot run Python code in multiple threads at once.

Exceptions to this rule are:

  1. Blocking I/O operations

  2. C extension code that explicitly releases the Global Interpreter Lock

If the code you wish to run does not fall into either of these categories, it’s best to use worker processes instead in order to take advantage of multiple CPU cores. This is done by using to_process.run_sync():

import time

from anyio import run, to_process


def cpu_intensive_function(arg1, arg2):
    time.sleep(1)
    return arg1 + arg2


async def main():
    result = await to_process.run_sync(cpu_intensive_function, 'Hello, ', 'world!')
    print(result)

# This check is important when the application uses to_process.run_sync()
if __name__ == '__main__':
    run(main)

Technical details

There are some limitations regarding the arguments and return values passed:

  • the arguments must be pickleable (using the highest available protocol)

  • the return value must be pickleable (using the highest available protocol)

  • the target callable must be importable (lambdas and inner functions won’t work)
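
For example, a function defined at module level satisfies the importability requirement, while a lambda does not. A minimal illustration (the add() function here is hypothetical):

from anyio import run, to_process


def add(a, b):  # defined at module level, so it can be located by import
    return a + b


async def main():
    print(await to_process.run_sync(add, 1, 2))  # works
    # await to_process.run_sync(lambda: 42)  # would fail: not importable

if __name__ == '__main__':
    run(main)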

Other considerations:

  • Even cancellable=False runs can be cancelled before the request has been sent to the worker process

  • If a cancellable call is cancelled during execution on the worker process, the worker process will be killed (see the sketch after this list)

  • The worker process imports the parent’s __main__ module, so guarding for any import time side effects using if __name__ == '__main__': is required to avoid infinite recursion

  • sys.stdin, sys.stdout and sys.stderr are redirected to /dev/null, so print() and input() won’t work

  • Worker processes terminate after 5 minutes of inactivity, or when the event loop is finished

  • Multiprocessing-style synchronization primitives are currently not available
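
To illustrate the cancellation behavior described above, here is a hedged sketch that wraps a cancellable call in a cancel scope; when the scope’s deadline expires, the call is cancelled and the worker process running it is killed:

import time

from anyio import move_on_after, run, to_process


def long_task():
    time.sleep(30)  # simulates a long CPU-bound job


async def main():
    with move_on_after(1):  # cancel the scope after one second
        await to_process.run_sync(long_task, cancellable=True)
    print('gave up waiting')

if __name__ == '__main__':
    run(main)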