MessageQueueBackend Protocol
The MessageQueueBackend protocol defines the interface that every message-queue integration must implement to work with the Abstract Backend runtime.
Protocol Definition
from typing import Any, AsyncIterator, Dict, Optional, Protocol
class MessageQueueBackend(Protocol):
    """Standard interface for Abstract Backend message queue providers.

    Implementations provide an environment-driven constructor plus
    asynchronous ``publish`` and ``consume`` operations.
    """

    @classmethod
    def from_env(cls) -> "MessageQueueBackend":
        """Build a configured backend from environment variables.

        Returns:
            MessageQueueBackend: Configured backend instance.

        Raises:
            ValueError: Missing mandatory configuration.
            ConnectionError: Unable to reach the queue service.
        """
        ...

    async def publish(self, key: str, payload: Dict[str, Any]) -> None:
        """Publish a message to the queue.

        Args:
            key: Routing key/topic/channel for the message.
            payload: JSON-serialisable message payload.

        Raises:
            ConnectionError: Queue service is unavailable.
            SerializationError: Payload cannot be serialised.
            PublishError: Delivery failed after retries.
        """
        ...

    # Declared with plain ``def`` on purpose: implementations are async
    # generators (``async def`` + ``yield``), and calling one returns the
    # AsyncIterator directly.  An ``async def`` stub without ``yield`` would be
    # read by type checkers as a coroutine that *returns* an iterator, which
    # async-generator implementations do not match.
    def consume(self, *, group: Optional[str] = None) -> AsyncIterator[Dict[str, Any]]:
        """Consume messages from the queue.

        Intended usage is ``async for payload in backend.consume(...)``.

        Args:
            group: Optional consumer-group identifier for load balancing.

        Yields:
            Dict[str, Any]: Deserialised message payloads.

        Raises:
            ConnectionError: Queue service is unavailable.
            ConsumeError: Consumption failed after retries.
        """
        ...
Implementation Requirements
Environment Configuration
Support configuration through environment variables inside from_env():
@classmethod
def from_env(cls) -> "YourMessageQueueBackend":
    """Build a backend instance from ``MESSAGE_QUEUE_*`` environment variables.

    Variables read:
        MESSAGE_QUEUE_HOST: Host name (default ``"localhost"``).
        MESSAGE_QUEUE_PORT: TCP port, 1-65535 (default ``5672``).
        MESSAGE_QUEUE_USERNAME: Required.
        MESSAGE_QUEUE_PASSWORD: Required.
        MESSAGE_QUEUE_TLS: ``true``/``1``/``yes``/``on`` (case-insensitive)
            enables TLS; any other value disables it (default ``false``).
        MESSAGE_QUEUE_VHOST: Virtual host (default ``"/"``).

    Returns:
        An instance of ``cls`` built from the values above.

    Raises:
        ValueError: If the username or password is missing or empty, or if
            the port is not an integer in the range 1-65535.
    """
    import os

    host = os.getenv("MESSAGE_QUEUE_HOST", "localhost")

    # Parse the port explicitly so a bad value produces an error that names
    # the offending variable instead of a bare int() failure.
    raw_port = os.getenv("MESSAGE_QUEUE_PORT", "5672")
    try:
        port = int(raw_port)
    except ValueError:
        raise ValueError(
            f"MESSAGE_QUEUE_PORT must be an integer, got {raw_port!r}"
        ) from None
    if not 1 <= port <= 65535:
        raise ValueError(
            f"MESSAGE_QUEUE_PORT must be between 1 and 65535, got {port}"
        )

    username = os.getenv("MESSAGE_QUEUE_USERNAME")
    password = os.getenv("MESSAGE_QUEUE_PASSWORD")
    if not username or not password:
        raise ValueError("MESSAGE_QUEUE_USERNAME and MESSAGE_QUEUE_PASSWORD are required")

    # Accept the common truthy spellings, not just the literal "true".
    tls_flag = os.getenv("MESSAGE_QUEUE_TLS", "false").strip().lower()
    tls_enabled = tls_flag in {"1", "true", "yes", "on"}

    virtual_host = os.getenv("MESSAGE_QUEUE_VHOST", "/")

    return cls(
        host=host,
        port=port,
        username=username,
        password=password,
        virtual_host=virtual_host,
        tls_enabled=tls_enabled,
    )
