Version: Next

MessageQueueBackend Protocol

MessageQueueBackend is the core contract that every message-queue provider must implement. It is defined in abe/backends/message_queue/base/protocol.py and re-exports the MessageQueueBackendProtocol type from abe/types.py.

abe/backends/message_queue/base/protocol.py
from abe.types import MessageQueueBackendProtocol

MessageQueueBackend = MessageQueueBackendProtocol

Any implementation registered under the abe.backends.message_queue entry-point group must satisfy this protocol so that application code can treat all providers the same way.

Responsibilities

  • Publish JSON-serialisable payloads to a routing key using publish().
  • Yield messages asynchronously via consume().
  • Provide a from_env() factory that constructs a configured instance from process settings.

Method reference

async publish(key: MessageQueueKey, payload: MessageQueuePayload) -> None

  • Input: key is a string routing key; payload is a JSON-compatible dictionary (MessageQueuePayload).
  • Behaviour: Send the payload to the backend, applying routing semantics appropriate to the provider (topic, stream, queue, etc.).
  • Errors: Raise provider-specific exceptions for connectivity or serialisation failures. Surface meaningful messages; the loader does not wrap exceptions.
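Because the loader does not wrap exceptions, the message a provider raises is what the caller sees. The sketch below shows one way to attach the routing key to a serialisation failure before the payload reaches the backend. The helper name encode_payload is hypothetical and is not part of abe.

```python
import json
from typing import Any, Dict


def encode_payload(key: str, payload: Dict[str, Any]) -> str:
    """Serialise a payload, naming the routing key if serialisation fails.

    Hypothetical helper for illustration; abe does not ship this function.
    """
    try:
        return json.dumps(payload)
    except (TypeError, ValueError) as exc:
        # json.dumps raises TypeError for unsupported types and ValueError
        # for circular references; re-raise with context for the caller.
        raise ValueError(
            f"payload for routing key {key!r} is not JSON-serialisable: {exc}"
        ) from exc
```

A provider's publish() could call such a helper first and let connectivity errors from the client library propagate unchanged.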

async consume(*, group: ConsumerGroup | None = None) -> AsyncIterator[MessageQueueMessage]

  • Input: Optional group identifier. Providers that have native consumer-group semantics should honour it; others may ignore it.
  • Output: Async iterator yielding MessageQueueMessage structures. Minimum requirement is the payload, but providers may attach metadata like IDs or timestamps.
  • Lifecycle: The iterator should run until cancelled. Handle asyncio.CancelledError gracefully and release connections.
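The lifecycle rule can be illustrated without a real broker. The following is a minimal sketch that uses an asyncio.Queue as a stand-in for the provider connection. The class QueueBackedConsumer and its closed flag are assumptions made for this example. It relies on try/finally so that cleanup runs on cancellation while the CancelledError still propagates to the caller.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List, Optional, Tuple


class QueueBackedConsumer:
    """Hypothetical consumer backed by an asyncio.Queue instead of a broker."""

    def __init__(self) -> None:
        self.queue = asyncio.Queue()
        self.closed = False

    async def consume(
        self, *, group: Optional[str] = None
    ) -> AsyncIterator[Dict[str, Any]]:
        try:
            while True:
                # Waits for the next message; cancellation is raised here.
                yield await self.queue.get()
        finally:
            # Runs when the consuming task is cancelled or the iterator is
            # closed. A real provider would release its connection here.
            self.closed = True


async def main() -> Tuple[List[Dict[str, Any]], bool]:
    backend = QueueBackedConsumer()
    received: List[Dict[str, Any]] = []

    async def worker() -> None:
        async for message in backend.consume():
            received.append(message)

    task = asyncio.create_task(worker())
    await backend.queue.put({"id": 1})
    await asyncio.sleep(0.01)  # give the worker time to pick the message up
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: the iterator does not swallow cancellation
    return received, backend.closed


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Re-raising rather than suppressing the cancellation keeps task shutdown predictable for whatever code owns the consuming task.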

@classmethod from_env(cls) -> MessageQueueBackendProtocol

  • Purpose: Create a ready-to-use backend instance based on environment configuration (URLs, credentials, queue names, etc.).
  • Guidelines:
    • Validate required variables and raise a clear RuntimeError or ValueError that names any that are missing.
    • Document accepted environment variables in your provider README and on this page if the provider lives in-repo.
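As an illustration of these guidelines, the sketch below validates all required variables up front and reports every missing one in a single error. ExampleBackend and the EXAMPLE_MQ_URL and EXAMPLE_MQ_QUEUE variable names are invented for this example; a real provider would use its own.

```python
import os
from dataclasses import dataclass


@dataclass
class ExampleBackend:
    """Hypothetical provider; only the from_env() validation matters here."""

    url: str
    queue_name: str

    @classmethod
    def from_env(cls) -> "ExampleBackend":
        # Variable names are made up for this sketch.
        required = ("EXAMPLE_MQ_URL", "EXAMPLE_MQ_QUEUE")
        # Treat unset and empty values the same way.
        missing = [name for name in required if not os.environ.get(name)]
        if missing:
            raise RuntimeError(
                "ExampleBackend is missing required environment variables: "
                + ", ".join(missing)
            )
        return cls(
            url=os.environ["EXAMPLE_MQ_URL"],
            queue_name=os.environ["EXAMPLE_MQ_QUEUE"],
        )
```

Listing every missing variable at once saves the operator from fixing them one failed start at a time.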

Example provider skeleton

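import json
import os
from typing import AsyncIterator

import redis.asyncio as redis  # asyncio client shipped with redis-py

from abe.types import (
    ConsumerGroup,
    MessageQueueBackendProtocol,
    MessageQueueKey,
    MessageQueueMessage,
    MessageQueuePayload,
)


class RedisBackend(MessageQueueBackendProtocol):
    def __init__(self, client: redis.Redis) -> None:
        self._client = client

    @classmethod
    def from_env(cls) -> "RedisBackend":
        # Fail with a clear message rather than a bare KeyError.
        url = os.environ.get("REDIS_URL")
        if not url:
            raise RuntimeError("REDIS_URL must be set to use RedisBackend")
        # decode_responses=True makes stream field names and values str, not bytes.
        return cls(redis.Redis.from_url(url, decode_responses=True))

    async def publish(self, key: MessageQueueKey, payload: MessageQueuePayload) -> None:
        # Store the payload as a single JSON-encoded stream field.
        await self._client.xadd(key, {"data": json.dumps(payload)})

    async def consume(self, *, group: ConsumerGroup | None = None) -> AsyncIterator[MessageQueueMessage]:
        while True:
            # xreadgroup returns a batch of (stream, entries) pairs, not an async
            # iterator. The consumer name, streams and blocking timeout are elided.
            batches = await self._client.xreadgroup(group or "default", ...)
            for _stream, entries in batches:
                for _entry_id, fields in entries:
                    yield json.loads(fields["data"])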

Entry-point registration

Expose your backend through pyproject.toml using the abe.backends.message_queue group:

[project.entry-points."abe.backends.message_queue"]
redis = "my_package.redis_backend:RedisBackend"

Once the package is installed, load_backend() can discover it without any extra wiring.
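To check that a registration is visible after installation, the entry-point group can be inspected with the standard library. This is a rough sketch of the discovery step only; it does not describe how load_backend() is implemented, and the function name discover_backends is hypothetical.

```python
from importlib.metadata import entry_points
from typing import Any, Dict

GROUP = "abe.backends.message_queue"


def discover_backends(group: str = GROUP) -> Dict[str, Any]:
    """Map entry-point names to unloaded entry points for the given group."""
    eps = entry_points()
    if hasattr(eps, "select"):
        # Python 3.10+: entry points are selectable by group.
        selected = eps.select(group=group)
    else:
        # Python 3.8/3.9: entry_points() returns a dict keyed by group.
        selected = eps.get(group, [])
    return {ep.name: ep for ep in selected}


if __name__ == "__main__":
    # Prints the provider names installed in the current environment.
    print(sorted(discover_backends()))
```

With the registration shown above installed, the mapping would contain a redis key, and calling .load() on that entry point would import my_package.redis_backend and return the RedisBackend class.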

Testing tips for providers

  • Use MessageQueueBackendProtocol as a structural type in unit tests to ensure compliance.
  • Run asynchronous integration tests against the actual service (Redis, SQS, etc.) inside containers or test accounts.
  • Provide fixtures that patch load_backend() or register a test entry point so consumers can exercise code paths without external dependencies.
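The tips above can be combined in a small in-memory test double. The sketch below is self-contained, so it declares a local BackendShape protocol as a stand-in for MessageQueueBackendProtocol; the real type in abe/types.py may differ, and whether it is decorated with runtime_checkable is an assumption. InMemoryBackend is likewise a hypothetical name.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List, Optional, Protocol, runtime_checkable


@runtime_checkable
class BackendShape(Protocol):
    """Local stand-in for MessageQueueBackendProtocol (assumed shape)."""

    async def publish(self, key: str, payload: Dict[str, Any]) -> None: ...

    def consume(
        self, *, group: Optional[str] = None
    ) -> AsyncIterator[Dict[str, Any]]: ...

    @classmethod
    def from_env(cls) -> "BackendShape": ...


class InMemoryBackend:
    """Test double that keeps published payloads in a list."""

    def __init__(self) -> None:
        self.published: List[Dict[str, Any]] = []

    @classmethod
    def from_env(cls) -> "InMemoryBackend":
        return cls()

    async def publish(self, key: str, payload: Dict[str, Any]) -> None:
        self.published.append({"key": key, "payload": payload})

    async def consume(
        self, *, group: Optional[str] = None
    ) -> AsyncIterator[Dict[str, Any]]:
        # Unlike a real backend, this drains what was published and stops.
        for item in list(self.published):
            yield item["payload"]


async def roundtrip() -> List[Dict[str, Any]]:
    backend = InMemoryBackend.from_env()
    await backend.publish("orders.created", {"id": 1})
    return [message async for message in backend.consume()]


if __name__ == "__main__":
    # isinstance() only checks that the method names exist, not signatures.
    print(isinstance(InMemoryBackend(), BackendShape))
    print(asyncio.run(roundtrip()))
```

A runtime isinstance() check catches missing methods only; annotating a variable with the protocol type and running a static type checker gives a stricter compliance check. A fixture could return such a double in place of the result of load_backend().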