MessageQueueBackend Protocol
MessageQueueBackend is the core contract that every message-queue provider must implement. It is defined in abe/backends/message_queue/base/protocol.py and re-exports the MessageQueueBackendProtocol type from abe/types.py.
abe/backends/message_queue/base/protocol.py
from abe.types import MessageQueueBackendProtocol
MessageQueueBackend = MessageQueueBackendProtocol
Any implementation registered under the abe.backends.message_queue entry-point group must satisfy this protocol so that application code can treat all providers the same way.
Responsibilitiesโ
- Publish JSON-serialisable payloads to a routing key using
publish(). - Yield messages asynchronously via
consume(). - Provide a
from_env()factory that constructs a configured instance from process settings.
Method referenceโ
async publish(key: MessageQueueKey, payload: MessageQueuePayload) -> Noneโ
- Input:
keyis a string routing key;payloadis a JSON-compatible dictionary (MessageQueuePayload). - Behaviour: Send the payload to the backend, applying routing semantics appropriate to the provider (topic, stream, queue, etc.).
- Errors: Raise provider-specific exceptions for connectivity or serialisation failures. Surface meaningful messages; the loader does not wrap exceptions.
async consume(*, group: ConsumerGroup | None = None) -> AsyncIterator[MessageQueueMessage]โ
- Input: Optional
groupidentifier. Providers that have native consumer-group semantics should honour it; others may ignore it. - Output: Async iterator yielding
MessageQueueMessagestructures. Minimum requirement is the payload, but providers may attach metadata like IDs or timestamps. - Lifecycle: The iterator should run until cancelled. Handle
asyncio.CancelledErrorgracefully and release connections.
@classmethod from_env(cls) -> MessageQueueBackendProtocolโ
- Purpose: Create a ready-to-use backend instance based on environment configuration (URLs, credentials, queue names, etc.).
- Guidelines:
- Validate required variables and raise clear
RuntimeError/ValueErrorif they are missing. - Document accepted environment variables in your provider README and on this page if the provider lives in-repo.
- Validate required variables and raise clear
Example provider skeletonโ
from typing import AsyncIterator
from abe.types import (
MessageQueueBackendProtocol,
MessageQueueKey,
MessageQueuePayload,
MessageQueueMessage,
ConsumerGroup,
)
class RedisBackend(MessageQueueBackendProtocol):
def __init__(self, client) -> None:
self._client = client
@classmethod
def from_env(cls) -> "RedisBackend":
url = os.environ["REDIS_URL"]
client = redis.AsyncRedis.from_url(url)
return cls(client)
async def publish(self, key: MessageQueueKey, payload: MessageQueuePayload) -> None:
await self._client.xadd(key, {"data": json.dumps(payload)})
async def consume(self, *, group: ConsumerGroup = None) -> AsyncIterator[MessageQueueMessage]:
async for entry in self._client.xreadgroup(group or "default", ...):
yield json.loads(entry["data"])
Entry-point registrationโ
Expose your backend through pyproject.toml using the abe.backends.message_queue group:
[project.entry-points."abe.backends.message_queue"]
redis = "my_package.redis_backend:RedisBackend"
Once the package is installed, load_backend() can discover it without any extra wiring.
Testing tips for providersโ
- Use
MessageQueueBackendProtocolas a structural type in unit tests to ensure compliance. - Run asynchronous integration tests against the actual service (Redis, SQS, etc.) inside containers or test accounts.
- Provide fixtures that patch
load_backend()or register a test entry point so consumers can exercise code paths without external dependencies.