Skip to main content
Version: Next

MessageQueueBackend Protocol

The MessageQueueBackend protocol defines the interface that every message-queue integration must implement to work with the Abstract Backend runtime.

Protocol Definition

from typing import Any, AsyncIterator, Dict, Optional, Protocol


class MessageQueueBackend(Protocol):
"""Standard interface for Abstract Backend message queue providers."""

@classmethod
def from_env(cls) -> "MessageQueueBackend":
"""Build a configured backend from environment variables.

Returns:
MessageQueueBackend: Configured backend instance.

Raises:
ValueError: Missing mandatory configuration.
ConnectionError: Unable to reach the queue service.
"""

async def publish(self, key: str, payload: Dict[str, Any]) -> None:
"""Publish a message to the queue.

Args:
key: Routing key/topic/channel for the message.
payload: JSON-serialisable message payload.

Raises:
ConnectionError: Queue service is unavailable.
SerializationError: Payload cannot be serialised.
PublishError: Delivery failed after retries.
"""

async def consume(self, *, group: Optional[str] = None) -> AsyncIterator[Dict[str, Any]]:
"""Consume messages from the queue.

Args:
group: Optional consumer-group identifier for load balancing.

Yields:
Dict[str, Any]: Deserialised message payloads.

Raises:
ConnectionError: Queue service is unavailable.
ConsumeError: Delivery failed after retries.
"""

Implementation Requirements

Environment Configuration

Support configuration through environment variables inside from_env():

@classmethod
def from_env(cls) -> "YourMessageQueueBackend":
import os

host = os.getenv("MESSAGE_QUEUE_HOST", "localhost")
port = int(os.getenv("MESSAGE_QUEUE_PORT", "5672"))

username = os.getenv("MESSAGE_QUEUE_USERNAME")
password = os.getenv("MESSAGE_QUEUE_PASSWORD")
if not username or not password:
raise ValueError("MESSAGE_QUEUE_USERNAME and MESSAGE_QUEUE_PASSWORD are required")

tls_enabled = os.getenv("MESSAGE_QUEUE_TLS", "false").lower() == "true"
virtual_host = os.getenv("MESSAGE_QUEUE_VHOST", "/")

return cls(
host=host,
port=port,
username=username,
password=password,
virtual_host=virtual_host,
tls_enabled=tls_enabled,
)

Message Publishing

Ensure publish() handles connection management, serialisation, routing, and retries:

async def publish(self, key: str, payload: Dict[str, Any]) -> None:
await self._ensure_connected()

body = json.dumps(payload)

await self._with_retry(
lambda: self._client.publish(
routing_key=key,
body=body.encode("utf-8"),
headers={"content-type": "application/json"},
)
)

logger.debug("Published message", extra={"routing_key": key})

Message Consumption

Provide an async iterator that recovers from transient failures:

async def consume(self, *, group: Optional[str] = None) -> AsyncIterator[Dict[str, Any]]:
await self._ensure_connected()

queue_name = await self._prepare_queue(group)

async with self._client.consume(queue_name) as consumer:
async for message in consumer:
try:
payload = json.loads(message.body)
except json.JSONDecodeError as exc:
logger.error("Invalid JSON payload", exc_info=exc)
await message.nack(requeue=False)
continue

yield payload
await message.ack()

Protocol Compliance Checklist

✅ Configuration

  • Implements from_env()
  • Reads configuration from environment variables
  • Validates required fields with clear errors
  • Supplies sensible defaults for optional values

✅ Publishing

  • Implements publish()
  • Manages connections and retries
  • Supports routing by key/topic
  • Serialises payloads to JSON
  • Logs failures and successes appropriately

✅ Consuming

  • Implements consume() returning an async iterator
  • Supports optional consumer groups
  • Handles cancellation and reconnection gracefully
  • Deserialises JSON payloads
  • Acknowledges or re-queues messages appropriately

✅ Error Handling

  • Raises meaningful exceptions with context
  • Logs issues with actionable details
  • Implements retry/backoff strategies for transient failures

✅ Testing

  • Unit tests cover public methods
  • Integration tests exercise real queue behaviour
  • Error-path tests validate resilience
  • Load/performance tests confirm scaling characteristics

Type Hints and Annotations

Add full typing to support static analysis:

from typing import Any, AsyncIterator, Dict, Optional
import logging


logger = logging.getLogger(__name__)


class YourMessageQueueBackend(MessageQueueBackend):
def __init__(
self,
host: str,
port: int,
username: str,
password: str,
*,
virtual_host: str = "/",
tls_enabled: bool = False,
) -> None:
self.host = host
self.port = port
self.username = username
self.password = password
self.virtual_host = virtual_host
self.tls_enabled = tls_enabled
self._client: Optional[Any] = None

@classmethod
def from_env(cls) -> "YourMessageQueueBackend":
# Implementation shown earlier
...

async def publish(self, key: str, payload: Dict[str, Any]) -> None:
await self._ensure_connected()
await self._client.publish(key, json.dumps(payload))

async def consume(
self,
*,
group: Optional[str] = None,
) -> AsyncIterator[Dict[str, Any]]:
await self._ensure_connected()
async for message in self._client.consume(group=group):
yield json.loads(message)

Common Implementation Patterns

Connection Management

async def _ensure_connected(self) -> None:
if self._client is None or self._client.closed:
self._client = await self._connect()


async def _with_retry(self, operation, *, retries: int = 3) -> None:
for attempt in range(retries + 1):
try:
return await operation()
except ConnectionError as exc:
if attempt == retries:
raise
wait_time = 2 ** attempt
logger.warning("Retrying operation", extra={"attempt": attempt, "wait": wait_time})
await asyncio.sleep(wait_time)

Payload Serialisation

def _serialise(self, payload: Dict[str, Any]) -> bytes:
return json.dumps(payload, default=str).encode("utf-8")


def _deserialise(self, data: bytes) -> Dict[str, Any]:
return json.loads(data.decode("utf-8"))

Consumer Group Handling

async def _prepare_queue(self, group: Optional[str]) -> str:
if group:
queue_name = f"abstract_backend_{group}"
await self._client.declare_queue(queue_name, durable=True)
await self._client.bind(queue_name, routing_key="#")
return queue_name

await self._client.declare_queue("abstract_backend_default", durable=True)
return "abstract_backend_default"