Version: 0.0.2

Queue Backend Architecture

The Slack MCP Server provides a flexible, plugin-based queue backend architecture that allows you to integrate with various message queue systems like Redis, Kafka, RabbitMQ, and more.

Overview

The queue backend system enables:

  • Event Publishing: Webhook server publishes Slack events to a queue
  • Event Consumption: Custom consumers process events from the queue
  • Plugin Architecture: Easy integration of external queue systems
  • Type Safety: Comprehensive type definitions for all queue operations

Type System

All queue-related types are defined in slack_mcp/types.py as PEP 695 type aliases, which use the type statement introduced in Python 3.12.

Core Type Aliases

QueueKey

type QueueKey = str

Represents the routing key or topic used to publish and route messages. Different backends use this differently:

  • Kafka: Topic name
  • Redis Streams: Stream key
  • RabbitMQ: Routing key
  • Memory: Simple key for routing

Examples:

from slack_mcp.types import QueueKey

# Slack events topic
key: QueueKey = "slack_events"

# Channel-specific routing
channel_id = "C1234567890"
key: QueueKey = f"slack.channel.{channel_id}"

# Event type routing
key: QueueKey = "slack.events.message"

QueuePayload

type QueuePayload = Dict[str, Any]

The core data transmitted through the queue. It should be JSON-serializable so that it works across different queue backends.

Examples:

from slack_mcp.types import QueuePayload

# Slack event payload
payload: QueuePayload = {
    "type": "message",
    "channel": "C1234567890",
    "user": "U1234567890",
    "text": "Hello, world!",
    "ts": "1234567890.123456",
}

# Custom application payload
payload: QueuePayload = {
    "event_type": "user_action",
    "data": {"action": "click", "target": "button"},
}
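
Because a payload that is not JSON-serializable may only fail once it reaches a serializing backend, it can help to check it before publishing. The following is a minimal sketch, not part of slack_mcp: the helper name is_json_serializable is hypothetical, and QueuePayload is redefined locally so the snippet runs on its own.

```python
import json
from typing import Any, Dict

# Local stand-in for slack_mcp.types.QueuePayload so the sketch is self-contained.
QueuePayload = Dict[str, Any]


def is_json_serializable(payload: QueuePayload) -> bool:
    """Return True if json.dumps accepts the payload (hypothetical helper)."""
    try:
        json.dumps(payload)
    except (TypeError, ValueError):
        return False
    return True


ok_payload: QueuePayload = {"type": "message", "text": "Hello, world!"}
# A set is not a JSON type, so this payload is rejected.
bad_payload: QueuePayload = {"type": "message", "tags": {"a", "b"}}

print(is_json_serializable(ok_payload))   # True
print(is_json_serializable(bad_payload))  # False
```

Running a check like this in the publisher keeps the failure close to the code that built the payload.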

QueueMessage

type QueueMessage = Dict[str, Any]

Complete message structure as consumed from the queue, including payload and optional metadata.

Examples:

from slack_mcp.types import QueueMessage

# Simple message (memory backend)
message: QueueMessage = {
    "type": "message",
    "channel": "C1234567890",
    "text": "Hello",
}

# Message with metadata (Redis/Kafka backend)
message: QueueMessage = {
    "payload": {
        "type": "message",
        "channel": "C1234567890",
        "text": "Hello",
    },
    "metadata": {
        "message_id": "msg-123",
        "timestamp": 1234567890.123,
        "retry_count": 0,
    },
}
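
Since a consumer may receive either shape, one option is to normalize messages before handling them. The sketch below assumes the wrapped form always stores the event under a "payload" key holding a dict, as in the example above. The helper extract_payload is hypothetical and not part of slack_mcp.

```python
from typing import Any, Dict

# Local stand-in for slack_mcp.types.QueueMessage so the sketch is self-contained.
QueueMessage = Dict[str, Any]


def extract_payload(message: QueueMessage) -> Dict[str, Any]:
    """Return the event data whether or not the message is wrapped.

    Assumption: wrapped messages keep the event under "payload" as a dict;
    anything else is treated as a bare event.
    """
    inner = message.get("payload")
    if isinstance(inner, dict):
        return inner
    return message


simple = {"type": "message", "channel": "C1234567890", "text": "Hello"}
wrapped = {"payload": dict(simple), "metadata": {"message_id": "msg-123"}}

print(extract_payload(simple)["type"])   # message
print(extract_payload(wrapped)["type"])  # message
```

A bare event that itself contains a dict under "payload" would be misread by this heuristic, so a real backend would ideally document one shape and stick to it.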

ConsumerGroup

type ConsumerGroup = str | None

Consumer group identifier for group-based consumption patterns. Enables multiple consumers to work together with load balancing.

  • None: Consumer operates independently
  • str: Consumer joins the specified group

Examples:

from slack_mcp.types import ConsumerGroup

# Independent consumer
group: ConsumerGroup = None

# Consumer group for load balancing
group: ConsumerGroup = "slack-event-processors"

# Environment-specific consumer group
import os
group: ConsumerGroup = f"slack-consumers-{os.getenv('ENV', 'dev')}"

Backend support: Not all queue backends support consumer groups. The memory backend ignores this parameter, while Redis Streams and Kafka use it for coordinated consumption.

QueueBackendConfig

type QueueBackendConfig = Dict[str, str | int | bool]

Configuration dictionary for queue backend initialization, typically loaded from environment variables.

Examples:

from slack_mcp.types import QueueBackendConfig

# Redis backend configuration
config: QueueBackendConfig = {
    "url": "redis://localhost:6379",
    "max_connections": 10,
    "decode_responses": True,
}

# Kafka backend configuration
config: QueueBackendConfig = {
    "bootstrap_servers": "localhost:9092",
    "group_id": "slack-consumers",
    "auto_offset_reset": "earliest",
}
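
Environment variables are always strings, so building a QueueBackendConfig from them needs some type coercion. The following is a rough sketch under stated assumptions: the prefix convention (for example REDIS_MAX_CONNECTIONS) and the helper names are hypothetical and are not taken from slack_mcp.

```python
import os
from typing import Dict, Mapping, Union

# Local stand-in for slack_mcp.types.QueueBackendConfig.
QueueBackendConfig = Dict[str, Union[str, int, bool]]


def _coerce_value(value: str) -> Union[str, int, bool]:
    """Turn "true"/"false" into bool and integer strings into int."""
    lowered = value.strip().lower()
    if lowered in ("true", "false"):
        return lowered == "true"
    try:
        return int(value)
    except ValueError:
        return value


def config_from_env(
    prefix: str, environ: Mapping[str, str] = os.environ
) -> QueueBackendConfig:
    """Collect variables starting with `prefix` into a config dict.

    The prefix is stripped and the remainder lower-cased, so
    REDIS_MAX_CONNECTIONS becomes "max_connections" (hypothetical convention).
    """
    config: QueueBackendConfig = {}
    for name, value in environ.items():
        if name.startswith(prefix):
            config[name[len(prefix):].lower()] = _coerce_value(value)
    return config


example_env = {
    "REDIS_URL": "redis://localhost:6379",
    "REDIS_MAX_CONNECTIONS": "10",
    "REDIS_DECODE_RESPONSES": "true",
    "UNRELATED": "ignored",
}
print(config_from_env("REDIS_", example_env))
```

Passing the environment as a parameter keeps the function easy to test without touching the real process environment.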

QueueBackendProtocol

The central protocol that all queue backends must implement:

# Definition in slack_mcp/types.py, shown here for reference
from typing import AsyncIterator, Protocol


class QueueBackendProtocol(Protocol):
    async def publish(self, key: QueueKey, payload: QueuePayload) -> None:
        """Publish a message to the queue."""
        ...

    async def consume(
        self,
        *,
        group: ConsumerGroup = None
    ) -> AsyncIterator[QueueMessage]:
        """Consume messages from the queue."""
        yield {}

    @classmethod
    def from_env(cls) -> "QueueBackendProtocol":
        """Create backend instance from environment variables."""
        ...

Plugin Development

Creating a Queue Backend Plugin

To create a new queue backend plugin (e.g., for Redis):

1. Project Structure

abe-redis/
├── pyproject.toml
├── README.md
└── slack_mcp_plugin/
    └── backends/
        └── queue/
            ├── __init__.py
            └── redis.py

2. Implement the Protocol

# slack_mcp_plugin/backends/queue/redis.py
import os
from typing import AsyncIterator

from slack_mcp.types import (
    QueueBackendProtocol,
    QueueKey,
    QueuePayload,
    QueueMessage,
    ConsumerGroup,
)


class RedisMessageQueueBackend:
    """Redis Streams implementation of queue backend."""

    def __init__(self, redis_url: str):
        self.redis_url = redis_url
        # Initialize Redis client here

    async def publish(self, key: QueueKey, payload: QueuePayload) -> None:
        """Publish message to Redis stream."""
        # Implementation using redis-py's asyncio client (redis.asyncio)
        pass

    async def consume(
        self,
        *,
        group: ConsumerGroup = None
    ) -> AsyncIterator[QueueMessage]:
        """Consume messages from Redis stream."""
        while True:
            # Implementation using Redis XREADGROUP or XREAD;
            # the empty dict is a placeholder for a real message
            yield {}

    @classmethod
    def from_env(cls) -> "RedisMessageQueueBackend":
        """Create instance from REDIS_URL environment variable."""
        redis_url = os.getenv("REDIS_URL")
        if not redis_url:
            raise ValueError("REDIS_URL environment variable is required")
        return cls(redis_url)
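
One detail the skeleton leaves open is how a nested payload maps onto a stream entry. Redis Streams entries are flat field-value maps, so a common approach is to JSON-encode the payload into a single field. The sketch below shows only that encoding step. The field name "data" and both helper names are hypothetical, and it assumes the client returns strings (for example decode_responses=True in redis-py).

```python
import json
from typing import Any, Dict

# Local stand-ins for the aliases in slack_mcp.types.
QueuePayload = Dict[str, Any]
QueueMessage = Dict[str, Any]

DATA_FIELD = "data"  # hypothetical field name for the serialized payload


def encode_stream_fields(payload: QueuePayload) -> Dict[str, str]:
    """Flatten a payload into the field mapping of one stream entry."""
    return {DATA_FIELD: json.dumps(payload, separators=(",", ":"))}


def decode_stream_entry(entry_id: str, fields: Dict[str, str]) -> QueueMessage:
    """Rebuild a QueueMessage from a stream entry ID and its fields."""
    return {
        "payload": json.loads(fields[DATA_FIELD]),
        "metadata": {"message_id": entry_id},
    }


payload = {"type": "message", "channel": "C1234567890", "text": "Hello"}
fields = encode_stream_fields(payload)
message = decode_stream_entry("1700000000000-0", fields)
print(message["payload"] == payload)  # True
```

In a publish implementation the result of encode_stream_fields would be the field mapping passed to an XADD call, and decode_stream_entry would be applied to each entry returned by XREAD or XREADGROUP.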

3. Configure Entry Points

# pyproject.toml
[project]
name = "abe-redis"
version = "0.1.0"
description = "Redis backend for Slack MCP Server"
dependencies = [
"slack-mcp>=0.0.1", # Core package provides types
"redis>=5.0.0", # Redis client
]

[project.entry-points."slack_mcp.backends.queue"]
redis = "slack_mcp_plugin.backends.queue.redis:RedisMessageQueueBackend"

4. Export from Package

# slack_mcp_plugin/backends/queue/__init__.py
from slack_mcp_plugin.backends.queue.redis import RedisMessageQueueBackend

__all__ = ["RedisMessageQueueBackend"]

Type Safety Validation

The type system ensures your plugin is compatible:

import asyncio

from slack_mcp.types import QueueBackendProtocol
from slack_mcp_plugin.backends.queue.redis import RedisMessageQueueBackend


async def main() -> None:
    # Type checker validates protocol compliance
    backend: QueueBackendProtocol = RedisMessageQueueBackend.from_env()

    # All methods are type-checked
    await backend.publish(
        key="slack_events",  # QueueKey
        payload={"type": "message"},  # QueuePayload
    )

    async for message in backend.consume(group="processors"):
        # message is typed as QueueMessage
        event_type = message.get("type")


asyncio.run(main())

Backend Discovery

The Slack MCP Server automatically discovers and loads queue backends using Python entry points.

Selection Process

  1. Explicit Selection: Set the QUEUE_BACKEND environment variable

    export QUEUE_BACKEND=redis

  2. Auto-Selection: If QUEUE_BACKEND is not set, the first non-memory plugin found is selected

  3. Fallback: Memory backend (development only)
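
The three steps above can be sketched as a pure function. This is an illustration of the described order, not the loader's actual code: select_backend_name is a hypothetical name, "memory" is assumed to be always available, and raising ValueError for an unknown explicit choice is an assumption about error handling.

```python
from typing import Iterable, Optional


def select_backend_name(
    available: Iterable[str], requested: Optional[str] = None
) -> str:
    """Pick a backend name: explicit choice, then first plugin, then memory."""
    names = list(available)
    # 1. Explicit selection (the value of QUEUE_BACKEND, if set).
    if requested:
        if requested != "memory" and requested not in names:
            raise ValueError(
                f"Queue backend {requested!r} is not installed; found: {names}"
            )
        return requested
    # 2. Auto-selection: first discovered plugin that is not the memory backend.
    for name in names:
        if name != "memory":
            return name
    # 3. Fallback for development.
    return "memory"


print(select_backend_name(["memory", "redis"]))           # redis
print(select_backend_name(["redis", "kafka"], "kafka"))   # kafka
print(select_backend_name([]))                            # memory
```

In practice the requested argument would come from os.getenv("QUEUE_BACKEND") and the available names from the registered entry points.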

Backend Loader

import asyncio

from slack_mcp.backends.loader import load_backend


async def main() -> None:
    # Automatically loads the appropriate backend
    backend = load_backend()

    # Publish events
    await backend.publish("slack_events", {"type": "message"})

    # Consume events
    async for message in backend.consume(group="my-consumers"):
        print(f"Received: {message}")


asyncio.run(main())

Built-in Backends

Memory Backend

The default backend for development and testing:

from slack_mcp.backends.queue.memory import MemoryBackend

backend = MemoryBackend.from_env()

Characteristics:

  • ✅ No external dependencies
  • ✅ Fast for local development
  • ❌ Messages lost on restart
  • ❌ Single process only
  • ❌ No consumer group support

Use Cases:

  • Local development
  • Unit testing
  • Quick prototyping

Production use: The memory backend is not suitable for production. Use Redis, Kafka, or another persistent queue system.

Plugin Examples

Redis Backend Example

# Installation (shell)
pip install abe-redis

# Configuration (shell)
export QUEUE_BACKEND=redis
export REDIS_URL=redis://localhost:6379

# Usage (Python): the backend is selected automatically
from slack_mcp.backends.loader import load_backend
backend = load_backend()  # Loads Redis backend

Kafka Backend Example

# Installation (shell)
pip install abe-kafka

# Configuration (shell)
export QUEUE_BACKEND=kafka
export KAFKA_BOOTSTRAP_SERVERS=localhost:9092

# Usage (Python): the backend is selected automatically
from slack_mcp.backends.loader import load_backend
backend = load_backend()  # Loads Kafka backend

Best Practices

For Plugin Developers

  1. Use Core Types: Always import types from slack_mcp.types

    from slack_mcp.types import QueueKey, QueuePayload, QueueMessage

  2. Document Environment Variables: Clearly document required configuration

    @classmethod
    def from_env(cls) -> "MyBackend":
        """Create backend from environment.

        Required Environment Variables:
            MY_BACKEND_URL: Connection URL

        Optional Environment Variables:
            MY_BACKEND_TIMEOUT: Connection timeout (default: 30)
        """

  3. Handle Errors Gracefully: Provide clear error messages

    if not url:
        raise ValueError(
            "MY_BACKEND_URL environment variable is required. "
            "Example: export MY_BACKEND_URL=protocol://host:port"
        )

  4. Support Consumer Groups: Implement if your backend supports it

    async def consume(self, *, group: ConsumerGroup = None):
        if group:
            # Use consumer group
            pass
        else:
            # Independent consumer
            pass

  5. Add Comprehensive Tests: Test protocol compliance

    def test_protocol_compliance():
        backend: QueueBackendProtocol = MyBackend.from_env()
        # isinstance() works only if the protocol is decorated with
        # typing.runtime_checkable; otherwise it raises TypeError
        assert isinstance(backend, QueueBackendProtocol)
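
A runtime isinstance check against a Protocol is only possible when the protocol is decorated with typing.runtime_checkable, and even then it only verifies that the members exist, not their signatures. The sketch below uses a local stand-in protocol named BackendLike, since whether QueueBackendProtocol itself carries the decorator is not stated here. Both backend classes are hypothetical.

```python
from typing import Any, AsyncIterator, Dict, Optional, Protocol, runtime_checkable


@runtime_checkable
class BackendLike(Protocol):
    """Local stand-in with the same method names as the queue protocol."""

    async def publish(self, key: str, payload: Dict[str, Any]) -> None: ...

    def consume(
        self, *, group: Optional[str] = None
    ) -> AsyncIterator[Dict[str, Any]]: ...


class CompleteBackend:
    async def publish(self, key: str, payload: Dict[str, Any]) -> None:
        return None

    async def consume(
        self, *, group: Optional[str] = None
    ) -> AsyncIterator[Dict[str, Any]]:
        yield {}


class IncompleteBackend:
    # No consume() method, so the structural check fails.
    async def publish(self, key: str, payload: Dict[str, Any]) -> None:
        return None


print(isinstance(CompleteBackend(), BackendLike))    # True
print(isinstance(IncompleteBackend(), BackendLike))  # False
```

Because the runtime check ignores signatures, a static check with mypy remains the stronger guarantee of compliance.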

For Plugin Users

  1. Install Plugin: Use package manager

    pip install abe-redis
    # or
    uv add abe-redis

  2. Configure Environment: Set required variables

    export QUEUE_BACKEND=redis
    export REDIS_URL=redis://localhost:6379

  3. Verify Loading: Check logs for backend selection

    INFO: Loading queue backend: redis
    INFO: Redis backend initialized with URL: redis://localhost:6379

Type Checking

All queue backend code is fully type-checked with mypy:

# Check your plugin implementation
mypy slack_mcp_plugin/

# Verify protocol compliance
mypy --strict slack_mcp_plugin/backends/queue/redis.py
