Source code for async_patterns.callbacks

import asyncio

# Public API: a star-import of this module exports only the Callbacks class.
__all__ = ['Callbacks']

class Callbacks:
    """
    Store callable objects in a list and call them all when something
    interesting happens.

    Callbacks are invoked in the order in which they were added. Use
    :py:func:`Callbacks.__call__` for plain callables and
    :py:func:`Callbacks.acall` for coroutine functions.
    """

    def __init__(self):
        # Registered callables, in registration order.
        self.__callbacks = []

    def add_callback(self, callable_):
        """
        Add **callable_** to the list of callbacks.

        When this class's :py:func:`Callbacks.__call__` function is called,
        **callable_** will be called with the same arguments passed to that
        function.

        :param callable_: a callable object
        """
        self.__callbacks.append(callable_)

    def __call__(self, *args, **kwargs):
        """
        Trigger a call to each callback, pass **args** and **kwargs** to each.

        Only the callbacks registered at the moment of the call are invoked;
        a callback added while the dispatch is running is first called on the
        next dispatch. An exception raised by a callback propagates to the
        caller and the remaining callbacks are not called.
        """
        # Iterate over a snapshot: a callback may register further callbacks,
        # and iterating the live list would pick them up in this same pass
        # (never terminating if a callback keeps re-registering itself).
        for callable_ in tuple(self.__callbacks):
            callable_(*args, **kwargs)

    async def acall(self, *args, **kwargs):
        """
        This is a coroutine.

        Trigger a call to each callback and assume each is a coroutine, pass
        **args** and **kwargs** to each. The callbacks are awaited one after
        another, in registration order.

        Only the callbacks registered at the moment of the call are awaited;
        a callback added while the dispatch is running is first called on the
        next dispatch. An exception raised by a callback propagates to the
        caller and the remaining callbacks are not called.
        """
        # Snapshot for the same reason as in __call__; here the list can also
        # be changed by other code that runs while a callback is suspended.
        for callable_ in tuple(self.__callbacks):
            await callable_(*args, **kwargs)