Queue Backend Architecture
The Slack MCP Server provides a flexible, plugin-based queue backend architecture that allows you to integrate with various message queue systems like Redis, Kafka, RabbitMQ, and more.
Overview
The queue backend system enables:
- Event Publishing: Webhook server publishes Slack events to a queue
- Event Consumption: Custom consumers process events from the queue
- Plugin Architecture: Easy integration of external queue systems
- Type Safety: Comprehensive type definitions for all queue operations
Type System
All queue-related types are centralized in slack_mcp/types.py following PEP 695 (Python 3.12+) standards.
Core Type Aliases
QueueKey
type QueueKey = str
Represents the routing key or topic used to publish and route messages. Different backends use this differently:
- Kafka: Topic name
- Redis Streams: Stream key
- RabbitMQ: Routing key
- Memory: Simple key for routing
Examples:
from slack_mcp.types import QueueKey
# Slack events topic
key: QueueKey = "slack_events"
# Channel-specific routing
channel_id = "C1234567890"
key: QueueKey = f"slack.channel.{channel_id}"
# Event type routing
key: QueueKey = "slack.events.message"
QueuePayload
type QueuePayload = Dict[str, Any]
The core data being transmitted through the queue. Should be JSON-serializable for compatibility across different queue backends.
Examples:
from slack_mcp.types import QueuePayload
# Slack event payload
payload: QueuePayload = {
"type": "message",
"channel": "C1234567890",
"user": "U1234567890",
"text": "Hello, world!",
"ts": "1234567890.123456"
}
# Custom application payload
payload: QueuePayload = {
"event_type": "user_action",
"data": {"action": "click", "target": "button"}
}
QueueMessage
type QueueMessage = Dict[str, Any]
Complete message structure as consumed from the queue, including payload and optional metadata.
Examples:
from slack_mcp.types import QueueMessage
# Simple message (memory backend)
message: QueueMessage = {
"type": "message",
"channel": "C1234567890",
"text": "Hello"
}
# Message with metadata (Redis/Kafka backend)
message: QueueMessage = {
"payload": {
"type": "message",
"channel": "C1234567890",
"text": "Hello"
},
"metadata": {
"message_id": "msg-123",
"timestamp": 1234567890.123,
"retry_count": 0
}
}
ConsumerGroup
type ConsumerGroup = str | None
Consumer group identifier for group-based consumption patterns. Enables multiple consumers to work together with load balancing.
- None: Consumer operates independently
- str: Consumer joins the specified group
Examples:
from slack_mcp.types import ConsumerGroup
# Independent consumer
group: ConsumerGroup = None
# Consumer group for load balancing
group: ConsumerGroup = "slack-event-processors"
# Environment-specific consumer group
import os
group: ConsumerGroup = f"slack-consumers-{os.getenv('ENV', 'dev')}"
Not all queue backends support consumer groups. The memory backend ignores this parameter, while Redis Streams and Kafka use it for coordinated consumption.
QueueBackendConfig
type QueueBackendConfig = Dict[str, str | int | bool]
Configuration dictionary for queue backend initialization, typically loaded from environment variables.
Examples:
from slack_mcp.types import QueueBackendConfig
# Redis backend configuration
config: QueueBackendConfig = {
"url": "redis://localhost:6379",
"max_connections": 10,
"decode_responses": True
}
# Kafka backend configuration
config: QueueBackendConfig = {
"bootstrap_servers": "localhost:9092",
"group_id": "slack-consumers",
"auto_offset_reset": "earliest"
}
QueueBackendProtocol
The central protocol that all queue backends must implement:
from slack_mcp.types import QueueBackendProtocol
from typing import AsyncIterator
class QueueBackendProtocol(Protocol):
async def publish(self, key: QueueKey, payload: QueuePayload) -> None:
"""Publish a message to the queue."""
...
async def consume(
self,
*,
group: ConsumerGroup = None
) -> AsyncIterator[QueueMessage]:
"""Consume messages from the queue."""
yield {}
@classmethod
def from_env(cls) -> QueueBackendProtocol:
"""Create backend instance from environment variables."""
...
Plugin Development
Creating a Queue Backend Plugin
To create a new queue backend plugin (e.g., for Redis):
1. Project Structure
abe-redis/
├── pyproject.toml
├── README.md
└── slack_mcp_plugin/
└── backends/
└── queue/
├── __init__.py
└── redis.py
2. Implement the Protocol
# slack_mcp_plugin/backends/queue/redis.py
from slack_mcp.types import (
QueueBackendProtocol,
QueueKey,
QueuePayload,
QueueMessage,
ConsumerGroup,
)
from typing import AsyncIterator
import os
class RedisMessageQueueBackend:
"""Redis Streams implementation of queue backend."""
def __init__(self, redis_url: str):
self.redis_url = redis_url
# Initialize Redis client here
async def publish(self, key: QueueKey, payload: QueuePayload) -> None:
"""Publish message to Redis stream."""
# Implementation using redis-py or aioredis
pass
async def consume(
self,
*,
group: ConsumerGroup = None
) -> AsyncIterator[QueueMessage]:
"""Consume messages from Redis stream."""
while True:
# Implementation using Redis XREADGROUP or XREAD
yield {}
@classmethod
def from_env(cls) -> "RedisMessageQueueBackend":
"""Create instance from REDIS_URL environment variable."""
redis_url = os.getenv("REDIS_URL")
if not redis_url:
raise ValueError("REDIS_URL environment variable is required")
return cls(redis_url)
3. Configure Entry Points
# pyproject.toml
[project]
name = "abe-redis"
version = "0.1.0"
description = "Redis backend for Slack MCP Server"
dependencies = [
"slack-mcp>=0.0.1", # Core package provides types
"redis>=5.0.0", # Redis client
]
[project.entry-points."slack_mcp.backends.queue"]
redis = "slack_mcp_plugin.backends.queue.redis:RedisMessageQueueBackend"
4. Export from Package
# slack_mcp_plugin/backends/queue/__init__.py
from slack_mcp_plugin.backends.queue.redis import RedisMessageQueueBackend
__all__ = ["RedisMessageQueueBackend"]
Type Safety Validation
The type system ensures your plugin is compatible:
from slack_mcp import QueueBackendProtocol
from slack_mcp_plugin.backends.queue.redis import RedisMessageQueueBackend
# Type checker validates protocol compliance
backend: QueueBackendProtocol = RedisMessageQueueBackend.from_env()
# All methods are type-checked
await backend.publish(
key="slack_events", # QueueKey
payload={"type": "message"} # QueuePayload
)
async for message in backend.consume(group="processors"):
# message is typed as QueueMessage
event_type = message.get("type")
Backend Discovery
The Slack MCP Server automatically discovers and loads queue backends using Python entry points.
Selection Process
-
Explicit Selection: Use
QUEUE_BACKENDenvironment variableexport QUEUE_BACKEND=redis -
Auto-Selection: First non-memory plugin is automatically selected
-
Fallback: Memory backend (development only)
Backend Loader
from slack_mcp.backends.loader import load_backend
# Automatically loads the appropriate backend
backend = load_backend()
# Publish events
await backend.publish("slack_events", {"type": "message"})
# Consume events
async for message in backend.consume(group="my-consumers"):
print(f"Received: {message}")
Built-in Backends
Memory Backend
The default backend for development and testing:
from slack_mcp.backends.queue.memory import MemoryBackend
backend = MemoryBackend.from_env()
Characteristics:
- ✅ No external dependencies
- ✅ Fast for local development
- ❌ Messages lost on restart
- ❌ Single process only
- ❌ No consumer group support
Use Cases:
- Local development
- Unit testing
- Quick prototyping
The memory backend is not suitable for production. Use Redis, Kafka, or another persistent queue system.
Plugin Examples
Redis Backend Example
# Installation
pip install abe-redis
# Configuration
export QUEUE_BACKEND=redis
export REDIS_URL=redis://localhost:6379
# Usage (automatic)
from slack_mcp.backends.loader import load_backend
backend = load_backend() # Loads Redis backend
Kafka Backend Example
# Installation
pip install abe-kafka
# Configuration
export QUEUE_BACKEND=kafka
export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
# Usage (automatic)
from slack_mcp.backends.loader import load_backend
backend = load_backend() # Loads Kafka backend
Best Practices
For Plugin Developers
-
Use Core Types: Always import types from
slack_mcp.typesfrom slack_mcp.types import QueueKey, QueuePayload, QueueMessage -
Document Environment Variables: Clearly document required configuration
@classmethod
def from_env(cls) -> "MyBackend":
"""Create backend from environment.
Required Environment Variables:
MY_BACKEND_URL: Connection URL
MY_BACKEND_TIMEOUT: Connection timeout (default: 30)
""" -
Handle Errors Gracefully: Provide clear error messages
if not url:
raise ValueError(
"MY_BACKEND_URL environment variable is required. "
"Example: export MY_BACKEND_URL=protocol://host:port"
) -
Support Consumer Groups: Implement if your backend supports it
async def consume(self, *, group: ConsumerGroup = None):
if group:
# Use consumer group
pass
else:
# Independent consumer
pass -
Add Comprehensive Tests: Test protocol compliance
def test_protocol_compliance():
backend: QueueBackendProtocol = MyBackend.from_env()
assert isinstance(backend, QueueBackendProtocol)
For Plugin Users
-
Install Plugin: Use package manager
pip install abe-redis
# or
uv add abe-redis -
Configure Environment: Set required variables
export QUEUE_BACKEND=redis
export REDIS_URL=redis://localhost:6379 -
Verify Loading: Check logs for backend selection
INFO: Loading queue backend: redis
INFO: Redis backend initialized with URL: redis://localhost:6379
Type Checking
All queue backend code is fully type-checked with MyPy:
# Check your plugin implementation
mypy slack_mcp_mq_redis/
# Verify protocol compliance
mypy --strict slack_mcp_mq_redis/backend.py