Source code for async_patterns.coro_queue

import asyncio
import concurrent
import logging

__all__ = ['CoroQueue', 'CoroQueueClass']

logger = logging.getLogger(__name__)

[docs]class CoroQueue: """ A queue of coroutines to be called sequentially. :param loop: event loop """ _cancelled = False __task_run_forever = None def __init__(self, loop): self.loop = loop self.__queue = asyncio.Queue() self.__waiter = None self._cancelled = False
[docs] def schedule_run_forever(self): """ Schedule asyncio to run the consumer loop. """ assert not self._cancelled assert not self.__task_run_forever self.__task_run_forever = self.loop.create_task(self.__run_forever())
async def __run_one(self): """ this method is a ``coroutine``. """ logger.debug(f'{id(self)} wait for next coro') args_, future = await self.__queue.get() if future.cancelled(): logger.error('future is cancelled') raise concurrent.futures.CancelledError() #raise Exception('future cancelled coro={} loop_running={}'.format( # coro, loop.is_running())) f, args, kwargs = args_ coro = f(*args, **kwargs) assert asyncio.iscoroutine(coro) try: logger.debug(f'{id(self)} await {f} args={args} kwargs={kwargs}') res = await coro except concurrent.futures.CancelledError as e: logger.error(f'error while running coro: {e!r}') future.set_exception(e) raise except Exception as e: logger.error(f'error while running coro: {e!r}') future.set_exception(e) future.exception() else: if future.cancelled(): logger.error('future is cancelled') if self._cancelled: raise concurrent.futures.CancelledError() else: #raise Exception('future cancelled coro={} loop_running={} cancelled={}'.format( # coro, self.loop.is_running(), self._cancelled)) # TODO in pdb self._cancelled evaluates to True here raise concurrent.futures.CancelledError() logger.debug('set future result') future.set_result(res) async def __run_forever(self): """ The consumer loop. Loop forever getting the next coroutine in the queue and awaiting it. This method is a ``coroutine``. """ while True: if self._cancelled: raise concurrent.futures.CancelledError() if self.__waiter: if self.__queue.empty(): self.__waiter.set_result(None) await self.__run_one()
[docs] def put_nowait(self, f, *args, **kwargs): """ Put a coroutine onto the queue. :param f: a coroutine function :param args: arguments to be passed to the coroutine """ assert not self._cancelled assert self.__task_run_forever future = self.loop.create_future() args_ = (f, args, kwargs) self.__queue.put_nowait((args_, future)) logger.debug(f'{id(self)} put {f!r}. queue length {self.__queue.qsize()}') return future
[docs] async def close(self): """ Cancel all pending coroutines. This method is a ``coroutine``. """ assert not self._cancelled self._cancelled = True self.__task_run_forever.cancel() try: await self.__task_run_forever except concurrent.futures.CancelledError as e: pass while not self.__queue.empty(): item = self.__queue.get_nowait() _, future = item #task = self.loop.create_task(coro) #ret = task.cancel() #try: # res = await task #except concurrent.futures.CancelledError as e: # pass future.cancel()
[docs] async def join(self): """ Wait for all coroutines to finish. Await the underlying :py:class:`asyncio.Queue` object's join method. This method is a ``coroutine``. """ assert not self._cancelled if self.__queue.empty(): return assert self.__waiter is None self.__waiter = self.loop.create_future() await self.__waiter self.__waiter = None
[docs]class CoroQueueClass: """ Provide a method wrapper that schedules execution of the wrapped function using a :py:class:`CoroQueue` object. .. testsetup:: * import asyncio loop = asyncio.get_event_loop() from async_patterns.coro_queue import CoroQueueClass .. testcode:: class Foo(CoroQueueClass): @CoroQueueClass.wrap async def a(self): await asyncio.sleep(1) async def test(loop): f = Foo() f._loop = loop await f.a() await f.close() loop.run_until_complete(test(loop)) """ _coro_queue = None _loop = None def __init__(self, queue=None, loop=None): self._coro_queue = queue self._loop = loop @classmethod def wrap(cls, f): async def wrapped(self, *args, **kwargs): future = self.coro_queue.put_nowait(f, self, *args, **kwargs) return (await future) return wrapped @property def coro_queue(self): if self._coro_queue is None: self._coro_queue = CoroQueue(self._loop) self._coro_queue.schedule_run_forever() return self._coro_queue def schedule_run_forever(self): self.coro_queue.schedule_run_forever() async def close(self): await self.coro_queue.close()