triotp package
Module contents
TriOTP is built on top of the Trio async library. Therefore, it is not directly compatible with AsyncIO libraries.
This library revolves around the folllwing concepts:
a Node represent a single asynchronous loop (trio.run)
an application represent the root of a supervision tree
a supervisor handles automatic restart of child processes
a mailbox enables message passing between asynchronous tasks
On top of this concepts, this library provides:
generic servers to handle requests from other tasks
dynamic supervisors to schedule new tasks
Submodules
triotp.node module
A TriOTP node encapsulates the call to trio.run and allows you to specify a list of application to start.
NB: There is no dependency management between applications, it’s up to you to start the correct applications in the right order.
from pyotp import node, application
from myproject import myapp1, myapp2
node.run(
apps=[
application.app_spec(
module=myapp1,
start_arg=[],
),
application.app_spec(
module=myapp2,
start_arg=[],
permanent=False,
),
],
)
triotp.application module
An application is a Python module defining an asynchronous function start.
async def start(_start_arg):
print('Hello world')
Usually, the application will start a supervisor containing the child tasks to run.
- class triotp.application.app_spec(module: module, start_arg: Any, permanent: bool = True, opts: options | None = None)
Bases:
object
Describe an application
- module: module
Application module
- permanent: bool = True
- start_arg: Any
Argument to pass to the module’s start function
- async triotp.application.start(app: app_spec) None
Starts an application on the current node. If the application is already started, it does nothing.
NB: This function cannot be called outside a node.
- Parameters:
app – The application to start
- async triotp.application.stop(app_name: str) None
Stops an application. If the application was not running, it does nothing.
NB: This function cannot be called outside a node.
- Parameters:
app_name – __name__ of the application module
triotp.supervisor module
A supervisor is used to handle a set of asynchronous tasks. It takes care of restarting them if they exit prematurely or if they crash.
from triotp import supervisor
from random import random
import trio
async def loop(threshold):
while True:
if random() < threshold:
raise RuntimeError('bad luck')
else:
await trio.sleep(0.1)
async def start_supervisor():
children = [
supervisor.child_spec(
id='loop',
task=loop,
args=[0.5],
restart=supervisor.restart_strategy.PERMANENT,
),
]
opts = supervisor.options(
max_restarts=3,
max_seconds=5
)
await supervisor.start(children, opts)
- class triotp.supervisor.child_spec(id: str, task: Callable[[...], Awaitable[None]], args: list[Any], restart: restart_strategy = restart_strategy.PERMANENT)
Bases:
object
Describe an asynchronous task to supervise.
- args: list[Any]
Arguments to pass to the task
- id: str
Task identifier
- restart: restart_strategy = 1
When to restart the task
- task: Callable[[...], Awaitable[None]]
The task to run
- class triotp.supervisor.options(max_restarts: int = 3, max_seconds: int = 5)
Bases:
object
Describe the options for the supervisor.
- max_restarts: int = 3
Maximum number of restart during a limited timespan
- max_seconds: int = 5
Timespan duration
- class triotp.supervisor.restart_strategy(value)
Bases:
Enum
Describe when to restart an asynchronous task.
- PERMANENT = 1
Always restart the task
- TEMPORARY = 3
Never restart a task
- TRANSIENT = 2
Restart the task only if it raises an exception
- async triotp.supervisor.start(child_specs: list[child_spec], opts: options, task_status=TASK_STATUS_IGNORED) None
Start the supervisor and its children.
- Parameters:
child_specs – Asynchronous tasks to supervise
opts – Supervisor options
task_status – Used to notify the trio nursery that the task is ready
from triotp import supervisor import trio async def example(): children_a = [ # ... ] children_b = [ # ... ] opts = supervisor.options() async with trio.open_nursery() as nursery: await nursery.start(supervisor.start, children_a, opts) await nursery.start(supervisor.start, children_b, opts)
triotp.mailbox module
In Erlang/Elixir, each process have a PID that can be used to receive message from other processes.
With trio, there is no such thing as a process. There is only asynchronous tasks started within a nursery.
This module provides an encapsulation of trio’s memory channels which allows tasks to communicate with each other.
from triotp import mailbox
async def task_a(task_status=trio.TASK_STATUS_IGNORED):
async with mailbox.open(name='task_a') as mid:
task_status.started(None)
msg = await mailbox.receive(mid)
print(msg)
async def task_b():
await mailbox.send('task_a', 'hello world')
async def main():
async with trio.open_nursery() as nursery:
await nursery.start(task_a)
nursery.start_soon(task_b)
- exception triotp.mailbox.MailboxDoesNotExist(mid: MailboxID)
Bases:
RuntimeError
Error thrown when the mailbox identifier was not found.
- class triotp.mailbox.MailboxID
Mailbox identifier (UUID4)
alias of TypeVar(‘MailboxID’, bound=
str
)
- exception triotp.mailbox.NameAlreadyExist(name: str)
Bases:
RuntimeError
Error thrown when trying to register a mailbox to an already registered name.
- exception triotp.mailbox.NameDoesNotExist(name: str)
Bases:
RuntimeError
Error thrown when trying to unregister a non-existing name.
- async triotp.mailbox.destroy(mid: MailboxID) None
Close and destroy a mailbox.
- Parameters:
mid – The mailbox identifier
- Raises:
MailboxDoesNotExist – The mailbox identifier was not found
- triotp.mailbox.open(name: str | None = None) AsyncContextManager[MailboxID]
Shortcut for create(), register() followed by a destroy().
- Parameters:
name – Optional name to register the mailbox
- Returns:
Asynchronous context manager for the mailbox
- Raises:
NameAlreadyExist – If the name was already registered
async with mailbox.open(name='foo') as mid: message = await mailbox.receive() print(message)
- async triotp.mailbox.receive(mid: MailboxID, timeout: float | None = None, on_timeout: Callable[[], Awaitable[Any]] | None = None) Any
Consume a message from a mailbox.
- Parameters:
mid – The mailbox identifier
timeout – If set, the call will fail after the timespan set in seconds
on_timeout – If set and timeout is set, instead of raising an exception, the result of this async function will be returned
- Raises:
MailboxDoesNotExist – The mailbox was not found
trio.TooSlowError – If timeout is set, but on_timeout isn’t, and no message was received during the timespan set
- triotp.mailbox.register(mid: MailboxID, name: str) None
Assign a name to a mailbox.
- Parameters:
mid – The mailbox identifier
name – The new name
- Raises:
MailboxDoesNotExist – The mailbox identifier was not found
NameAlreadyExist – The name was already registered
- async triotp.mailbox.send(name_or_mid: str | MailboxID, message: Any) None
Send a message to a mailbox.
- Parameters:
name_or_mid – Either a registered name, or the mailbox identifier
message – The message to send
- Raises:
MailboxDoesNotExist – The mailbox was not found
- triotp.mailbox.unregister(name: str) None
Unregister a mailbox’s name.
- Parameters:
name – The name to unregister
- Raises:
NameDoesNotExist – The name was not found
triotp.gen_server module
A generic server is an abstraction of a server loop built on top of the mailbox module.
It is best used to build components that accept request from other components in your application such as:
an in-memory key-value store
a TCP server handler
a finite state machine
There are 3 ways of sending messages to a generic server:
cast: send a message
call: send a message an wait for a response
directly to the mailbox
> NB: If a call returns an exception to the caller, the exception will be > raised on the caller side.
from triotp.helpers import current_module
from triotp import gen_server, mailbox
__module__ = current_module()
async def start():
await gen_server.start(__module__, name='kvstore')
async def get(key):
return await gen_server.call('kvstore', ('get', key))
async def set(key, val):
return await gen_server.call('kvstore', ('set', key, val))
async def stop():
await gen_server.cast('kvstore', 'stop')
async def printstate():
await mailbox.send('kvstore', 'printstate')
# gen_server callbacks
async def init(_init_arg):
state = {}
return state
# optional
async def terminate(reason, state):
if reason is not None:
print('An error occured:', reason)
print('Exited with state:', state)
# if not defined, the gen_server will stop with a NotImplementedError when
# receiving a call
async def handle_call(message, _caller, state):
match message:
case ('get', key):
val = state.get(key, None)
return (gen_server.Reply(payload=val), state)
case ('set', key, val):
prev = state.get(key, None)
state[key] = val
return (gen_server.Reply(payload=prev), state)
case _:
exc = NotImplementedError('unknown request')
return (gen_server.Reply(payload=exc), state)
# if not defined, the gen_server will stop with a NotImplementedError when
# receiving a cast
async def handle_cast(message, state):
match message:
case 'stop':
return (gen_server.Stop(), state)
case _:
print('unknown request')
return (gen_server.NoReply(), state)
# optional
async def handle_info(message, state):
match message:
case 'printstate':
print(state)
case _:
pass
return (gen_server.NoReply(), state)
- exception triotp.gen_server.GenServerExited
Bases:
Exception
Raised when the generic server exited during a call.
- class triotp.gen_server.NoReply
Bases:
object
Return an instance of this class to not send a reply to the caller.
- class triotp.gen_server.Reply(payload: Any)
Bases:
object
Return an instance of this class to send a reply to the caller.
- payload: Any
The response to send back
- class triotp.gen_server.Stop(reason: BaseException | None = None)
Bases:
object
Return an instance of this class to stop the generic server.
- reason: BaseException | None = None
- async triotp.gen_server.call(name_or_mid: str | MailboxID, payload: Any, timeout: float | None = None) Any
Send a request to the generic server and wait for a response.
This function creates a temporary bi-directional channel. The writer is passed to the handle_call function and is used to send the response back to the caller.
- Parameters:
name_or_mid – The generic server’s mailbox identifier
payload – The message to send to the generic server
timeout – Optional timeout after which this function fails
- Returns:
The response from the generic server
- Raises:
GenServerExited – If the generic server exited after handling the call
Exception – If the response is an exception
- async triotp.gen_server.cast(name_or_mid: str | MailboxID, payload: Any) None
Send a message to the generic server without expecting a response.
- Parameters:
name_or_mid – The generic server’s mailbox identifier
payload – The message to send
- async triotp.gen_server.reply(caller: MemorySendChannel, response: Any) None
The handle_call callback can start a background task to handle a slow request and return a NoReply instance. Use this function in the background task to send the response to the caller at a later time.
- Parameters:
caller – The caller SendChannel to use to send the response
response – The response to send back to the caller
from triotp import gen_server, supervisor, dynamic_supervisor import trio async def slow_task(message, caller): # do stuff with message await gen_server.reply(caller, response) async def handle_call(message, caller, state): await dynamic_supervisor.start_child( 'slow-task-pool', supervisor.child_spec( id='some-slow-task', task=slow_task, args=[message, caller], restart=supervisor.restart_strategy.TEMPORARY, ), ) return (gen_server.NoReply(), state)
- async triotp.gen_server.start(module: module, init_arg: Any | None = None, name: str | None = None) None
Starts the generic server loop.
- Parameters:
module – Module containing the generic server’s callbacks
init_arg – Optional argument passed to the init callback
name – Optional name to use to register the generic server’s mailbox
- Raises:
triotp.mailbox.NameAlreadyExist – If the name was already registered
Exception – If the generic server terminated with a non-null reason
triotp.dynamic_supervisor module
A dynamic supervisor is almost identical to a normal supervisor.
The only difference is that a dynamic supervisor creates a mailbox in order to receive requests to start new children from other tasks.
# app.py
from triotp import supervisor, dynamic_supervisor
import trio
from . import worker
async def start():
opts = supervisor.options()
children = [
supervisor.child_spec(
id='worker_pool',
task=dynamic_supervisor.start,
args=[opts, 'worker-pool'],
),
]
async with trio.open_nursery() as nursery:
await nursery.start_soon(supervisor.start, children, opts)
await dynamic_supervisor.start_child(
'worker-pool',
supervisor.child_spec(
id='worker-0',
task=worker.start,
args=[],
restart=supervisor.restart_strategy.TRANSIENT,
),
)
- async triotp.dynamic_supervisor.start(opts: options, name: str | None = None, task_status=TASK_STATUS_IGNORED) None
Starts a new dynamic supervisor.
This function creates a new mailbox to receive request for new children.
- Parameters:
opts – Supervisor options
name – Optional name to use to register the supervisor’s mailbox
task_status – Used to notify the trio nursery that the supervisor is ready
- Raises:
triotp.mailbox.NameAlreadyExist – If the name was already registered
from triotp import dynamic_supervisor, supervisor import trio async def example(): opts = supervisor.options() child_spec = # ... async with trio.open_nursery() as nursery: mid = await nursery.start(dynamic_supervisor.start, opts) await dynamic_supervisor.start_child(mid, child_spec)
- async triotp.dynamic_supervisor.start_child(name_or_mid: str | MailboxID, child_spec: child_spec) None
Start a new task in the specified supervisor.
- Parameters:
name_or_mid – Dynamic supervisor’s mailbox identifier
child_spec – Child specification to start
triotp.logging module
TriOTP logging system relies on the Logbook library. Each node has its own log handler.
- class triotp.logging.LogLevel(value)
Bases:
Enum
TriOTP node’s logging level
- CRITICAL = 6
- DEBUG = 2
- ERROR = 5
- INFO = 3
- NONE = 1
Logging is disabled
- WARNING = 4
- to_logbook() int
Convert this enum to a Logbook log level.
- Returns:
Logbook log level
- triotp.logging.getLogger(name: str) Logger
Get a logger by name.
- Parameters:
name – Name of the logger
- Returns:
Logbook Logger instance
triotp.helpers module
- triotp.helpers.current_module() module
This function should be called at the root of a module.
- Returns:
The current module (similar to __name__ for the current module name)
from triotp.helpers import current_module __module__ = current_module() # THIS WORKS def get_module(): return current_module() # THIS WON'T WORK