Skip to main content
Version: Next

QueueBackend Protocol

The QueueBackend protocol defines the interface that all message queue backends must implement to integrate with the Slack MCP server.

Protocol Definition

from typing import Any, AsyncIterator, Dict, Optional, Protocol

class QueueBackend(Protocol):
    """Protocol defining the interface for message queue backends.

    This protocol ensures that all message queue backends provide
    a consistent interface for publishing and consuming messages.
    """

    @classmethod
    def from_env(cls) -> "QueueBackend":
        """Create backend instance from environment variables.

        This method should read configuration from environment variables
        and return a properly configured instance of the backend.

        Returns:
            QueueBackend: A configured instance of the backend

        Raises:
            ValueError: If required configuration is missing or invalid
            ConnectionError: If unable to connect to the message queue
        """
        ...

    async def publish(self, key: str, payload: Dict[str, Any]) -> None:
        """Publish a message to the queue.

        Args:
            key: Routing key or topic for the message. This is used to
                determine where the message should be routed.
            payload: Message data as a dictionary. Must be JSON serializable.

        Raises:
            ConnectionError: If unable to connect to the message queue
            SerializationError: If payload cannot be serialized
            PublishError: If message publishing fails
        """
        ...

    # Declared with a plain ``def`` on purpose: an ``async def`` stub without
    # a ``yield`` is typed as a coroutine that *returns* an iterator, which
    # does not match implementations written as async generators
    # (``async def`` containing ``yield``). Callers use
    # ``async for message in backend.consume(...)`` without awaiting first.
    def consume(self, *, group: Optional[str] = None) -> AsyncIterator[Dict[str, Any]]:
        """Consume messages from the queue.

        This method should return an async iterator that yields messages
        as they become available. Implementations are normally written as
        async generators. The implementation should handle reconnection
        and error recovery gracefully.

        Args:
            group: Optional consumer group name for load balancing.
                If provided, multiple consumers with the same group
                name should share the message load.

        Yields:
            Dict[str, Any]: Message payloads as dictionaries

        Raises:
            ConnectionError: If unable to connect to the message queue
            ConsumeError: If message consumption fails
        """
        ...

Implementation Requirements

Environment Configuration

All backends must support configuration via environment variables through the from_env() class method:

@classmethod
def from_env(cls) -> "YourBackend":
    """Build a backend instance from ``MQ_*`` environment variables.

    Variables read (defaults in parentheses): ``MQ_HOST`` ("localhost"),
    ``MQ_PORT`` ("5672"), ``MQ_VHOST`` ("/"), ``MQ_EXCHANGE``
    ("slack_events"). ``MQ_USERNAME`` and ``MQ_PASSWORD`` have no default
    and are required.

    Returns:
        The result of calling ``cls`` with the collected settings as
        keyword arguments.

    Raises:
        ValueError: If ``MQ_PORT`` is not an integer in 1-65535, or if
            ``MQ_USERNAME`` / ``MQ_PASSWORD`` is missing or empty.
    """
    import os

    # Read configuration with sensible defaults
    host = os.getenv("MQ_HOST", "localhost")

    # Parse the port explicitly so a bad value names the offending variable
    # instead of surfacing as a bare "invalid literal for int()" error.
    raw_port = os.getenv("MQ_PORT", "5672")
    try:
        port = int(raw_port)
    except ValueError:
        raise ValueError(
            f"MQ_PORT must be an integer, got {raw_port!r}"
        ) from None
    if not 1 <= port <= 65535:
        raise ValueError(f"MQ_PORT must be between 1 and 65535, got {port}")

    # Validate required configuration
    username = os.getenv("MQ_USERNAME")
    password = os.getenv("MQ_PASSWORD")

    if not username or not password:
        raise ValueError("MQ_USERNAME and MQ_PASSWORD environment variables are required")

    # Optional configuration
    vhost = os.getenv("MQ_VHOST", "/")
    exchange = os.getenv("MQ_EXCHANGE", "slack_events")

    return cls(
        host=host,
        port=port,
        username=username,
        password=password,
        vhost=vhost,
        exchange=exchange,
    )

Message Publishing

The publish() method must handle:

  1. Connection Management: Establish and maintain connections
  2. Error Handling: Gracefully handle temporary failures
  3. Serialization: Convert Python dictionaries to message format
  4. Routing: Use the key parameter for message routing
