
EventConsumer & AsyncLoopConsumer

Abstract Backend separates the consumer contract from the default runtime implementation so you can plug in alternative event loops when needed.

  • EventConsumer lives in abe/backends/message_queue/base/consumer.py and is defined as a PEP 544 protocol.
  • AsyncLoopConsumer in abe/backends/message_queue/consumer.py is the default implementation that wraps a MessageQueueBackend and repeatedly invokes your handler coroutine.

EventConsumer protocol

abe/backends/message_queue/base/consumer.py
class EventConsumer(Protocol):
    async def run(self, handler: Callable[[Dict[str, Any]], Awaitable[None]]) -> None: ...
    async def shutdown(self) -> None: ...

run(handler)

  • Accepts an async handler that receives the message payload returned by the backend.
  • Implementations are expected to respect cancellation, propagate asyncio.CancelledError, and log other exceptions.

shutdown()

  • Stops the consumer gracefully, ensuring in-flight messages complete where possible.
  • Should be idempotent: repeated calls after the consumer stops must be safe.
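
Because the contract is a PEP 544 protocol, a class can satisfy it structurally without inheriting from it. The sketch below is illustrative only: it redeclares the protocol locally, adds `@runtime_checkable` purely so that `isinstance()` works in the demo (the real definition may not be decorated this way), and uses a hypothetical `ListConsumer` that replays a fixed list.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Protocol, runtime_checkable

Handler = Callable[[Dict[str, Any]], Awaitable[None]]


@runtime_checkable  # assumption: added only so isinstance() works in this sketch
class EventConsumer(Protocol):
    """Local copy of the two-method contract described above."""

    async def run(self, handler: Handler) -> None: ...
    async def shutdown(self) -> None: ...


class ListConsumer:
    """Hypothetical consumer that replays a fixed list of payloads."""

    def __init__(self, payloads: List[Dict[str, Any]]) -> None:
        self._payloads = payloads
        self._stopped = False

    async def run(self, handler: Handler) -> None:
        for payload in self._payloads:
            if self._stopped:
                break
            await handler(payload)

    async def shutdown(self) -> None:
        # Idempotent: setting the flag a second time is harmless.
        self._stopped = True


async def demo() -> List[Dict[str, Any]]:
    seen: List[Dict[str, Any]] = []

    async def handler(payload: Dict[str, Any]) -> None:
        seen.append(payload)

    consumer = ListConsumer([{"id": 1}, {"id": 2}])
    # Structural match: ListConsumer never inherits from EventConsumer.
    assert isinstance(consumer, EventConsumer)
    await consumer.run(handler)
    await consumer.shutdown()
    await consumer.shutdown()  # repeated call must be safe
    return seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Note that `isinstance()` against a runtime-checkable protocol only checks that the methods exist, not their signatures; a static type checker gives the stronger guarantee.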

AsyncLoopConsumer

abe/backends/message_queue/consumer.py
class AsyncLoopConsumer(EventConsumer):
    def __init__(self, backend: MessageQueueBackend, group: Optional[str] = None): ...
    async def run(self, handler: Callable[[Dict[str, Any]], Awaitable[None]]) -> None: ...
    async def shutdown(self) -> None: ...

Construction

  • backend: The MessageQueueBackend instance returned by load_backend() or injected directly.
  • group (optional): Consumer group identifier forwarded to backend.consume() when supported.

Runtime behaviour

  1. Creates an asyncio task that iterates over backend.consume(group=group).
  2. Awaits the provided handler for every payload. Exceptions are logged and do not terminate the loop unless configured otherwise.
  3. Tracks internal _running state and stores the current task for cancellation during shutdown.
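
The three steps above can be approximated with the following sketch. It is not the library's source: `InMemoryBackend` and `LoopConsumerSketch` are hypothetical stand-ins, `consume()` is assumed to be an async generator, and the sketch awaits the task inside `run()`, which the real class may or may not do.

```python
import asyncio
import logging
from typing import Any, AsyncIterator, Awaitable, Callable, Dict, List, Optional

logger = logging.getLogger("consumer_sketch")
Handler = Callable[[Dict[str, Any]], Awaitable[None]]


class InMemoryBackend:
    """Hypothetical backend: consume() is an async generator over a list."""

    def __init__(self, payloads: List[Dict[str, Any]]) -> None:
        self._payloads = payloads

    async def consume(self, group: Optional[str] = None) -> AsyncIterator[Dict[str, Any]]:
        for payload in self._payloads:
            yield payload


class LoopConsumerSketch:
    """Illustrative stand-in for the default consumer's run loop."""

    def __init__(self, backend: InMemoryBackend, group: Optional[str] = None) -> None:
        self._backend = backend
        self._group = group
        self._running = False
        self._task: Optional[asyncio.Task] = None

    async def _loop(self, handler: Handler) -> None:
        # Step 1: iterate over backend.consume(group=group).
        async for payload in self._backend.consume(group=self._group):
            try:
                # Step 2: await the handler for every payload.
                await handler(payload)
            except Exception:
                # asyncio.CancelledError derives from BaseException (Python 3.8+),
                # so cancellation is not swallowed by this clause.
                logger.exception("handler failed for payload %r", payload)

    async def run(self, handler: Handler) -> None:
        # Step 3: track state and keep the task so shutdown could cancel it.
        self._running = True
        self._task = asyncio.create_task(self._loop(handler))
        try:
            await self._task
        finally:
            self._running = False
            self._task = None


async def demo() -> List[int]:
    processed: List[int] = []

    async def handler(payload: Dict[str, Any]) -> None:
        if payload["id"] == 2:
            raise ValueError("bad payload")
        processed.append(payload["id"])

    backend = InMemoryBackend([{"id": 1}, {"id": 2}, {"id": 3}])
    consumer = LoopConsumerSketch(backend, group="workers")
    await consumer.run(handler)
    return processed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In the demo the handler fails on the second payload, the failure is logged, and the third payload is still processed.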

Shutdown flow

  • Cancels the running task if it is still active.
  • Handles asyncio.CancelledError and asyncio.TimeoutError explicitly, logging other exceptions at warning level.
  • Resets internal state (_task = None, _running = False).
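
One way to picture this flow is the cancel-then-await pattern below. It is a sketch under assumptions: `ShutdownSketch`, its `start()` helper, and the `SHUTDOWN_TIMEOUT` constant are hypothetical, and the real method may bound its wait differently or not at all.

```python
import asyncio
import logging
from typing import Optional, Tuple

logger = logging.getLogger("shutdown_sketch")

SHUTDOWN_TIMEOUT = 1.0  # hypothetical value; not taken from the library


class ShutdownSketch:
    """Illustrative stand-in showing only the shutdown bookkeeping."""

    def __init__(self) -> None:
        self._task: Optional[asyncio.Task] = None
        self._running = False

    def start(self) -> None:
        # Stand-in for the consume loop: sleeps until it is cancelled.
        self._task = asyncio.create_task(asyncio.sleep(3600))
        self._running = True

    async def shutdown(self) -> None:
        task = self._task
        try:
            if task is not None and not task.done():
                task.cancel()
                await asyncio.wait_for(task, timeout=SHUTDOWN_TIMEOUT)
        except asyncio.CancelledError:
            # Expected here: the task was cancelled on the line above.
            pass
        except asyncio.TimeoutError:
            logger.warning("consumer task did not stop within %.1fs", SHUTDOWN_TIMEOUT)
        except Exception as exc:
            logger.warning("error while stopping consumer: %r", exc)
        finally:
            # Always reset state so that a repeated call is a no-op.
            self._task = None
            self._running = False


async def demo() -> Tuple[Optional[asyncio.Task], bool]:
    consumer = ShutdownSketch()
    consumer.start()
    await asyncio.sleep(0)  # give the task a chance to start
    await consumer.shutdown()
    await consumer.shutdown()  # second call finds no task and returns quietly
    return consumer._task, consumer._running


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A caveat of this simple form is that catching `asyncio.CancelledError` also absorbs a cancellation aimed at the caller of `shutdown()`; code that needs to distinguish the two cases has to do extra work.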

Extending the consumer

  • Subclass AsyncLoopConsumer to add retry/backoff logic or metrics.
  • Implement the EventConsumer protocol from scratch to integrate with orchestrators such as Celery or to bridge non-async backends.
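
As one possible shape for retry/backoff, the handler itself can be wrapped before it reaches the consumer. The helper `with_retry` and its `attempts` and `base_delay` parameters are hypothetical names chosen for this sketch; the commented-out subclass shows where such a wrapper might be applied and is not taken from the library.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

Handler = Callable[[Dict[str, Any]], Awaitable[None]]


def with_retry(handler: Handler, attempts: int = 3, base_delay: float = 0.1) -> Handler:
    """Return a handler that retries with exponential backoff (hypothetical helper)."""

    async def wrapped(payload: Dict[str, Any]) -> None:
        for attempt in range(1, attempts + 1):
            try:
                await handler(payload)
                return
            except Exception:
                # CancelledError is a BaseException, so cancellation is never retried.
                if attempt == attempts:
                    raise  # out of attempts: let the consumer log the failure
                # Delay doubles each time: base_delay, 2*base_delay, 4*base_delay, ...
                await asyncio.sleep(base_delay * 2 ** (attempt - 1))

    return wrapped


# A subclass could apply the wrapper in run(), assuming run() simply
# forwards the handler to the loop:
#
# class RetryingConsumer(AsyncLoopConsumer):
#     async def run(self, handler):
#         await super().run(with_retry(handler))


async def demo() -> int:
    calls = 0

    async def flaky(payload: Dict[str, Any]) -> None:
        nonlocal calls
        calls += 1
        if calls < 3:
            raise RuntimeError("transient failure")

    await with_retry(flaky, attempts=3, base_delay=0.001)({"id": 1})
    return calls


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Wrapping the handler keeps the retry policy independent of the consumer, so the same wrapper also works with a from-scratch `EventConsumer` implementation.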

Testing helpers

  • Pass a fake backend that yields predetermined payloads from consume() to exercise handler logic.
  • Use an asyncio plugin for pytest, such as pytest-asyncio, to run the consumer inside an event loop and assert that shutdown() cancels tasks cleanly.
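
A fake backend for such tests might look like the sketch below. `FakeBackend`, its `block_when_empty` flag, and the `drive()` helper are hypothetical; `drive()` stands in for `consumer.run()` so the example stays dependency-free, and in a real suite the backend would be passed to `AsyncLoopConsumer` and the checks written as async test functions.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Dict, List, Optional

Handler = Callable[[Dict[str, Any]], Awaitable[None]]


class FakeBackend:
    """Hypothetical test double that yields predetermined payloads."""

    def __init__(self, payloads: List[Dict[str, Any]], block_when_empty: bool = False) -> None:
        self.payloads = list(payloads)
        self.block_when_empty = block_when_empty
        self.groups_seen: List[Optional[str]] = []

    async def consume(self, group: Optional[str] = None) -> AsyncIterator[Dict[str, Any]]:
        self.groups_seen.append(group)
        for payload in self.payloads:
            yield payload
        if self.block_when_empty:
            await asyncio.Event().wait()  # simulate an idle queue


async def drive(backend: FakeBackend, handler: Handler, group: Optional[str] = None) -> None:
    """Stand-in for consumer.run(); replace with the real consumer in a test suite."""
    async for payload in backend.consume(group=group):
        await handler(payload)


async def check_handler_receives_payloads_in_order() -> None:
    received: List[Dict[str, Any]] = []

    async def handler(payload: Dict[str, Any]) -> None:
        received.append(payload)

    backend = FakeBackend([{"id": 1}, {"id": 2}])
    await drive(backend, handler, group="test-group")
    assert received == [{"id": 1}, {"id": 2}]
    assert backend.groups_seen == ["test-group"]


async def check_cancellation_stops_consumption() -> None:
    received: List[Dict[str, Any]] = []
    all_seen = asyncio.Event()

    async def handler(payload: Dict[str, Any]) -> None:
        received.append(payload)
        if len(received) == 2:
            all_seen.set()

    backend = FakeBackend([{"id": 1}, {"id": 2}], block_when_empty=True)
    task = asyncio.create_task(drive(backend, handler))
    await asyncio.wait_for(all_seen.wait(), timeout=1.0)
    task.cancel()  # what shutdown() is described as doing to the running task
    try:
        await task
    except asyncio.CancelledError:
        pass
    assert task.cancelled()
    assert received == [{"id": 1}, {"id": 2}]


if __name__ == "__main__":
    asyncio.run(check_handler_receives_payloads_in_order())
    asyncio.run(check_cancellation_stops_consumption())
```

Recording the `group` argument on the fake makes it easy to assert that the consumer forwards its constructor argument to `consume()`.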