API References

This section provides detailed documentation for implementing and extending message queue components for the Slack MCP server. The message queue system enables flexible webhook event processing through pluggable backend implementations.

Overview

The Slack MCP server supports a flexible message queue architecture that allows users to:

  • Extend existing backends: Implement custom message queue backends using the QueueBackend protocol
  • Plugin architecture: Install message queue components via pip install <slack-mcp-server-mq-plugin>
  • Environment-based configuration: Configure queue backends using environment variables
  • Seamless integration: Zero-code integration with the Slack MCP server

Architecture Design

Core Components

Plugin Discovery System

The Slack MCP server uses Python's entry points system to automatically discover and load message queue backends:

[project.entry-points."slack_mcp.backends.queue"]
your_backend_name = "your_package.module:YourBackendClass"
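
As a rough illustration of what entry-point discovery involves, the sketch below lists everything registered under the `slack_mcp.backends.queue` group using the standard library's `importlib.metadata`. The helper names `discover_backends` and `load_backend` are hypothetical; this is not the server's actual loader, only one way such a lookup could be written.

```python
from importlib.metadata import entry_points
from typing import Any, Dict

GROUP = "slack_mcp.backends.queue"


def discover_backends(group: str = GROUP) -> Dict[str, Any]:
    """Map entry-point names to EntryPoint objects for one group."""
    eps = entry_points()
    if hasattr(eps, "select"):
        selected = eps.select(group=group)  # Python 3.10 and newer
    else:
        selected = eps.get(group, [])  # Python 3.8 / 3.9 return a plain dict
    return {ep.name: ep for ep in selected}


def load_backend(name: str, group: str = GROUP) -> Any:
    """Import and return the object registered under `name` (hypothetical helper)."""
    found = discover_backends(group)
    if name not in found:
        raise LookupError(f"No backend named {name!r}; available: {sorted(found)}")
    return found[name].load()
```

Running `discover_backends()` in the environment where the server is installed is also a quick way to check whether a plugin's entry point was registered at all.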

QueueBackend Protocol

Interface Definition

All message queue backends must implement the QueueBackend protocol:

from typing import Any, AsyncIterator, Dict, Optional, Protocol

class QueueBackend(Protocol):
    """Protocol defining the interface for message queue backends."""

    @classmethod
    def from_env(cls) -> "QueueBackend":
        """Create backend instance from environment variables."""
        ...

    async def publish(self, key: str, payload: Dict[str, Any]) -> None:
        """Publish a message to the queue."""
        ...

    async def consume(self, *, group: Optional[str] = None) -> AsyncIterator[Dict[str, Any]]:
        """Consume messages from the queue."""
        ...
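
To make the three methods concrete, here is a minimal sketch of a class that satisfies the protocol using an in-process `asyncio.Queue`. The class name `InMemoryQueueBackend` and the `demo` coroutine are illustrative assumptions; this is not the memory backend that ships with the project, and it ignores `key` and `group` entirely.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, Optional


class InMemoryQueueBackend:
    """Toy backend: messages live in a single in-process queue."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    @classmethod
    def from_env(cls) -> "InMemoryQueueBackend":
        # Nothing to configure for an in-process queue.
        return cls()

    async def publish(self, key: str, payload: Dict[str, Any]) -> None:
        # The routing key is ignored in this sketch.
        await self._queue.put(payload)

    async def consume(self, *, group: Optional[str] = None) -> AsyncIterator[Dict[str, Any]]:
        # An async generator: yields each payload as it becomes available.
        while True:
            yield await self._queue.get()


async def demo() -> Dict[str, Any]:
    backend = InMemoryQueueBackend.from_env()
    await backend.publish("slack.message", {"text": "hi"})
    async for message in backend.consume():
        return message  # stop after the first message
    raise RuntimeError("consume() ended unexpectedly")
```

Calling `asyncio.run(demo())` publishes one payload and reads it back through `consume()`.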

Method Details

from_env() Class Method

Purpose: Creates a backend instance configured from environment variables.

Parameters: None

Returns: An instance of your backend class

Implementation Requirements:

  • Read configuration from environment variables
  • Validate required configuration
  • Return a properly configured instance
  • Handle missing or invalid configuration gracefully

Example:

@classmethod
def from_env(cls) -> "YourBackend":
    import os

    host = os.getenv("MQ_HOST", "localhost")
    port = int(os.getenv("MQ_PORT", "5672"))
    username = os.getenv("MQ_USERNAME")
    password = os.getenv("MQ_PASSWORD")

    if not username or not password:
        raise ValueError("MQ_USERNAME and MQ_PASSWORD are required")

    return cls(host=host, port=port, username=username, password=password)
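
One way to meet the "handle missing or invalid configuration gracefully" requirement is to parse the environment into a typed object first and raise a single, descriptive error. The sketch below assumes the same `MQ_*` variable names as the example above; `MQConfig` and `load_mq_config` are hypothetical names, not part of the Slack MCP server.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class MQConfig:
    host: str
    port: int
    username: str
    password: str


def load_mq_config(env: Optional[Mapping[str, str]] = None) -> MQConfig:
    """Build a validated config from a mapping (defaults to os.environ)."""
    env = os.environ if env is None else env

    # Report every missing required variable at once.
    missing = [name for name in ("MQ_USERNAME", "MQ_PASSWORD") if not env.get(name)]
    if missing:
        raise ValueError(f"Missing required environment variables: {', '.join(missing)}")

    raw_port = env.get("MQ_PORT", "5672")
    try:
        port = int(raw_port)
    except ValueError:
        raise ValueError(f"MQ_PORT must be an integer, got {raw_port!r}") from None
    if not 1 <= port <= 65535:
        raise ValueError(f"MQ_PORT out of range: {port}")

    return MQConfig(
        host=env.get("MQ_HOST", "localhost"),
        port=port,
        username=env["MQ_USERNAME"],
        password=env["MQ_PASSWORD"],
    )
```

Accepting the mapping as a parameter keeps the function easy to test without mutating the real process environment.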

publish() Method

Purpose: Publishes a message to the message queue.

Parameters:

  • key (str): Routing key or topic for the message
  • payload (Dict[str, Any]): Message data as a dictionary

Returns: None

Implementation Requirements:

  • Handle connection errors gracefully
  • Ensure message persistence if required
  • Support routing based on the key parameter
  • Log errors appropriately

Example:

async def publish(self, key: str, payload: Dict[str, Any]) -> None:
    try:
        # Convert payload to JSON
        message = json.dumps(payload)

        # Publish to your message queue
        await self._connection.publish(
            exchange="slack_events",
            routing_key=key,
            body=message,
            properties={"delivery_mode": 2},  # Persistent
        )

        logger.info(f"Published message with key: {key}")
    except Exception as e:
        logger.error(f"Failed to publish message: {e}")
        raise

consume() Method

Purpose: Consumes messages from the message queue.

Parameters:

  • group (Optional[str]): Consumer group for load balancing (if supported)

Returns: AsyncIterator[Dict[str, Any]] - An async iterator yielding message payloads

Implementation Requirements:

  • Yield messages as they arrive
  • Handle consumer group semantics if supported
  • Gracefully handle connection interruptions
  • Support cancellation via asyncio

Example:

async def consume(self, *, group: Optional[str] = None) -> AsyncIterator[Dict[str, Any]]:
    queue_name = f"slack_events_{group}" if group else "slack_events"

    try:
        async with self._connection.consume(queue_name) as consumer:
            async for message in consumer:
                try:
                    payload = json.loads(message.body)
                    yield payload
                    await message.ack()
                except json.JSONDecodeError:
                    logger.error(f"Invalid JSON in message: {message.body}")
                    await message.nack()
    except asyncio.CancelledError:
        logger.info("Consumer cancelled, shutting down gracefully")
        raise
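
The "support cancellation via asyncio" requirement is easiest to see from the caller's side. The sketch below runs a stand-in `consume()` generator in a task, cancels it, and confirms the task ends as cancelled. The `consume`, `worker`, and `main` functions are illustrative only and use an `asyncio.Queue` in place of a real broker.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List


async def consume(queue: asyncio.Queue) -> AsyncIterator[Dict[str, Any]]:
    """Stand-in for a backend's consume(): yields items until cancelled."""
    while True:
        yield await queue.get()


async def worker(queue: asyncio.Queue, seen: List[Dict[str, Any]]) -> None:
    try:
        async for payload in consume(queue):
            seen.append(payload)
    except asyncio.CancelledError:
        # Release resources here, then re-raise so the task ends as cancelled.
        raise


async def main() -> List[Dict[str, Any]]:
    queue: asyncio.Queue = asyncio.Queue()
    seen: List[Dict[str, Any]] = []
    task = asyncio.create_task(worker(queue, seen))

    await queue.put({"event": "message"})
    await asyncio.sleep(0.05)  # give the worker a chance to process the item

    task.cancel()  # CancelledError is raised inside the pending queue.get()
    try:
        await task
    except asyncio.CancelledError:
        pass
    assert task.cancelled()
    return seen
```

Swallowing `CancelledError` inside `consume()` instead of re-raising it would leave the task running, which is why the example above re-raises.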

Implementation Guide

Step 1: Create Your Backend Class

Create a new Python package with your backend implementation:

# your_package/backends/your_backend.py
import asyncio
import json
import logging
from typing import Any, AsyncIterator, Dict, Optional

from slack_mcp_plugin.backends.base.protocol import QueueBackend

logger = logging.getLogger(__name__)


class YourMQBackend(QueueBackend):
    """Your custom message queue backend implementation."""

    def __init__(self, host: str, port: int, username: str, password: str):
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self._connection = None

    @classmethod
    def from_env(cls) -> "YourMQBackend":
        """Create instance from environment variables."""
        # Implementation here
        pass

    async def connect(self):
        """Establish connection to your message queue."""
        # Implementation here
        pass

    async def disconnect(self):
        """Close connection to your message queue."""
        # Implementation here
        pass

    async def publish(self, key: str, payload: Dict[str, Any]) -> None:
        """Publish message implementation."""
        # Implementation here
        pass

    async def consume(self, *, group: Optional[str] = None) -> AsyncIterator[Dict[str, Any]]:
        """Consume messages implementation."""
        # Implementation here
        pass

Step 2: Configure Entry Points

Add the entry point to your pyproject.toml:

[project.entry-points."slack_mcp.backends.queue"]
your_backend = "your_package.backends.your_backend:YourMQBackend"

Step 3: Environment Configuration

Document the required environment variables for your backend:

# Your backend configuration
export QUEUE_BACKEND="your_backend"
export MQ_HOST="localhost"
export MQ_PORT="5672"
export MQ_USERNAME="user"
export MQ_PASSWORD="password"
export MQ_VHOST="/"
export MQ_EXCHANGE="slack_events"

Step 4: Error Handling Best Practices

Implement robust error handling:

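import asyncio
import logging
from typing import Awaitable, Callable, TypeVar

logger = logging.getLogger(__name__)
T = TypeVar("T")

class YourMQBackend(QueueBackend):
    async def __aenter__(self):
        """Async context manager entry."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        await self.disconnect()

    async def _with_retries(
        self,
        operation: Callable[[], Awaitable[T]],
        max_retries: int = 3,
        retry_delay: float = 5.0,
    ) -> T:
        """Run an operation, retrying on connection errors.

        A context manager cannot re-run its body, so the operation is passed
        in as a callable that is invoked again on each attempt.
        """
        if max_retries < 1:
            raise ValueError("max_retries must be at least 1")
        for attempt in range(max_retries):
            try:
                return await operation()
            except ConnectionError as e:
                if attempt == max_retries - 1:
                    raise
                logger.warning(f"Connection failed, retrying in {retry_delay}s: {e}")
                await asyncio.sleep(retry_delay)
        raise AssertionError("unreachable")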
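
A fixed delay can cause many clients to reconnect at the same moment after an outage. One common alternative is exponential backoff with jitter. The following is a minimal standalone sketch of that idea; `retry_with_backoff` and its parameters are hypothetical names chosen for illustration, and only `ConnectionError` is treated as retryable.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    *,
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
) -> T:
    """Call `operation` until it succeeds or attempts are exhausted."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return await operation()
        except ConnectionError:
            if attempt == max_attempts:
                raise  # out of attempts: surface the original error
            # Delay doubles each attempt, capped at max_delay.
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            # Jitter: sleep between 50% and 100% of the computed delay.
            await asyncio.sleep(delay * random.uniform(0.5, 1.0))
    raise AssertionError("unreachable")
```

A publish call could then be wrapped as `await retry_with_backoff(lambda: backend.publish(key, payload))`, since the lambda creates a fresh coroutine on every attempt.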

Testing Your Implementation

Unit Testing

Create comprehensive tests for your backend:

import pytest
import pytest_asyncio  # provided by the pytest-asyncio plugin

from your_package.backends.your_backend import YourMQBackend


class TestYourMQBackend:
    # Async fixtures and tests need an asyncio-aware pytest plugin;
    # this example assumes pytest-asyncio is installed.
    @pytest_asyncio.fixture
    async def backend(self):
        backend = YourMQBackend(
            host="localhost",
            port=5672,
            username="test",
            password="test",
        )
        await backend.connect()
        yield backend
        await backend.disconnect()

    @pytest.mark.asyncio
    async def test_publish(self, backend):
        """Test message publishing."""
        payload = {"event": "message", "data": {"text": "Hello"}}
        await backend.publish("test.key", payload)
        # Add assertions based on your implementation

    @pytest.mark.asyncio
    async def test_consume(self, backend):
        """Test message consumption."""
        messages = []
        async for message in backend.consume():
            messages.append(message)
            if len(messages) >= 1:
                break

        assert len(messages) == 1
        # Add more assertions
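
Unit tests do not need a running broker if the connection object can be replaced with a test double. The sketch below uses `unittest.mock.AsyncMock` to check what a `publish()` implementation sends. `InjectableBackend` is a hypothetical class written for this example; it assumes the connection exposes an awaitable `publish(routing_key=..., body=...)` method, which may differ from your client library.

```python
import asyncio
import json
from typing import Any, Dict
from unittest.mock import AsyncMock


class InjectableBackend:
    """Hypothetical backend whose connection object is passed in."""

    def __init__(self, connection: Any) -> None:
        self._connection = connection

    async def publish(self, key: str, payload: Dict[str, Any]) -> None:
        await self._connection.publish(routing_key=key, body=json.dumps(payload))


def test_publish_serializes_payload() -> None:
    connection = AsyncMock()  # every attribute is an awaitable mock
    backend = InjectableBackend(connection)

    asyncio.run(backend.publish("test.key", {"event": "message"}))

    # Verify the routing key and the JSON body handed to the connection.
    connection.publish.assert_awaited_once_with(
        routing_key="test.key",
        body='{"event": "message"}',
    )
```

The test drives the coroutine with `asyncio.run`, so it runs under plain pytest without an asyncio plugin.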

Integration Testing

Test integration with the Slack MCP server:

import os

import pytest

from slack_mcp_plugin.backends import get_backend
from your_package.backends.your_backend import YourMQBackend


@pytest.mark.integration
@pytest.mark.asyncio  # assumes the pytest-asyncio plugin is installed
async def test_backend_integration():
    """Test backend integration with Slack MCP server."""
    os.environ["QUEUE_BACKEND"] = "your_backend"
    os.environ["MQ_HOST"] = "localhost"
    # Set other required env vars

    backend = get_backend()
    assert isinstance(backend, YourMQBackend)

    # Test publish/consume cycle
    test_payload = {"event": "app_mention", "data": {"text": "test"}}
    await backend.publish("slack.app_mention", test_payload)

    async for message in backend.consume():
        assert message == test_payload
        break

Common Patterns and Examples

Pattern 1: Connection Pooling

For backends that support connection pooling:

class PooledMQBackend(QueueBackend):
    def __init__(self, host: str, port: int, pool_size: int = 10):
        self.host = host
        self.port = port
        self.pool_size = pool_size
        self._pool = None

    async def connect(self):
        # create_connection_pool is a placeholder for the pool factory
        # provided by your message queue client library.
        self._pool = await create_connection_pool(
            size=self.pool_size,
            host=self.host,
            port=self.port,
        )

    async def publish(self, key: str, payload: Dict[str, Any]) -> None:
        async with self._pool.acquire() as conn:
            await conn.publish(key, payload)
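
If your client library does not ship a pool, the acquire/release behaviour used above can be approximated with an `asyncio.Queue`. This is a minimal sketch under that assumption; `SimplePool`, its `factory` argument, and `demo` are hypothetical, and a production pool would also need health checks and a close method.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, Awaitable, Callable, Generic, Optional, TypeVar

C = TypeVar("C")


class SimplePool(Generic[C]):
    """Fixed-size pool: connections are created up front and reused."""

    def __init__(self, factory: Callable[[], Awaitable[C]], size: int) -> None:
        self._factory = factory
        self._size = size
        self._idle: Optional[asyncio.Queue] = None

    async def open(self) -> None:
        # Create the queue inside the running event loop.
        self._idle = asyncio.Queue()
        for _ in range(self._size):
            self._idle.put_nowait(await self._factory())

    @property
    def idle_count(self) -> int:
        return 0 if self._idle is None else self._idle.qsize()

    @asynccontextmanager
    async def acquire(self) -> AsyncIterator[C]:
        if self._idle is None:
            raise RuntimeError("Pool is not open; call open() first")
        conn = await self._idle.get()  # waits if every connection is in use
        try:
            yield conn
        finally:
            self._idle.put_nowait(conn)  # always hand the connection back


async def demo() -> int:
    created = 0

    async def connect() -> str:
        nonlocal created
        created += 1
        return f"conn-{created}"

    pool: SimplePool = SimplePool(connect, size=2)
    await pool.open()
    async with pool.acquire() as first, pool.acquire() as second:
        assert first != second
        assert pool.idle_count == 0
    return pool.idle_count
```

Because `acquire()` blocks when the pool is empty, the pool size also acts as a cap on concurrent broker operations.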

Pattern 2: Message Serialization

Custom serialization for complex payloads:

import pickle
import base64

# Warning: pickle.loads can execute arbitrary code while deserializing.
# Use this pattern only when every producer on the queue is trusted;
# otherwise prefer a data-only format such as JSON.
class SerializingMQBackend(QueueBackend):
    def _serialize(self, payload: Dict[str, Any]) -> str:
        """Serialize payload to string."""
        pickled = pickle.dumps(payload)
        return base64.b64encode(pickled).decode('utf-8')

    def _deserialize(self, data: str) -> Dict[str, Any]:
        """Deserialize string to payload."""
        pickled = base64.b64decode(data.encode('utf-8'))
        return pickle.loads(pickled)
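
Where payloads only contain a few non-JSON types, a data-only alternative is to extend JSON with explicit type tags. The sketch below handles `datetime` values this way; the `__datetime__` tag and the function names are assumptions made for this example, not a format the Slack MCP server defines.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict


def _encode_extra(value: Any) -> Any:
    """Called by json.dumps for values it cannot serialize natively."""
    if isinstance(value, datetime):
        return {"__datetime__": value.isoformat()}
    raise TypeError(f"Cannot serialize {type(value).__name__}")


def _decode_extra(obj: Dict[str, Any]) -> Any:
    """Called by json.loads for every decoded JSON object."""
    if set(obj) == {"__datetime__"}:
        return datetime.fromisoformat(obj["__datetime__"])
    return obj


def serialize(payload: Dict[str, Any]) -> str:
    return json.dumps(payload, default=_encode_extra)


def deserialize(data: str) -> Dict[str, Any]:
    return json.loads(data, object_hook=_decode_extra)


original = {
    "event": "message",
    "received_at": datetime(2024, 1, 1, tzinfo=timezone.utc),
}
restored = deserialize(serialize(original))
```

Unlike pickle, decoding here can only ever produce dictionaries, lists, strings, numbers, booleans, `None`, and the explicitly tagged `datetime`.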

Pattern 3: Dead Letter Queues

Handle failed message processing:

class DLQSupportedBackend(QueueBackend):
    async def consume(self, *, group: Optional[str] = None) -> AsyncIterator[Dict[str, Any]]:
        max_retries = 3

        async for message in self._consume_raw():
            retry_count = message.headers.get('retry_count', 0)

            try:
                yield self._deserialize(message.body)
                await message.ack()
            except Exception as e:
                if retry_count >= max_retries:
                    # Send to dead letter queue
                    await self._send_to_dlq(message, str(e))
                    await message.ack()
                else:
                    # Retry
                    await self._retry_message(message, retry_count + 1)
                    await message.nack()
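
The retry-or-dead-letter decision can be tested in isolation from any broker. The sketch below models it with plain lists standing in for the retry queue and the dead letter queue; `Message`, `RetryRouter`, and the `retry_count` and `error` header names are assumptions for illustration.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class Message:
    body: str
    headers: Dict[str, Any] = field(default_factory=dict)


@dataclass
class RetryRouter:
    """Decides where a failed message goes next."""

    max_retries: int = 3
    retry_queue: List[Message] = field(default_factory=list)
    dead_letters: List[Message] = field(default_factory=list)

    def handle_failure(self, message: Message, error: str) -> str:
        retry_count = int(message.headers.get("retry_count", 0))
        if retry_count >= self.max_retries:
            # Retries exhausted: park the message with the failure reason.
            self.dead_letters.append(
                Message(message.body, {**message.headers, "error": error})
            )
            return "dead_letter"
        # Otherwise requeue a copy with the counter incremented.
        self.retry_queue.append(
            Message(message.body, {**message.headers, "retry_count": retry_count + 1})
        )
        return "retry"


router = RetryRouter(max_retries=2)
first_outcome = router.handle_failure(Message("payload"), "boom")
final_outcome = router.handle_failure(Message("payload", {"retry_count": 2}), "boom")
```

Storing the retry count in message headers, as the pattern above does, keeps the consumer stateless across restarts.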

Deployment and Distribution

Package Structure

Recommended package structure:

your-mq-backend/
├── pyproject.toml
├── README.md
├── src/
│   └── your_package/
│       ├── __init__.py
│       └── backends/
│           ├── __init__.py
│           └── your_backend.py
├── tests/
│   ├── __init__.py
│   ├── test_backend.py
│   └── test_integration.py
└── docs/
    └── configuration.md

Publishing to PyPI

  1. Build your package:
python -m build
  2. Upload to PyPI:
python -m twine upload dist/*
  3. Users can then install:
pip install your-slack-mcp-backend

Troubleshooting

Common Issues

  1. Backend Not Found

    • Verify entry point configuration in pyproject.toml
    • Ensure package is installed in the same environment as Slack MCP server
  2. Connection Errors

    • Check network connectivity
    • Verify credentials and permissions
    • Implement proper retry logic
  3. Message Loss

    • Ensure proper acknowledgment handling
    • Implement message persistence if required
    • Use transactions for critical operations
  4. Performance Issues

    • Implement connection pooling
    • Use async/await properly
    • Consider message batching for high throughput

Debugging

Enable debug logging:

import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("your_package.backends")

Use environment variables for debugging:

export SLACK_MCP_DEBUG=true
export QUEUE_BACKEND_DEBUG=true
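
How the server itself interprets these variables is not shown here. If you want your own backend to honor a debug flag, one possible approach is sketched below; the set of accepted truthy values and the helper names `debug_enabled` and `configure_backend_logging` are assumptions, not documented server behaviour.

```python
import logging
import os
from typing import Mapping, Optional

_TRUTHY = {"1", "true", "yes", "on"}


def debug_enabled(name: str, env: Optional[Mapping[str, str]] = None) -> bool:
    """Return True if the named variable holds a truthy value."""
    env = os.environ if env is None else env
    return env.get(name, "").strip().lower() in _TRUTHY


def configure_backend_logging(env: Optional[Mapping[str, str]] = None) -> int:
    """Set the backend logger level from QUEUE_BACKEND_DEBUG and return it."""
    level = logging.DEBUG if debug_enabled("QUEUE_BACKEND_DEBUG", env) else logging.INFO
    logging.getLogger("your_package.backends").setLevel(level)
    return level
```

Calling `configure_backend_logging()` once from `from_env()` would apply the flag before the first connection attempt.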

Reference Implementation

See the included memory backend (src/memory.py) for a complete reference implementation that demonstrates all required patterns and best practices.