triotp package

Module contents

TriOTP is built on top of the Trio async library. Therefore, it is not directly compatible with AsyncIO libraries.

This library revolves around the folllwing concepts:

  • a Node represent a single asynchronous loop (trio.run)

  • an application represent the root of a supervision tree

  • a supervisor handles automatic restart of child processes

  • a mailbox enables message passing between asynchronous tasks

On top of this concepts, this library provides:

  • generic servers to handle requests from other tasks

  • dynamic supervisors to schedule new tasks

NB: You don’t get distributed computing out of the box like you would with Erlang/Elixir, this library is single-threaded and works within a Python application only.

Submodules

triotp.node module

A TriOTP node encapsulates the call to trio.run and allows you to specify a list of application to start.

NB: There is no dependency management between applications, it’s up to you to start the correct applications in the right order.

Example
from pyotp import node, application

from myproject import myapp1, myapp2

node.run(
    apps=[
        application.app_spec(
            module=myapp1,
            start_arg=[],
        ),
        application.app_spec(
            module=myapp2,
            start_arg=[],
            permanent=False,
        ),
    ],
)
triotp.node.run(apps: list[app_spec], loglevel: LogLevel = LogLevel.NONE, logformat: str | None = None) None

Start a new node by calling trio.run.

Parameters:
  • apps – List of application to start

  • loglevel – Logging Level of the node

  • logformat – Format of log messages produced by the node

triotp.application module

An application is a Python module defining an asynchronous function start.

Example
async def start(_start_arg):
    print('Hello world')

Usually, the application will start a supervisor containing the child tasks to run.

class triotp.application.app_spec(module: module, start_arg: Any, permanent: bool = True, opts: options | None = None)

Bases: object

Describe an application

module: module

Application module

opts: options | None = None
permanent: bool = True
start_arg: Any

Argument to pass to the module’s start function

async triotp.application.start(app: app_spec) None

Starts an application on the current node. If the application is already started, it does nothing.

NB: This function cannot be called outside a node.

Parameters:

app – The application to start

async triotp.application.stop(app_name: str) None

Stops an application. If the application was not running, it does nothing.

NB: This function cannot be called outside a node.

Parameters:

app_name__name__ of the application module

triotp.supervisor module

A supervisor is used to handle a set of asynchronous tasks. It takes care of restarting them if they exit prematurely or if they crash.

Example
from triotp import supervisor
from random import random
import trio

async def loop(threshold):
    while True:
        if random() < threshold:
            raise RuntimeError('bad luck')

        else:
            await trio.sleep(0.1)

async def start_supervisor():
    children = [
        supervisor.child_spec(
            id='loop',
            task=loop,
            args=[0.5],
            restart=supervisor.restart_strategy.PERMANENT,
        ),
    ]
    opts = supervisor.options(
        max_restarts=3,
        max_seconds=5
    )
    await supervisor.start(children, opts)
class triotp.supervisor.child_spec(id: str, task: Callable[[...], Awaitable[None]], args: list[Any], restart: restart_strategy = restart_strategy.PERMANENT)

Bases: object

Describe an asynchronous task to supervise.

args: list[Any]

Arguments to pass to the task

id: str

Task identifier

restart: restart_strategy = 1

When to restart the task

task: Callable[[...], Awaitable[None]]

The task to run

class triotp.supervisor.options(max_restarts: int = 3, max_seconds: int = 5)

Bases: object

Describe the options for the supervisor.

max_restarts: int = 3

Maximum number of restart during a limited timespan

max_seconds: int = 5

Timespan duration

class triotp.supervisor.restart_strategy(value)

Bases: Enum

Describe when to restart an asynchronous task.

PERMANENT = 1

Always restart the task

TEMPORARY = 3

Never restart a task

TRANSIENT = 2

Restart the task only if it raises an exception

async triotp.supervisor.start(child_specs: list[child_spec], opts: options, task_status=TASK_STATUS_IGNORED) None

Start the supervisor and its children.

Parameters:
  • child_specs – Asynchronous tasks to supervise

  • opts – Supervisor options

  • task_status – Used to notify the trio nursery that the task is ready

Example
from triotp import supervisor
import trio

async def example():
    children_a = [
        # ...
    ]
    children_b = [
        # ...
    ]
    opts = supervisor.options()

    async with trio.open_nursery() as nursery:
        await nursery.start(supervisor.start, children_a, opts)
        await nursery.start(supervisor.start, children_b, opts)

triotp.mailbox module

In Erlang/Elixir, each process have a PID that can be used to receive message from other processes.

With trio, there is no such thing as a process. There is only asynchronous tasks started within a nursery.

This module provides an encapsulation of trio’s memory channels which allows tasks to communicate with each other.

Example
from triotp import mailbox


async def task_a(task_status=trio.TASK_STATUS_IGNORED):
    async with mailbox.open(name='task_a') as mid:
        task_status.started(None)

        msg = await mailbox.receive(mid)
        print(msg)


async def task_b():
    await mailbox.send('task_a', 'hello world')