async def publish(self, key: str, payload: Dict[str, Any]) -> None:
    """Serialize ``payload`` as JSON and publish it under ``key``.

    Args:
        key: Passed to the broker as the routing key.
        payload: Message data; must be accepted by ``json.dumps``.

    Raises:
        Exception: Any failure while connecting, serializing or publishing
            is logged and then re-raised unchanged.
    """
    try:
        # Connect lazily so callers never manage the connection themselves.
        await self._ensure_connected()

        body = json.dumps(payload)
        properties = {
            "delivery_mode": 2,  # Persistent
            "content_type": "application/json",
            "timestamp": int(time.time()),
        }

        # Retrying is delegated to the helper.
        await self._publish_with_retry(
            routing_key=key,
            body=body,
            properties=properties,
        )
        logger.info(f"Successfully published message with key: {key}")
    except Exception as exc:
        logger.error(f"Failed to publish message with key {key}: {exc}")
        raise

Message Consumption

The consume() method must:

  1. Return an AsyncIterator: Yield messages as they arrive (typically by writing consume() as an async generator, i.e. an async def that contains yield)
  2. Handle Consumer Groups: Support load balancing when group is specified
  3. Graceful Cancellation: Properly handle asyncio cancellation
  4. Error Recovery: Reconnect on connection loss
async def consume(self, *, group: Optional[str] = None) -> AsyncIterator[Dict[str, Any]]:
    """Yield decoded message payloads from the queue for ``group``.

    Each message body is parsed as JSON and yielded. The message is
    acknowledged only once the caller resumes the generator, i.e. when it
    asks for the next message.

    Args:
        group: Optional consumer group name. It selects the queue (via
            ``_setup_consumer_queue``) and the consumer tag.

    Yields:
        Dict[str, Any]: The parsed message payload.

    Raises:
        asyncio.CancelledError: Re-raised after logging when the consuming
            task is cancelled.
        Exception: Any other failure outside per-message handling is logged
            and re-raised.
    """
    consumer_tag = f"consumer_{group}" if group else "consumer_default"

    try:
        await self._ensure_connected()

        # Setup consumer with proper queue binding
        queue_name = await self._setup_consumer_queue(group)

        async with self._create_consumer(queue_name, consumer_tag) as consumer:
            async for message in consumer:
                try:
                    # Deserialize message
                    payload = json.loads(message.body)

                    # Yield to consumer
                    yield payload

                    # Acknowledge successful processing
                    await message.ack()

                except (json.JSONDecodeError, UnicodeDecodeError) as e:
                    # A body that is not valid UTF-8 raises
                    # UnicodeDecodeError rather than JSONDecodeError. It can
                    # never be parsed, so reject it without requeueing;
                    # otherwise it would be redelivered forever.
                    logger.error(f"Invalid JSON in message: {e}")
                    await message.nack(requeue=False)

                except Exception as e:
                    # Anything else is treated as transient: requeue.
                    logger.error(f"Error processing message: {e}")
                    await message.nack(requeue=True)

    except asyncio.CancelledError:
        logger.info("Consumer cancelled, shutting down gracefully")
        raise
    except Exception as e:
        logger.error(f"Consumer error: {e}")
        raise

Protocol Compliance Checklist

When implementing the QueueBackend protocol, ensure your implementation:

✅ Configuration

  • Implements from_env() class method
  • Reads all configuration from environment variables
  • Validates required configuration
  • Provides sensible defaults for optional settings
  • Raises clear errors for missing/invalid configuration

✅ Publishing

  • Implements publish() method with correct signature
  • Handles connection errors gracefully
  • Supports message routing via key parameter
  • Serializes payloads to JSON
  • Implements retry logic for transient failures
  • Logs appropriate success/error messages

✅ Consuming

  • Implements consume() method returning AsyncIterator
  • Supports optional consumer groups
  • Handles asyncio cancellation properly
  • Deserializes messages from JSON
  • Implements proper message acknowledgment
  • Reconnects on connection loss

✅ Error Handling

  • Raises appropriate exceptions with clear messages
  • Logs errors at appropriate levels
  • Handles network interruptions gracefully
  • Implements connection retry logic
  • Provides meaningful error context

