Running Abstract Backend in Your Application
This guide shows how to wire the abstractions into a real program—configuring providers, publishing messages, and running consumers in a deployment-safe way.
Step 1 · Decide how your application selects a backend
Abstract Backend exposes multiple strategies for selecting providers:
- Environment variable – Set QUEUE_BACKEND (recommended for production).
- Programmatic override – Call load_backend("redis") with an explicit provider name.
- Auto-discovery – Let the loader pick the first installed provider (falls back to the memory backend).
Use whichever matches your deployment tooling. For example, a container may inject QUEUE_BACKEND through Kubernetes ConfigMaps.
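For example, the environment-driven and programmatic styles look like this (a minimal sketch; "redis" assumes the matching provider package is installed):

import os

from abe.backends.message_queue.loader import load_backend

# Programmatic override: request a provider by name.
redis_backend = load_backend("redis")

# Environment-driven selection: load_backend() resolves QUEUE_BACKEND itself.
os.environ["QUEUE_BACKEND"] = "redis"
configured_backend = load_backend()

# Auto-discovery: with no name and no QUEUE_BACKEND set, the loader picks the
# first installed provider and falls back to the memory backend.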
Step 2 · Publish messages through the abstraction
import asyncio

from abe.backends.message_queue.loader import load_backend


async def main() -> None:
    backend = load_backend()
    await backend.publish("orders.created", {"order_id": "A-123", "total": 42.0})


if __name__ == "__main__":
    asyncio.run(main())
- load_backend() returns the provider resolved at runtime.
- publish() accepts a routing key plus a JSON-serialisable payload (MessageQueuePayload).
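If your events start life as domain objects, convert them to plain JSON types before publishing. A short sketch using a hypothetical OrderCreated dataclass:

import asyncio
from dataclasses import asdict, dataclass

from abe.backends.message_queue.loader import load_backend


@dataclass
class OrderCreated:
    order_id: str
    total: float


async def main() -> None:
    backend = load_backend()
    # asdict() produces the JSON-serialisable dict that publish() expects.
    await backend.publish("orders.created", asdict(OrderCreated("A-123", 42.0)))


if __name__ == "__main__":
    asyncio.run(main())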
Step 3 · Consume messages with the shared event loop helper
import asyncio

from abe.backends.message_queue.consumer import AsyncLoopConsumer
from abe.backends.message_queue.loader import load_backend


async def handle(payload: dict[str, object]) -> None:
    print("Processing", payload)


async def main() -> None:
    backend = load_backend()
    consumer = AsyncLoopConsumer(backend, group="billing")
    await consumer.run(handle)


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass
- AsyncLoopConsumer lives in abe/backends/message_queue/consumer.py and works with any provider that implements MessageQueueBackend.consume().
- Supply a group string if the provider supports consumer groups (optional—ignored by providers that do not implement the concept).
Step 4 · Manage lifecycle and graceful shutdown
The consumer exposes shutdown() for orderly teardown. When embedding in a long-running service (FastAPI, Django, etc.), call it during shutdown hooks:
from contextlib import AsyncExitStack

from abe.backends.message_queue.consumer import AsyncLoopConsumer
from abe.backends.message_queue.loader import load_backend

stack = AsyncExitStack()
backend = load_backend()
consumer = AsyncLoopConsumer(backend)

async def start() -> None:
    await stack.enter_async_context(consumer)  # if you wrap `shutdown` in a contextlib-based async context manager (see docs)

async def stop() -> None:
    await consumer.shutdown()
    await stack.aclose()
The provided implementation traps asyncio.CancelledError and surfaces unexpected errors via logging, so it integrates cleanly with async task groups.
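In a FastAPI service, for instance, the shutdown hook can live in a lifespan handler. A sketch, assuming consumer.run() keeps consuming until shutdown() is called:

import asyncio
from contextlib import asynccontextmanager

from fastapi import FastAPI

from abe.backends.message_queue.consumer import AsyncLoopConsumer
from abe.backends.message_queue.loader import load_backend


async def handle(payload: dict[str, object]) -> None:
    print("Processing", payload)


@asynccontextmanager
async def lifespan(app: FastAPI):
    backend = load_backend()
    consumer = AsyncLoopConsumer(backend)
    task = asyncio.create_task(consumer.run(handle))  # consume in the background
    try:
        yield
    finally:
        await consumer.shutdown()  # ask the consumer loop to stop
        await task  # wait for the background task to finish


app = FastAPI(lifespan=lifespan)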
Step 5 · Configure providers via environment
Each provider can read its configuration inside from_env(). Example using the in-repo memory backend and a hypothetical Redis backend:
# Development: use shared memory backend (default)
pip install abe-backend-memory
# Production: switch to redis provider
pip install abe-backend-redis
export QUEUE_BACKEND=redis
export REDIS_URL=redis://user:pass@host:6379/0
As long as the provider is installed and the required variables are present, load_backend() will return the correct implementation without code changes.
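Under the hood, a provider's from_env() hook simply reads the variables it needs from the process environment. A sketch of what a hypothetical Redis provider might do (RedisBackend and its constructor are illustrative names, not the actual package API):

import os


class RedisBackend:
    def __init__(self, url: str) -> None:
        self.url = url

    @classmethod
    def from_env(cls) -> "RedisBackend":
        # Fail fast if the deployment forgot the connection string.
        url = os.environ.get("REDIS_URL")
        if url is None:
            raise RuntimeError("REDIS_URL must be set when QUEUE_BACKEND=redis")
        return cls(url)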
Step 6 · Testing strategy
- Unit tests – Mock load_backend() or pass a fake QueueBackend that records interactions.
- Integration tests – Install the target provider and run the tests against docker-compose services or cloud sandboxes.
- Fallback tests – Keep the memory backend installed and verify minimal environments still boot.
CI pipelines can treat providers as optional extras by installing them conditionally or using matrix jobs.
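A unit test along the lines of the first bullet might look like this (a sketch; FakeBackend and create_order are illustrative stand-ins for your own code):

import asyncio


class FakeBackend:
    """Stands in for a real provider and records publish() calls."""

    def __init__(self) -> None:
        self.published: list[tuple[str, dict[str, object]]] = []

    async def publish(self, routing_key: str, payload: dict[str, object]) -> None:
        self.published.append((routing_key, payload))


async def create_order(backend: FakeBackend, order_id: str, total: float) -> None:
    # Example application code under test: publishes through the abstraction.
    await backend.publish("orders.created", {"order_id": order_id, "total": total})


def test_create_order_publishes_event() -> None:
    fake = FakeBackend()
    asyncio.run(create_order(fake, "A-123", 42.0))
    assert fake.published == [("orders.created", {"order_id": "A-123", "total": 42.0})]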
Going further
- Add retry/dead-letter logic by subclassing AsyncLoopConsumer and overriding the handler wrapper.
- Wrap publish() calls with your own domain facade to ensure consistent routing keys (see the sketch after this list).
- Explore the API reference for a catalogue of protocols, utilities, and type aliases.
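For the facade idea, a thin wrapper keeps routing keys in exactly one place. A sketch (OrderEvents is an illustrative name):

from abe.backends.message_queue.loader import load_backend


class OrderEvents:
    """Domain-level facade so routing keys live in a single module."""

    ROUTING_KEY_CREATED = "orders.created"

    def __init__(self) -> None:
        self._backend = load_backend()

    async def order_created(self, order_id: str, total: float) -> None:
        await self._backend.publish(self.ROUTING_KEY_CREATED, {"order_id": order_id, "total": total})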