async def main():
    async with trio.open_nursery() as nursery:
        await nursery.start(task_a)
        nursery.start_soon(task_b)
exception triotp.mailbox.MailboxDoesNotExist(mid: MailboxID)

Bases: RuntimeError

Error thrown when the mailbox identifier was not found.

class triotp.mailbox.MailboxID

Mailbox identifier (UUID4)

alias of TypeVar(‘MailboxID’, bound=str)

exception triotp.mailbox.NameAlreadyExist(name: str)

Bases: RuntimeError

Error thrown when trying to register a mailbox to an already registered name.

exception triotp.mailbox.NameDoesNotExist(name: str)

Bases: RuntimeError

Error thrown when trying to unregister a non-existing name.

triotp.mailbox.create() MailboxID

Create a new mailbox.

Returns:

The mailbox unique identifier

async triotp.mailbox.destroy(mid: MailboxID) None

Close and destroy a mailbox.

Parameters:

mid – The mailbox identifier

Raises:

MailboxDoesNotExist – The mailbox identifier was not found

triotp.mailbox.open(name: str | None = None) AsyncContextManager[MailboxID]

Shortcut for create(), register() followed by a destroy().

Parameters:

name – Optional name to register the mailbox

Returns:

Asynchronous context manager for the mailbox

Raises:

NameAlreadyExist – If the name was already registered

Example
async with mailbox.open(name='foo') as mid:
    message = await mailbox.receive()
    print(message)
async triotp.mailbox.receive(mid: MailboxID, timeout: float | None = None, on_timeout: Callable[[], Awaitable[Any]] | None = None) Any

Consume a message from a mailbox.

Parameters:
  • mid – The mailbox identifier

  • timeout – If set, the call will fail after the timespan set in seconds

  • on_timeout – If set and timeout is set, instead of raising an exception, the result of this async function will be returned

Raises:
  • MailboxDoesNotExist – The mailbox was not found

  • trio.TooSlowError – If timeout is set, but on_timeout isn’t, and no message was received during the timespan set

triotp.mailbox.register(mid: MailboxID, name: str) None

Assign a name to a mailbox.

Parameters:
  • mid – The mailbox identifier

  • name – The new name

Raises:
async triotp.mailbox.send(name_or_mid: str | MailboxID, message: Any) None

Send a message to a mailbox.

Parameters:
  • name_or_mid – Either a registered name, or the mailbox identifier

  • message – The message to send

Raises:

MailboxDoesNotExist – The mailbox was not found

triotp.mailbox.unregister(name: str) None

Unregister a mailbox’s name.

Parameters:

name – The name to unregister

Raises:

NameDoesNotExist – The name was not found

triotp.mailbox.unregister_all(mid: MailboxID) None

Unregister all names associated to a mailbox.

Parameters:

mid – The mailbox identifier

triotp.gen_server module

A generic server is an abstraction of a server loop built on top of the mailbox module.

It is best used to build components that accept request from other components in your application such as:

  • an in-memory key-value store

  • a TCP server handler

  • a finite state machine

There are 3 ways of sending messages to a generic server:

  • cast: send a message

  • call: send a message an wait for a response

  • directly to the mailbox

> NB: If a call returns an exception to the caller, the exception will be > raised on the caller side.

Example
from triotp.helpers import current_module
from triotp import gen_server, mailbox


__module__ = current_module()


async def start():
    await gen_server.start(__module__, name='kvstore')


async def get(key):
    return await gen_server.call('kvstore', ('get', key))


async def set(key, val):
    return await gen_server.call('kvstore', ('set', key, val))


async def stop():
    await gen_server.cast('kvstore', 'stop')


async def printstate():
    await mailbox.send('kvstore', 'printstate')

# gen_server callbacks

async def init(_init_arg):
    state = {}
    return state


# optional
async def terminate(reason, state):
    if reason is not None:
        print('An error occured:', reason)

    print('Exited with state:', state)


# if not defined, the gen_server will stop with a NotImplementedError when
# receiving a call
async def handle_call(message, _caller, state):
    match message:
        case ('get', key):
            val = state.get(key, None)
            return (gen_server.Reply(payload=val), state)

        case ('set', key, val):
            prev = state.get(key, None)
            state[key] = val
            return (gen_server.Reply(payload=prev), state)

        case _:
            exc = NotImplementedError('unknown request')
            return (gen_server.Reply(payload=exc), state)


# if not defined, the gen_server will stop with a NotImplementedError when
# receiving a cast
async def handle_cast(message, state):
    match message:
        case 'stop':
            return (gen_server.Stop(), state)

        case _:
            print('unknown request')
            return (gen_server.NoReply(), state)


# optional
async def handle_info(message, state):
    match message:
        case 'printstate':
            print(state)

        case _:
            pass

    return (gen_server.NoReply(), state)
exception triotp.gen_server.GenServerExited

Bases: Exception

Raised when the generic server exited during a call.

class triotp.gen_server.NoReply

Bases: object

Return an instance of this class to not send a reply to the caller.

