Source code for async_patterns.protocol
import asyncio
import concurrent.futures
import logging
import pickle
import io
logger = logging.getLogger(__name__)
[docs]class Protocol(asyncio.Protocol):
"""
Derives ``asyncio.Protocol``.
Interprets incoming and outgoing data as object instances satisfying the packet protocol (see below).
Schedules execution of the packet call function for incoming packets.
Responses
---------
A unique id is added to outgoing packets.
A future is also created for each outgoing packet.
The receiving end can construct a packet and set the ``response_to`` attribute to this id and send the new packet back to this protocol.
This protocol will see the ``response_to`` attribute and assign the response packet as the result of the future.
packet protocol
::
class Packet:
async def __call__(self, protocol):
# protocol is this instance
pass
"""
def __init__(self, loop):
logger.debug('Protocol created')
self.loop = loop
self.__message_id_last = 0
self.__message_futures = {}
self.queue_packet_acall = asyncio.Queue()
def __next_message_id(self):
self.__message_id_last += 1
return self.__message_id_last
[docs] def connection_made(self, transport):
logger.debug('Connection made')
logger.debug('transport = {}'.format(transport))
peername = transport.get_extra_info('peername')
logger.debug('Connection from {}'.format(peername))
self.transport = transport
[docs] def connection_lost(self, exc):
logger.debug('The server closed the connection {}'.format(exc))
[docs] def data_received(self, data):
logger.debug('Received {} bytes'.format(len(data)))
l = len(data)
stream = io.BytesIO(data)
while stream.tell() < l:
logger.debug('{} bytes in stream. at position {}.'.format(len(stream.getbuffer()), stream.tell()))
packet = pickle.load(stream)
self.__packet_received(packet)
async def async_packet_received(self, packet):
try:
res = await packet(self)
except concurrent.futures.CancelledError as e:
logger.warning('packet call cancelled. packet={} {}'.format(repr(packet), e))
raise
except Exception as e:
logger.exception('error during packet call. {}'.format(e))
else:
logger.info('packet call returned {}'.format(repr(res)))
def __packet_received(self, packet):
logger.debug('packet received: {}'.format(repr(packet)))
task = self.loop.create_task(self.async_packet_received(packet))
# keep track of packet acall coroutines
packet._task_acall = task
self.queue_packet_acall.put_nowait(task)
if packet.response_to is not None:
future = self.__message_futures[packet.response_to]
future.set_result(packet)
return task
[docs] def write(self, packet):
"""
Write an object to the socket.
Assign the object a ``message_id`` before sending.
:param packet: object to pickle and send
:rtype: future
"""
packet.message_id = self.__next_message_id()
b = pickle.dumps(packet)
logger.debug(f'transport={self.transport!r}')
logger.debug('write {} {} bytes'.format(packet.__class__.__name__, len(b)))
self.transport.write(b)
fut = self.loop.create_future()
self.__message_futures[packet.message_id] = fut
return fut
[docs] async def close(self):
"""
This method is a :ref:`coroutine <coroutine>`.
"""
logger.info('close transport')
self.transport.close()
logger.debug('cancel packet acall')
while not self.queue_packet_acall.empty():
task = self.queue_packet_acall.get_nowait()
logger.debug(' task = {}'.format(repr(task)))
task.cancel()
try:
await task
except concurrent.futures.CancelledError:
pass