Message Publishing
Ensure publish() handles connection management, serialisation, routing, and retries:
async def publish(self, key: str, payload: Dict[str, Any]) -> None:
    """Serialise ``payload`` to JSON and publish it under ``key``.

    The connection is checked first, then the send itself is handed to
    ``self._with_retry`` so that helper decides how failures are retried.

    Args:
        key: Routing key passed to the client as ``routing_key``.
        payload: Message content; must be accepted by ``json.dumps``.
    """
    await self._ensure_connected()

    encoded = json.dumps(payload).encode("utf-8")
    message_headers = {"content-type": "application/json"}

    def _send():
        # Look the client up on every attempt rather than capturing it once.
        return self._client.publish(
            routing_key=key,
            body=encoded,
            headers=message_headers,
        )

    await self._with_retry(_send)
    logger.debug("Published message", extra={"routing_key": key})
Message Consumption
Provide an async iterator that recovers from transient failures:
async def consume(self, *, group: Optional[str] = None) -> AsyncIterator[Dict[str, Any]]:
    """Yield decoded JSON payloads from the queue prepared for ``group``.

    Messages whose body cannot be decoded are logged, rejected without
    requeueing, and skipped, so one bad message does not end the stream.

    Args:
        group: Optional consumer-group identifier, forwarded to
            ``self._prepare_queue`` to obtain the queue name.

    Yields:
        The result of ``json.loads`` on each message body.
    """
    await self._ensure_connected()
    queue_name = await self._prepare_queue(group)
    async with self._client.consume(queue_name) as consumer:
        async for message in consumer:
            try:
                payload = json.loads(message.body)
            except (json.JSONDecodeError, UnicodeDecodeError) as exc:
                # json.loads raises UnicodeDecodeError (not JSONDecodeError)
                # for byte bodies that are not valid UTF-8/16/32; treat both
                # as an undecodable message rather than crashing the loop.
                logger.error("Invalid JSON payload", exc_info=exc)
                await message.nack(requeue=False)
                continue
            yield payload
            # Runs only when the caller resumes the generator, i.e. after it
            # has finished with the payload and asked for the next one.
            await message.ack()
Protocol Compliance Checklist
✅ Configuration
- Implements from_env()
- Reads configuration from environment variables
- Validates required fields with clear errors
- Supplies sensible defaults for optional values
✅ Publishing
- Implements publish()
- Manages connections and retries
- Supports routing by key/topic
- Serialises payloads to JSON
- Logs failures and successes appropriately
✅ Consuming
- Implements consume() returning an async iterator
- Supports optional consumer groups
- Handles cancellation and reconnection gracefully
- Deserialises JSON payloads
- Acknowledges or re-queues messages appropriately
✅ Error Handling
- Raises meaningful exceptions with context
- Logs issues with actionable details
- Implements retry/backoff strategies for transient failures
✅ Testing
- Unit tests cover public methods
- Integration tests exercise real queue behaviour
- Error-path tests validate resilience
- Load/performance tests confirm scaling characteristics
Type Hints and Annotations
Add full typing to support static analysis:
import asyncio
import json
import logging
from typing import Any, AsyncIterator, Dict, Optional
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class YourMessageQueueBackend(MessageQueueBackend):
    """Fully typed skeleton of a ``MessageQueueBackend`` implementation.

    The methods rely on a ``_ensure_connected`` helper that is not defined in
    this class body and must be supplied alongside it.
    """

    def __init__(
        self,
        host: str,
        port: int,
        username: str,
        password: str,
        *,
        virtual_host: str = "/",
        tls_enabled: bool = False,
    ) -> None:
        """Store the connection settings; no client is created here.

        Args:
            host: Queue service host name.
            port: Queue service port.
            username: Login name.
            password: Login password.
            virtual_host: Virtual host to use.
            tls_enabled: Whether TLS is requested.
        """
        # Endpoint.
        self.host = host
        self.port = port
        # Credentials.
        self.username = username
        self.password = password
        # Optional, keyword-only settings.
        self.virtual_host = virtual_host
        self.tls_enabled = tls_enabled
        # Starts out empty; nothing in this constructor connects.
        self._client: Optional[Any] = None

    @classmethod
    def from_env(cls) -> "YourMessageQueueBackend":
        """Build an instance from environment variables."""
        # Implementation shown earlier
        ...

    async def publish(self, key: str, payload: Dict[str, Any]) -> None:
        """Ensure a connection, then send ``payload`` as a JSON string under ``key``."""
        await self._ensure_connected()
        client = self._client
        await client.publish(key, json.dumps(payload))

    async def consume(
        self,
        *,
        group: Optional[str] = None,
    ) -> AsyncIterator[Dict[str, Any]]:
        """Ensure a connection, then yield each incoming message parsed as JSON.

        Args:
            group: Optional consumer-group identifier passed to the client.
        """
        await self._ensure_connected()
        stream = self._client.consume(group=group)
        async for raw in stream:
            yield json.loads(raw)
