Running Abstract Backend in Your Application

This guide shows how to wire the abstractions into a real program—configuring providers, publishing messages, and running consumers in a deployment-safe way.

Step 1 · Decide how your application selects a backend

Abstract Backend exposes multiple strategies for selecting providers:

  • Environment variable – Set QUEUE_BACKEND (recommended for production).
  • Programmatic override – Call load_backend("redis") with an explicit provider name.
  • Auto-discovery – Let the loader pick the first installed provider (falls back to the memory backend).

Use whichever matches your deployment tooling. For example, a container may inject QUEUE_BACKEND through Kubernetes ConfigMaps.
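
The three strategies above can be pictured as a simple precedence chain. The sketch below is illustrative only: `resolve_backend_name` and `DEFAULT_BACKEND` are hypothetical names, and the ordering (explicit name, then QUEUE_BACKEND, then the memory fallback) is an assumption rather than the loader's documented behaviour.

```python
import os
from typing import Mapping, Optional

# Assumption: the fallback provider is registered under the name "memory".
DEFAULT_BACKEND = "memory"


def resolve_backend_name(
    explicit: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
) -> str:
    """Hypothetical helper: pick a provider name by precedence."""
    env = os.environ if env is None else env
    # 1. A programmatic override wins if given.
    if explicit:
        return explicit
    # 2. Otherwise use QUEUE_BACKEND when it is set and non-empty.
    from_env = env.get("QUEUE_BACKEND", "").strip()
    if from_env:
        return from_env
    # 3. Otherwise fall back to the memory backend.
    return DEFAULT_BACKEND
```

Passing the environment as a mapping keeps the helper easy to exercise in tests without mutating `os.environ`.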

Step 2 · Publish messages through the abstraction

publisher.py
import asyncio
from abe.backends.message_queue.loader import load_backend


async def main() -> None:
    backend = load_backend()
    await backend.publish("orders.created", {"order_id": "A-123", "total": 42.0})


if __name__ == "__main__":
    asyncio.run(main())

  • load_backend() returns the provider resolved at runtime.
  • publish() accepts a routing key plus a JSON-serialisable payload (MessageQueuePayload).
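
Because the payload has to be JSON-serialisable, it can help to fail early in your own code rather than inside a provider. A minimal sketch, assuming you want a plain `ValueError` on bad input; `ensure_json_serialisable` is a hypothetical helper, not part of the library:

```python
import json
from typing import Any


def ensure_json_serialisable(payload: Any) -> str:
    """Hypothetical guard: return the JSON text or raise ValueError."""
    try:
        return json.dumps(payload)
    except (TypeError, ValueError) as exc:
        # json.dumps raises TypeError for unsupported types such as set,
        # and ValueError for circular references.
        raise ValueError(f"payload is not JSON-serialisable: {exc}") from exc
```

Calling this just before `publish()` turns a provider-specific failure into one predictable error in the publishing code path.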

Step 3 · Consume messages with the shared event loop helper

consumer.py
import asyncio
from abe.backends.message_queue.consumer import AsyncLoopConsumer
from abe.backends.message_queue.loader import load_backend


async def handle(payload: dict[str, object]) -> None:
    print("Processing", payload)


async def main() -> None:
    backend = load_backend()
    consumer = AsyncLoopConsumer(backend, group="billing")
    await consumer.run(handle)


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass

  • AsyncLoopConsumer lives in abe/backends/message_queue/consumer.py and works with any provider that implements MessageQueueBackend.consume().
  • Supply a group string if the provider supports consumer groups (optional—ignored by providers that do not implement the concept).
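
To get a feel for the kind of loop the consumer takes off your hands, here is a toy version. Everything in it is a stand-in: `ToyBackend`, `run_consumer`, and the assumption that `consume()` is an async iterator accepting an optional `group` are illustrative guesses, not the real interface.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, Optional

Handler = Callable[[dict], Awaitable[None]]


class ToyBackend:
    """Hypothetical stand-in for a provider; yields canned messages."""

    def __init__(self, messages: list) -> None:
        self._messages = list(messages)

    async def consume(self, group: Optional[str] = None) -> AsyncIterator[dict]:
        # This toy ignores `group`, as a provider without consumer
        # groups might.
        for message in self._messages:
            yield message


async def run_consumer(
    backend: ToyBackend, handler: Handler, group: Optional[str] = None
) -> int:
    """Feed every consumed payload to the handler; return the count."""
    handled = 0
    async for payload in backend.consume(group=group):
        await handler(payload)
        handled += 1
    return handled
```

The real helper also deals with cancellation and error logging, which this sketch leaves out.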

Step 4 · Manage lifecycle and graceful shutdown

The consumer exposes shutdown() for orderly teardown. When embedding in a long-running service (FastAPI, Django, etc.), call it during shutdown hooks:

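from contextlib import AsyncExitStack

from abe.backends.message_queue.consumer import AsyncLoopConsumer
from abe.backends.message_queue.loader import load_backend

stack = AsyncExitStack()
backend = load_backend()
consumer = AsyncLoopConsumer(backend)


async def start() -> None:
    # Only valid if the consumer is usable as an async context manager,
    # e.g. if you wrap `shutdown` in one yourself (see docs).
    await stack.enter_async_context(consumer)


async def stop() -> None:
    await consumer.shutdown()
    await stack.aclose()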

The provided implementation traps asyncio.CancelledError and surfaces unexpected errors via logging, so it integrates cleanly with async task groups.
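
The start/stop pattern can be tried out without any provider installed. The sketch below uses a hypothetical `StandInConsumer` that mimics only the `run()`/`shutdown()` shape described above; its internals are assumptions made for illustration.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


class StandInConsumer:
    """Hypothetical stand-in exposing run() and shutdown()."""

    def __init__(self) -> None:
        self._stop = asyncio.Event()
        self.processed = 0
        self.closed = False

    async def run(self) -> None:
        try:
            while not self._stop.is_set():
                self.processed += 1  # pretend to handle one message
                await asyncio.sleep(0.01)
        except asyncio.CancelledError:
            logger.info("consumer cancelled")
            raise  # re-raise so task groups see the cancellation
        finally:
            self.closed = True

    async def shutdown(self) -> None:
        self._stop.set()


async def serve(run_for: float) -> StandInConsumer:
    """Start the consumer, wait, then shut it down in order."""
    consumer = StandInConsumer()
    task = asyncio.create_task(consumer.run())
    await asyncio.sleep(run_for)  # stands in for the app's lifetime
    await consumer.shutdown()
    try:
        # wait_for cancels the task itself if the timeout expires.
        await asyncio.wait_for(task, timeout=1.0)
    except asyncio.TimeoutError:
        logger.warning("consumer did not stop in time and was cancelled")
    return consumer
```

In a web framework, the body of `serve` would typically be split across the startup and shutdown hooks, with the task kept on application state.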

Step 5 · Configure providers via environment

Each provider can read its configuration inside from_env(). Example using the in-repo memory backend and a hypothetical Redis backend:

# Development: use shared memory backend (default)
pip install abe-backend-memory

# Production: switch to redis provider
pip install abe-backend-redis
export QUEUE_BACKEND=redis
export REDIS_URL=redis://user:pass@host:6379/0

As long as the provider is installed and the required variables are present, load_backend() will return the correct implementation without code changes.
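
For provider authors, a `from_env()` hook might look roughly like the following. This is a sketch under assumptions: `RedisSettings` is a hypothetical class, and the only variable it reads is the REDIS_URL shown above.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class RedisSettings:
    """Hypothetical settings object for an illustrative Redis provider."""

    url: str

    @classmethod
    def from_env(cls, env: Optional[Mapping[str, str]] = None) -> "RedisSettings":
        env = os.environ if env is None else env
        url = env.get("REDIS_URL")
        if not url:
            # Fail at startup with a clear message instead of on first use.
            raise RuntimeError("REDIS_URL must be set when QUEUE_BACKEND=redis")
        return cls(url=url)
```

Validating required variables inside `from_env()` means a misconfigured deployment fails when the backend is loaded, not when the first message is published.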

Step 6 · Testing strategy

  • Unit tests – Mock load_backend() or pass a fake MessageQueueBackend that records interactions.
  • Integration tests – Install the target provider and run the tests against docker-compose services or cloud sandboxes.
  • Fallback tests – Keep the memory backend installed and verify minimal environments still boot.

CI pipelines can treat providers as optional extras by installing them conditionally or using matrix jobs.
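
A recording fake for unit tests can be very small. In this sketch `RecordingBackend` and `create_order` are hypothetical; the fake implements only the `publish(routing_key, payload)` call used in Step 2, and the code under test receives the backend as an argument so that no loader mocking is needed.

```python
import asyncio


class RecordingBackend:
    """Hypothetical fake that records every publish call."""

    def __init__(self) -> None:
        self.published = []

    async def publish(self, routing_key: str, payload: dict) -> None:
        self.published.append((routing_key, payload))


async def create_order(backend, order_id: str, total: float) -> None:
    """Example code under test: publishes one event per order."""
    await backend.publish("orders.created", {"order_id": order_id, "total": total})


def test_create_order_publishes_event() -> None:
    backend = RecordingBackend()
    asyncio.run(create_order(backend, "A-123", 42.0))
    assert backend.published == [
        ("orders.created", {"order_id": "A-123", "total": 42.0})
    ]
```

The test function follows the pytest naming convention but uses only the standard library, so it can also be called directly.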

Going further

  • Add retry/dead-letter logic by subclassing AsyncLoopConsumer and overriding the handler wrapper.
  • Wrap publish() calls with your own domain facade to ensure consistent routing keys.
  • Explore the API reference for a catalogue of protocols, utilities, and type aliases.
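
As an alternative to subclassing, retry and dead-letter behaviour can also be layered onto the handler itself. The sketch below takes that different route: `with_retries` is a hypothetical decorator-style helper, and the `dead_letter` callback is an assumption about how you might park failed payloads.

```python
import asyncio
import logging
from typing import Awaitable, Callable, Optional

logger = logging.getLogger(__name__)

Handler = Callable[[dict], Awaitable[None]]


def with_retries(
    handler: Handler,
    attempts: int = 3,
    base_delay: float = 0.0,
    dead_letter: Optional[Handler] = None,
) -> Handler:
    """Hypothetical wrapper: retry a handler, then dead-letter the payload."""

    async def wrapped(payload: dict) -> None:
        for attempt in range(1, attempts + 1):
            try:
                await handler(payload)
                return
            except Exception:
                # CancelledError is not an Exception subclass on
                # Python 3.8+, so cancellation still propagates.
                logger.exception("handler failed (attempt %d/%d)", attempt, attempts)
                if attempt < attempts:
                    # Exponential backoff: base_delay, 2x, 4x, ...
                    await asyncio.sleep(base_delay * 2 ** (attempt - 1))
        if dead_letter is not None:
            await dead_letter(payload)

    return wrapped
```

The wrapped function has the same signature as the original handler, so it could be passed wherever a plain handler is expected, for example `consumer.run(with_retries(handle))`.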