Getting Started with Abstract Backend Message Queue Service Development
This guide walks you through creating a message queue service implementation for the Abstract Backend runtime. In just a few steps, you'll deliver a backend that can be distributed and reused across Abstract Backend deployments.
What You'll Build
By following this guide, you'll create:
✅ Custom Message Queue Backend: An implementation that follows the MessageQueueBackend protocol
✅ Distributable Package: A Python package installable via pip install
✅ Runtime Integration: Seamless integration with Abstract Backend deployments
✅ Environment Configuration: Flexible configuration via environment variables
✅ Production Ready: Tests, documentation, and CI/CD workflows included
Understanding the Plugin System
Abstract Backend exposes a plugin-based architecture for message queue providers. Here's how it works:
Plugin Discovery Mechanism
Abstract Backend discovers providers using Python's entry points system:
# In your pyproject.toml
[project.entry-points."abe.backends.message_queue.service"]
redis = "abstract_backend_redis.service:RedisMessageQueueBackend"
rabbitmq = "abstract_backend_rabbitmq.service:RabbitMQMessageQueueBackend"
memory = "abstract_backend_memory.service:MemoryMessageQueueBackend"
your_plugin = "your_package.module:YourMessageQueueBackend"
Plugin Lifecycle
- Discovery: Abstract Backend scans for providers registered under
abe.backends.message_queue.service - Selection: Based on the
MESSAGE_QUEUE_BACKENDenvironment variable - Instantiation: Calls
YourMessageQueueBackend.from_env()to build the instance - Operation: Uses
publish()andconsume()for message handling
Message Queue Backend Protocol
Your provider must implement the MessageQueueBackend protocol:
from abc import ABC, abstractmethod
from typing import Any, AsyncIterator, Dict, Optional
class MessageQueueBackend(ABC):
"""Protocol for message queue backend implementations."""
@classmethod
@abstractmethod
def from_env(cls) -> "MessageQueueBackend":
"""Create instance from environment variables.
This method should read configuration from environment variables
and return a configured instance of your queue backend.
Returns:
Configured queue backend instance
Raises:
ValueError: If required environment variables are missing
ConnectionError: If unable to connect to the queue service
"""
pass
@abstractmethod
async def publish(self, key: str, payload: Dict[str, Any]) -> None:
"""Publish a message to the queue.
Args:
key: Queue key/topic/channel to publish to
payload: Message data to publish
Raises:
PublishError: If message cannot be published
ConnectionError: If connection to queue is lost
"""
pass
@abstractmethod
async def consume(self, key: str) -> AsyncIterator[Dict[str, Any]]:
"""Consume messages from the queue.
Args:
key: Queue key/topic/channel to consume from
Yields:
Message data from the queue
Raises:
ConsumeError: If unable to consume messages
ConnectionError: If connection to queue is lost
"""
pass
Implementation Patterns
1. Basic Message Queue Backend Structure
Here's a template for implementing your message queue backend:
"""
Your Message Queue Backend Implementation
This module provides a YOUR-MESSAGE-QUEUE implementation of the MessageQueueBackend protocol
for Abstract Backend.
"""
import asyncio
import json
import logging
import os
from typing import Any, AsyncIterator, Dict, Optional
from abe.backends.message_queue.base.protocol import MessageQueueBackend
logger = logging.getLogger(__name__)
class YourMessageQueueBackend(MessageQueueBackend):
"""YOUR-MESSAGE-QUEUE implementation of MessageQueueBackend."""
def __init__(
self,
connection_url: str,
username: Optional[str] = None,
password: Optional[str] = None,
**kwargs
):
"""Initialize the YOUR-QUEUE backend.
Args:
connection_url: Connection string for YOUR-QUEUE service
username: Authentication username (optional)
password: Authentication password (optional)
**kwargs: Additional configuration options
"""
self.connection_url = connection_url
self.username = username
self.password = password
self.config = kwargs
self._client = None
self._connected = False
@classmethod
def from_env(cls) -> "YourMessageQueueBackend":
"""Create instance from environment variables.
Expected environment variables:
- YOUR_MESSAGE_QUEUE_URL: Connection URL for YOUR-MESSAGE-QUEUE service
- YOUR_MESSAGE_QUEUE_USERNAME: Authentication username (optional)
- YOUR_MESSAGE_QUEUE_PASSWORD: Authentication password (optional)
- YOUR_MESSAGE_QUEUE_SSL: Enable SSL connection (optional)
- YOUR_MESSAGE_QUEUE_TIMEOUT: Connection timeout in seconds (optional)
Returns:
Configured YourMessageQueueBackend instance
Raises:
ValueError: If required environment variables are missing
"""
connection_url = os.environ.get("YOUR_MESSAGE_QUEUE_URL")
if not connection_url:
raise ValueError("YOUR_MESSAGE_QUEUE_URL environment variable is required")
username = os.environ.get("YOUR_MESSAGE_QUEUE_USERNAME")
password = os.environ.get("YOUR_MESSAGE_QUEUE_PASSWORD")
# Optional configuration
config = {}
if ssl_enabled := os.environ.get("YOUR_MESSAGE_QUEUE_SSL"):
config["ssl"] = ssl_enabled.lower() == "true"
if timeout := os.environ.get("YOUR_MESSAGE_QUEUE_TIMEOUT"):
config["timeout"] = int(timeout)
return cls(connection_url, username, password, **config)
async def _ensure_connected(self) -> None:
"""Ensure connection to the queue service."""
if not self._connected:
await self._connect()
async def _connect(self) -> None:
"""Establish connection to YOUR-QUEUE service."""
try:
# Initialize your message queue client here
# self._client = YourMessageQueueClient(
# url=self.connection_url,
# username=self.username,
# password=self.password,
# **self.config
# )
# await self._client.connect()
self._connected = True
logger.info("Connected to YOUR-MESSAGE-QUEUE service")
except Exception as e:
logger.error(f"Failed to connect to YOUR-MESSAGE-QUEUE: {e}")
raise ConnectionError(f"Unable to connect to YOUR-MESSAGE-QUEUE: {e}")
async def publish(self, key: str, payload: Dict[str, Any]) -> None:
"""Publish a message to YOUR-MESSAGE-QUEUE."""
await self._ensure_connected()
try:
message = json.dumps(payload)
# Implement your publish logic here
# await self._client.publish(key, message)
logger.debug(f"Published message to {key}: {payload}")
except Exception as e:
logger.error(f"Failed to publish message to {key}: {e}")
raise PublishError(f"Unable to publish message: {e}")
async def consume(self, key: str) -> AsyncIterator[Dict[str, Any]]:
"""Consume messages from YOUR-MESSAGE-QUEUE."""
await self._ensure_connected()
try:
# Implement your consume logic here
# async for message in self._client.consume(key):
# try:
# payload = json.loads(message)
# yield payload
# except json.JSONDecodeError as e:
# logger.error(f"Invalid JSON in message from {key}: {e}")
# continue
# For demonstration, yield empty dict
yield {}
except Exception as e:
logger.error(f"Failed to consume messages from {key}: {e}")
raise ConsumeError(f"Unable to consume messages: {e}")
async def close(self) -> None:
"""Close connection to YOUR-MESSAGE-QUEUE service."""
if self._client and self._connected:
await self._client.close()
self._connected = False
logger.info("Disconnected from YOUR-MESSAGE-QUEUE service")
# Custom exception classes
class PublishError(Exception):
"""Raised when message publishing fails."""
{{ ... }}
@pytest.mark.asyncio
async def test_from_env():
"""Test creating backend from environment variables."""
with patch.dict(os.environ, {
"YOUR_MESSAGE_QUEUE_URL": "test://localhost:1234",
"YOUR_MESSAGE_QUEUE_USERNAME": "test",
"YOUR_MESSAGE_QUEUE_PASSWORD": "test"
}):
from your_package.backend import YourMessageQueueBackend
backend = YourMessageQueueBackend.from_env()
assert backend.connection_url == "test://localhost:1234"
assert backend.username == "test"
{{ ... }}
async def test_publish_success(queue_backend):
{{ ... }}
with patch.object(queue_backend, '_client', new=AsyncMock()):
await queue_backend.publish("test-key", {"message": "hello"})
# Add assertions based on your implementation
@pytest.mark.asyncio
async def test_consume_messages(queue_backend):
"""Test message consumption."""
test_messages = [{"message": "hello"}, {"message": "world"}]
with patch.object(queue_backend, '_client', new=AsyncMock()) as mock_client:
mock_client.consume.return_value = iter([json.dumps(msg) for msg in test_messages])
messages = []
async for message in queue_backend.consume("test-key"):
messages.append(message)
if len(messages) >= 2:
break
assert messages == test_messages
@pytest.mark.asyncio
async def test_connection_failure(queue_backend):
"""Test handling of connection failures."""
with patch.object(queue_backend, '_connect', side_effect=ConnectionError("Connection failed")):
with pytest.raises(ConnectionError):
await queue_backend.publish("test-key", {"message": "hello"})
Next Steps
Now that you understand the plugin system:
- 📋 Requirements Setup - Install dependencies and tools
- 🛠️ Implementation - Create your project from the template
- 🔧 How to Run - Test and deploy your plugin
- 📚 API Reference - Detailed protocol documentation
Ready to start building? Let's set up your development environment!