EventConsumer & AsyncLoopConsumer
Abstract Backend separates the consumer contract from the default runtime implementation so you can plug in alternative event loops when needed.
EventConsumer lives in abe/backends/message_queue/base/consumer.py and is defined as a PEP 544 protocol. AsyncLoopConsumer in abe/backends/message_queue/consumer.py is the default implementation that wraps a MessageQueueBackend and repeatedly invokes your handler coroutine.
EventConsumer protocol
abe/backends/message_queue/base/consumer.py
class EventConsumer(Protocol):
    async def run(self, handler: Callable[[Dict[str, Any]], Awaitable[None]]) -> None: ...
    async def shutdown(self) -> None: ...
run(handler)
- Accepts an async handler that receives the message payload returned by the backend.
- Implementations are expected to respect cancellation, propagate asyncio.CancelledError, and log other exceptions.
shutdown()
- Stops the consumer gracefully, ensuring in-flight messages complete where possible.
- Should be idempotent: repeated calls after the consumer stops must be safe.
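The contract above can be illustrated with a from-scratch implementation. This is a minimal sketch, not code from the library: the protocol is redeclared locally so the block runs on its own, and QueueConsumer, its asyncio.Queue source, and demo are hypothetical names. For brevity, this shutdown() cancels immediately rather than waiting for an in-flight message to finish.

```python
import asyncio
import logging
from typing import Any, Awaitable, Callable, Dict, List, Optional, Protocol, Tuple

logger = logging.getLogger(__name__)
Handler = Callable[[Dict[str, Any]], Awaitable[None]]


class EventConsumer(Protocol):
    # Local copy of the documented protocol so the sketch is self-contained.
    async def run(self, handler: Handler) -> None: ...
    async def shutdown(self) -> None: ...


class QueueConsumer:
    """Hypothetical consumer that reads payloads from an asyncio.Queue."""

    def __init__(self, queue: "asyncio.Queue[Dict[str, Any]]") -> None:
        self._queue = queue
        self._task: Optional[asyncio.Task] = None

    async def run(self, handler: Handler) -> None:
        self._task = asyncio.current_task()
        while True:
            payload = await self._queue.get()  # cancellation surfaces here
            try:
                await handler(payload)
            except asyncio.CancelledError:
                raise  # never swallow cancellation
            except Exception:
                logger.exception("handler failed for %r", payload)

    async def shutdown(self) -> None:
        task, self._task = self._task, None
        if task is None or task.done():
            return  # already stopped: repeated calls are a no-op
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass  # the consumer task finished by being cancelled


async def demo() -> Tuple[List[int], bool]:
    seen: List[int] = []

    async def handler(payload: Dict[str, Any]) -> None:
        if payload.get("bad"):
            raise ValueError("simulated handler failure")
        seen.append(payload["id"])

    queue: "asyncio.Queue[Dict[str, Any]]" = asyncio.Queue()
    for item in ({"id": 1}, {"bad": True}, {"id": 2}):
        queue.put_nowait(item)

    consumer: EventConsumer = QueueConsumer(queue)  # satisfies the protocol structurally
    task = asyncio.create_task(consumer.run(handler))
    await asyncio.sleep(0.05)  # give the loop time to drain the queue
    await consumer.shutdown()
    await consumer.shutdown()  # second call must be safe
    return seen, task.cancelled()
```

Running asyncio.run(demo()) processes the two valid payloads, logs the failing one, and leaves the consumer task cancelled. Because EventConsumer is a protocol, QueueConsumer does not need to inherit from it.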
AsyncLoopConsumer
abe/backends/message_queue/consumer.py
class AsyncLoopConsumer(EventConsumer):
    def __init__(self, backend: MessageQueueBackend, group: Optional[str] = None): ...
    async def run(self, handler: Callable[[Dict[str, Any]], Awaitable[None]]) -> None: ...
    async def shutdown(self) -> None: ...
Construction
- backend: The MessageQueueBackend instance returned by load_backend() or injected directly.
- group (optional): Consumer group identifier forwarded to backend.consume() when supported.
Runtime behaviour
- Creates an asyncio task that iterates over backend.consume(group=group).
- Awaits the provided handler for every payload. Exceptions are logged and do not terminate the loop unless configured otherwise.
- Tracks internal _running state and stores the current task for cancellation during shutdown.
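The loop described above might look roughly like the following. This is a sketch under assumptions, not the actual AsyncLoopConsumer source: it assumes backend.consume() returns an async iterator, and InMemoryBackend, LoopSketch, and demo are hypothetical names introduced for illustration.

```python
import asyncio
import logging
from typing import Any, AsyncIterator, Awaitable, Callable, Dict, List, Optional

logger = logging.getLogger(__name__)
Handler = Callable[[Dict[str, Any]], Awaitable[None]]


class InMemoryBackend:
    """Hypothetical stand-in backend; assumes consume() is an async iterator."""

    def __init__(self, payloads: List[Dict[str, Any]]) -> None:
        self._payloads = payloads

    async def consume(self, group: Optional[str] = None) -> AsyncIterator[Dict[str, Any]]:
        for payload in self._payloads:
            yield payload


class LoopSketch:
    """Illustrative only: mirrors the documented behaviour, not the real class."""

    def __init__(self, backend: InMemoryBackend, group: Optional[str] = None) -> None:
        self._backend = backend
        self._group = group
        self._running = False
        self._task: Optional[asyncio.Task] = None

    async def _loop(self, handler: Handler) -> None:
        async for payload in self._backend.consume(group=self._group):
            try:
                await handler(payload)
            except Exception:
                # A failing handler is logged; the loop moves on to the next payload.
                logger.exception("handler raised for payload %r", payload)

    async def run(self, handler: Handler) -> None:
        self._running = True
        # Keep a reference to the task so a shutdown step could cancel it later.
        self._task = asyncio.create_task(self._loop(handler))
        try:
            await self._task
        finally:
            self._running = False


async def demo() -> List[int]:
    handled: List[int] = []

    async def handler(payload: Dict[str, Any]) -> None:
        if payload["id"] == 2:
            raise RuntimeError("simulated handler failure")
        handled.append(payload["id"])

    backend = InMemoryBackend([{"id": 1}, {"id": 2}, {"id": 3}])
    await LoopSketch(backend, group="workers").run(handler)
    return handled
```

In this sketch the second payload fails and is logged, while the first and third are still handled. asyncio.CancelledError is not caught by the except Exception clause, so cancelling the task still ends the loop.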
Shutdown flow
- Cancels the running task if it is still active.
- Handles asyncio.CancelledError and asyncio.TimeoutError explicitly, logging other exceptions at warning level.
- Resets internal state (_task = None, _running = False).
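The three shutdown steps can be sketched as below. This is an illustration rather than the library's implementation: ShutdownSketch and demo are hypothetical names, and the timeout parameter with asyncio.wait_for is an assumption inferred from the mention of asyncio.TimeoutError.

```python
import asyncio
import logging
from typing import Optional, Tuple

logger = logging.getLogger(__name__)


class ShutdownSketch:
    """Holds a task plus the two state fields named in the steps above."""

    def __init__(self, task: asyncio.Task, timeout: float = 5.0) -> None:
        self._task: Optional[asyncio.Task] = task
        self._running = True
        self._timeout = timeout  # assumed knob; not a documented parameter

    async def shutdown(self) -> None:
        task = self._task
        try:
            if task is not None and not task.done():
                task.cancel()  # step 1: cancel only if still active
                await asyncio.wait_for(task, timeout=self._timeout)
        except asyncio.CancelledError:
            pass  # step 2: the expected outcome of cancelling the task
        except asyncio.TimeoutError:
            logger.warning("task did not stop within %.1fs", self._timeout)
        except Exception as exc:
            logger.warning("task failed during shutdown: %r", exc)
        finally:
            # Step 3: reset state so repeated calls become no-ops.
            self._task = None
            self._running = False


async def demo() -> Tuple[bool, Optional[asyncio.Task], bool]:
    async def worker() -> None:
        await asyncio.sleep(3600)  # stands in for a long-running consume loop

    task = asyncio.create_task(worker())
    await asyncio.sleep(0)  # let the worker start
    sketch = ShutdownSketch(task, timeout=1.0)
    await sketch.shutdown()
    await sketch.shutdown()  # second call finds no task and returns
    return task.cancelled(), sketch._task, sketch._running
```

Resetting state in a finally block keeps the object consistent whichever branch is taken, which is one simple way to make repeated shutdown() calls safe.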
Extending the consumer
- Subclass AsyncLoopConsumer to add retry/backoff logic or metrics.
- Implement the EventConsumer protocol from scratch to integrate with orchestrators such as Celery or to bridge non-async backends.
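One possible shape for the retry/backoff subclass is sketched below. It relies only on the documented run(handler) signature and wraps the handler before delegating to the parent. The AsyncLoopConsumer class here is a local stand-in so the example runs without the package installed; RetryingConsumer, ListBackend, max_attempts, and base_delay are hypothetical names, not part of the library.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple

Handler = Callable[[Dict[str, Any]], Awaitable[None]]


class ListBackend:
    """Hypothetical backend; assumes consume() is an async iterator."""

    def __init__(self, payloads: List[Dict[str, Any]]) -> None:
        self._payloads = payloads

    async def consume(self, group: Optional[str] = None):
        for payload in self._payloads:
            yield payload


class AsyncLoopConsumer:
    """Stand-in with the documented signatures; use the real class in practice."""

    def __init__(self, backend: ListBackend, group: Optional[str] = None) -> None:
        self._backend = backend
        self._group = group

    async def run(self, handler: Handler) -> None:
        async for payload in self._backend.consume(group=self._group):
            await handler(payload)

    async def shutdown(self) -> None:
        return None


class RetryingConsumer(AsyncLoopConsumer):
    """Retries a failing handler with exponential backoff before giving up."""

    def __init__(self, backend: ListBackend, group: Optional[str] = None, *,
                 max_attempts: int = 3, base_delay: float = 0.5) -> None:
        super().__init__(backend, group)
        self._max_attempts = max_attempts
        self._base_delay = base_delay

    async def run(self, handler: Handler) -> None:
        async def with_retry(payload: Dict[str, Any]) -> None:
            for attempt in range(1, self._max_attempts + 1):
                try:
                    await handler(payload)
                    return
                except Exception:
                    if attempt == self._max_attempts:
                        raise  # out of attempts: defer to the parent's error handling
                    # Delay doubles each time: base, 2 * base, 4 * base, ...
                    await asyncio.sleep(self._base_delay * 2 ** (attempt - 1))

        await super().run(with_retry)


async def demo() -> Tuple[int, List[Dict[str, Any]]]:
    attempts = 0
    handled: List[Dict[str, Any]] = []

    async def flaky(payload: Dict[str, Any]) -> None:
        nonlocal attempts
        attempts += 1
        if attempts < 3:
            raise ConnectionError("simulated transient failure")
        handled.append(payload)

    consumer = RetryingConsumer(ListBackend([{"id": 1}]), max_attempts=3, base_delay=0.01)
    await consumer.run(flaky)
    return attempts, handled
```

Wrapping the handler instead of overriding the loop keeps the subclass independent of the parent's internals, so it should keep working if the loop implementation changes.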
Testing helpers
- Pass a fake backend that yields predetermined payloads from consume() to exercise handler logic.
- Use pytest's asyncio support to run the consumer inside event loops and assert that shutdown() cancels tasks cleanly.
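A fake backend and a shutdown check along these lines could look like the sketch below. FakeBackend, StandInConsumer, and exercise_consumer are hypothetical names. StandInConsumer takes the place of AsyncLoopConsumer so the example runs on its own, and consume() is assumed to be an async iterator.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Optional

Handler = Callable[[Dict[str, Any]], Awaitable[None]]


class FakeBackend:
    """Test double: yields predetermined payloads, then idles like an empty queue."""

    def __init__(self, payloads: List[Dict[str, Any]]) -> None:
        self._payloads = list(payloads)
        self.groups_seen: List[Optional[str]] = []

    async def consume(self, group: Optional[str] = None):
        self.groups_seen.append(group)  # record what the consumer forwarded
        for payload in self._payloads:
            yield payload
        await asyncio.Event().wait()  # block until the consuming task is cancelled


class StandInConsumer:
    """Placeholder for AsyncLoopConsumer; swap in the real class in actual tests."""

    def __init__(self, backend: FakeBackend, group: Optional[str] = None) -> None:
        self._backend = backend
        self._group = group
        self._task: Optional[asyncio.Task] = None

    async def run(self, handler: Handler) -> None:
        self._task = asyncio.current_task()
        async for payload in self._backend.consume(group=self._group):
            await handler(payload)

    async def shutdown(self) -> None:
        task, self._task = self._task, None
        if task is not None and not task.done():
            task.cancel()
            # return_exceptions=True collects the cancellation instead of raising it.
            await asyncio.gather(task, return_exceptions=True)


async def exercise_consumer() -> None:
    backend = FakeBackend([{"id": 1}, {"id": 2}])
    received: List[Dict[str, Any]] = []
    all_received = asyncio.Event()

    async def handler(payload: Dict[str, Any]) -> None:
        received.append(payload)
        if len(received) == 2:
            all_received.set()

    consumer = StandInConsumer(backend, group="test-group")
    task = asyncio.create_task(consumer.run(handler))
    # Wait on an event rather than sleeping so the check is deterministic.
    await asyncio.wait_for(all_received.wait(), timeout=1.0)
    await consumer.shutdown()

    assert received == [{"id": 1}, {"id": 2}]
    assert backend.groups_seen == ["test-group"]
    assert task.done()
```

Outside pytest the check runs with asyncio.run(exercise_consumer()). With the pytest-asyncio plugin, the same body can live in an async test function marked with pytest.mark.asyncio.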