Common Implementation Patterns
Connection Management
async def _ensure_connected(self) -> None:
    """Make sure ``self._client`` holds a client that is not closed.

    If there is no client yet, or the current one reports ``closed``, a new
    one is obtained from ``self._connect()`` and stored; otherwise nothing
    happens.
    """
    current = self._client
    if current is not None and not current.closed:
        return
    self._client = await self._connect()
async def _with_retry(self, operation, *, retries: int = 3) -> Any:
    """Await ``operation()``, retrying on ``ConnectionError`` with backoff.

    Args:
        operation: Zero-argument callable that returns a fresh awaitable on
            every call; it is invoked once per attempt.
        retries: Number of retries after the first attempt, so the
            operation runs at most ``retries + 1`` times.

    Returns:
        Whatever the first successful ``operation()`` call returns.

    Raises:
        ValueError: If ``retries`` is negative.
        ConnectionError: Re-raised from the final attempt when every
            attempt fails.  Other exception types are never retried.
    """
    # A negative count would make the loop below run zero times and return
    # None without ever attempting the operation.
    if retries < 0:
        raise ValueError("retries must be non-negative")

    for attempt in range(retries + 1):
        try:
            return await operation()
        except ConnectionError:
            if attempt == retries:
                raise
            # Exponential backoff: 1s, 2s, 4s, ...
            wait_time = 2 ** attempt
            logger.warning("Retrying operation", extra={"attempt": attempt, "wait": wait_time})
            await asyncio.sleep(wait_time)
Payload Serialisation
def _serialise(self, payload: Dict[str, Any]) -> bytes:
    """Encode ``payload`` as UTF-8 JSON bytes.

    Values the JSON encoder cannot handle natively are converted with
    ``str()`` (``default=str``) instead of raising.
    """
    text = json.dumps(payload, default=str)
    return text.encode("utf-8")
def _deserialise(self, data: bytes) -> Dict[str, Any]:
    """Decode UTF-8 bytes and parse the resulting text as JSON."""
    text = data.decode("utf-8")
    return json.loads(text)
Consumer Group Handling
async def _prepare_queue(self, group: Optional[str]) -> str:
    """Declare the durable queue to consume from and return its name.

    With a non-empty ``group`` the queue is ``abstract_backend_<group>`` and
    is additionally bound with routing key ``"#"``.  Without one, the shared
    ``abstract_backend_default`` queue is declared and no binding is made.
    """
    has_group = bool(group)
    queue_name = f"abstract_backend_{group}" if has_group else "abstract_backend_default"

    await self._client.declare_queue(queue_name, durable=True)
    if has_group:
        await self._client.bind(queue_name, routing_key="#")
    return queue_name