✅ Testing

  • Unit tests for all public methods
  • Integration tests with real message queue
  • Error condition testing
  • Performance testing under load
  • Consumer group functionality testing

Type Hints and Annotations

Your implementation should include comprehensive type hints:

from typing import Any, AsyncIterator, Dict, Optional, Union
import logging

logger: logging.Logger = logging.getLogger(__name__)

class YourMQBackend:
    """Skeleton of a message queue backend; fill in the method bodies."""

    def __init__(
        self,
        host: str,
        port: int,
        username: str,
        password: str,
        vhost: str = "/",
        exchange: str = "slack_events",
    ) -> None:
        """Store the connection settings; no connection is opened here."""
        # Broker address
        self.host, self.port = host, port
        # Credentials
        self.username, self.password = username, password
        # Routing settings
        self.vhost, self.exchange = vhost, exchange
        # Starts as None; nothing in this skeleton assigns it afterwards.
        self._connection: Optional[Any] = None

    @classmethod
    def from_env(cls) -> "YourMQBackend":
        """Create an instance from environment variables."""
        # Implementation here
        ...

    async def connect(self) -> None:
        """Open the connection to the message queue."""
        # Implementation here
        ...

    async def disconnect(self) -> None:
        """Close the connection to the message queue."""
        # Implementation here
        ...

    async def publish(self, key: str, payload: Dict[str, Any]) -> None:
        """Publish ``payload`` to the queue under ``key``."""
        # Implementation here
        ...

    async def consume(
        self,
        *,
        group: Optional[str] = None,
    ) -> AsyncIterator[Dict[str, Any]]:
        """Consume messages from the queue, optionally as part of ``group``."""
        # Implementation here
        ...

Common Implementation Patterns

Connection Management

async def _ensure_connected(self) -> None:
    """Call ``connect()`` unless an open connection already exists."""
    connection = self._connection
    if connection is not None and not connection.is_closed:
        # Already connected; nothing to do.
        return
    await self.connect()

async def _with_retry(self, operation, max_retries: int = 3):
    """Await ``operation()`` and retry it when it raises ``ConnectionError``.

    Args:
        operation: Zero-argument callable returning an awaitable.
        max_retries: Total number of attempts (not additional retries).
            Must be at least 1.

    Returns:
        Whatever the first successful ``operation()`` call returns.

    Raises:
        ValueError: If ``max_retries`` is less than 1. Without this check
            the loop would not run and ``None`` would be returned without
            the operation ever being called.
        ConnectionError: Re-raised from the final attempt once all attempts
            have failed. Other exception types are never retried.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    last_attempt = max_retries - 1
    for attempt in range(max_retries):
        try:
            return await operation()
        except ConnectionError:
            if attempt == last_attempt:
                raise
            await asyncio.sleep(2 ** attempt)  # Exponential backoff: 1s, 2s, 4s, ...

Message Serialization

def _serialize_payload(self, payload: Dict[str, Any]) -> bytes:
    """Encode ``payload`` as UTF-8 JSON bytes.

    Values the JSON encoder does not support natively are converted
    with ``str()``.
    """
    text = json.dumps(payload, default=str)
    return text.encode("utf-8")

def _deserialize_payload(self, data: bytes) -> Dict[str, Any]:
    """Decode UTF-8 bytes and parse them as JSON."""
    text = data.decode("utf-8")
    return json.loads(text)

Consumer Group Handling

async def _setup_consumer_queue(self, group: Optional[str]) -> str:
    """Declare and bind the queue a consumer should read from.

    Args:
        group: Consumer group name. When given, the queue is
            ``slack_events_<group>``; otherwise it is
            ``slack_events_default``.

    Returns:
        The name of the queue.
    """
    queue_name = f"slack_events_{group}" if group else "slack_events_default"

    # Declare and bind in both cases. Previously the default queue was
    # returned without being declared or bound, so it only worked if it had
    # been provisioned elsewhere.
    # NOTE(review): assumes _declare_queue / _bind_queue are idempotent for
    # an already existing queue - confirm against the broker client in use.
    await self._declare_queue(queue_name, durable=True)
    await self._bind_queue(queue_name, self.exchange, "#")
    return queue_name