class triotp.gen_server.Reply(payload: Any)

Bases: object

Return an instance of this class to send a reply to the caller.

payload: Any

The response to send back

class triotp.gen_server.Stop(reason: BaseException | None = None)

Bases: object

Return an instance of this class to stop the generic server.

reason: BaseException | None = None
async triotp.gen_server.call(name_or_mid: str | MailboxID, payload: Any, timeout: float | None = None) Any

Send a request to the generic server and wait for a response.

This function creates a temporary bi-directional channel. The writer is passed to the handle_call function and is used to send the response back to the caller.

Parameters:
  • name_or_mid – The generic server’s mailbox identifier

  • payload – The message to send to the generic server

  • timeout – Optional timeout after which this function fails

Returns:

The response from the generic server

Raises:
  • GenServerExited – If the generic server exited after handling the call

  • Exception – If the response is an exception

async triotp.gen_server.cast(name_or_mid: str | MailboxID, payload: Any) None

Send a message to the generic server without expecting a response.

Parameters:
  • name_or_mid – The generic server’s mailbox identifier

  • payload – The message to send

async triotp.gen_server.reply(caller: MemorySendChannel, response: Any) None

The handle_call callback can start a background task to handle a slow request and return a NoReply instance. Use this function in the background task to send the response to the caller at a later time.

Parameters:
  • caller – The caller SendChannel to use to send the response

  • response – The response to send back to the caller

Example
from triotp import gen_server, supervisor, dynamic_supervisor
import trio


async def slow_task(message, caller):
    # do stuff with message
    await gen_server.reply(caller, response)


async def handle_call(message, caller, state):
    await dynamic_supervisor.start_child(
        'slow-task-pool',
        supervisor.child_spec(
            id='some-slow-task',
            task=slow_task,
            args=[message, caller],
            restart=supervisor.restart_strategy.TEMPORARY,
        ),
    )

    return (gen_server.NoReply(), state)
async triotp.gen_server.start(module: module, init_arg: Any | None = None, name: str | None = None) None

Starts the generic server loop.

Parameters:
  • module – Module containing the generic server’s callbacks

  • init_arg – Optional argument passed to the init callback

  • name – Optional name to use to register the generic server’s mailbox

Raises:

triotp.dynamic_supervisor module

A dynamic supervisor is almost identical to a normal supervisor.

The only difference is that a dynamic supervisor creates a mailbox in order to receive requests to start new children from other tasks.

Example
# app.py

from triotp import supervisor, dynamic_supervisor
import trio

from . import worker


async def start():
    opts = supervisor.options()
    children = [
        supervisor.child_spec(
            id='worker_pool',
            task=dynamic_supervisor.start,
            args=[opts, 'worker-pool'],
        ),
    ]

    async with trio.open_nursery() as nursery:
        await nursery.start_soon(supervisor.start, children, opts)

        await dynamic_supervisor.start_child(
            'worker-pool',
            supervisor.child_spec(
                id='worker-0',
                task=worker.start,
                args=[],
                restart=supervisor.restart_strategy.TRANSIENT,
            ),
        )
async triotp.dynamic_supervisor.start(opts: options, name: str | None = None, task_status=TASK_STATUS_IGNORED) None

Starts a new dynamic supervisor.

This function creates a new mailbox to receive request for new children.

Parameters:
  • opts – Supervisor options

  • name – Optional name to use to register the supervisor’s mailbox

  • task_status – Used to notify the trio nursery that the supervisor is ready

Raises:

triotp.mailbox.NameAlreadyExist – If the name was already registered

Example
from triotp import dynamic_supervisor, supervisor
import trio


async def example():
    opts = supervisor.options()
    child_spec = # ...

    async with trio.open_nursery() as nursery:
        mid = await nursery.start(dynamic_supervisor.start, opts)
        await dynamic_supervisor.start_child(mid, child_spec)
async triotp.dynamic_supervisor.start_child(name_or_mid: str | MailboxID, child_spec: child_spec) None

Start a new task in the specified supervisor.

Parameters:
  • name_or_mid – Dynamic supervisor’s mailbox identifier

  • child_spec – Child specification to start

triotp.logging module

TriOTP logging system relies on the Logbook library. Each node has its own log handler.

class triotp.logging.LogLevel(value)

Bases: Enum

TriOTP node’s logging level

CRITICAL = 6
DEBUG = 2
ERROR = 5
INFO = 3
NONE = 1

Logging is disabled

WARNING = 4
to_logbook() int

Convert this enum to a Logbook log level.

Returns:

Logbook log level

triotp.logging.getLogger(name: str) Logger

Get a logger by name.

Parameters:

name – Name of the logger

Returns:

Logbook Logger instance

triotp.helpers module

triotp.helpers.current_module() module

This function should be called at the root of a module.

Returns:

The current module (similar to __name__ for the current module name)

Example
from triotp.helpers import current_module

__module__ = current_module()  # THIS WORKS


def get_module():
    return current_module()  # THIS WON'T WORK