API References
This section provides detailed documentation for implementing and extending message queue components for the Slack MCP server. The message queue system enables flexible webhook event processing through pluggable backend implementations.
Overview
The Slack MCP server supports a flexible message queue architecture that allows users to:
- Extend existing backends: Implement custom message queue backends using the
QueueBackend
protocol - Plugin architecture: Install message queue components via
pip install <slack-mcp-server-mq-plugin>
- Environment-based configuration: Configure queue backends using environment variables
- Seamless integration: Zero-code integration with the Slack MCP server
Architecture Design
Core Components
Plugin Discovery System
The Slack MCP server uses Python's entry points system to automatically discover and load message queue backends:
[project.entry-points."slack_mcp.backends.queue"]
your_backend_name = "your_package.module:YourBackendClass"
QueueBackend Protocol
Interface Definition
All message queue backends must implement the QueueBackend
protocol:
from typing import Any, AsyncIterator, Dict, Optional, Protocol
class QueueBackend(Protocol):
"""Protocol defining the interface for message queue backends."""
@classmethod
def from_env(cls) -> "QueueBackend":
"""Create backend instance from environment variables."""
...
async def publish(self, key: str, payload: Dict[str, Any]) -> None:
"""Publish a message to the queue."""
...
async def consume(self, *, group: Optional[str] = None) -> AsyncIterator[Dict[str, Any]]:
"""Consume messages from the queue."""
...
Method Details
from_env()
Class Method
Purpose: Creates a backend instance configured from environment variables.
Parameters: None
Returns: An instance of your backend class
Implementation Requirements:
- Read configuration from environment variables
- Validate required configuration
- Return a properly configured instance
- Handle missing or invalid configuration gracefully
Example:
@classmethod
def from_env(cls) -> "YourBackend":
import os
host = os.getenv("MQ_HOST", "localhost")
port = int(os.getenv("MQ_PORT", "5672"))
username = os.getenv("MQ_USERNAME")
password = os.getenv("MQ_PASSWORD")
if not username or not password:
raise ValueError("MQ_USERNAME and MQ_PASSWORD are required")
return cls(host=host, port=port, username=username, password=password)
publish()
Method
Purpose: Publishes a message to the message queue.
Parameters:
key
(str): Routing key or topic for the messagepayload
(Dict[str, Any]): Message data as a dictionary
Returns: None
Implementation Requirements:
- Handle connection errors gracefully
- Ensure message persistence if required
- Support routing based on the key parameter
- Log errors appropriately
Example:
async def publish(self, key: str, payload: Dict[str, Any]) -> None:
try:
# Convert payload to JSON
message = json.dumps(payload)
# Publish to your message queue
await self._connection.publish(
exchange="slack_events",
routing_key=key,
body=message,
properties={"delivery_mode": 2} # Persistent
)
logger.info(f"Published message with key: {key}")
except Exception as e:
logger.error(f"Failed to publish message: {e}")
raise
consume()
Method
Purpose: Consumes messages from the message queue.
Parameters:
group
(Optional[str]): Consumer group for load balancing (if supported)
Returns: AsyncIterator[Dict[str, Any]] - An async iterator yielding message payloads
Implementation Requirements:
- Yield messages as they arrive
- Handle consumer group semantics if supported
- Gracefully handle connection interruptions
- Support cancellation via asyncio
Example:
async def consume(self, *, group: Optional[str] = None) -> AsyncIterator[Dict[str, Any]]:
queue_name = f"slack_events_{group}" if group else "slack_events"
try:
async with self._connection.consume(queue_name) as consumer:
async for message in consumer:
try:
payload = json.loads(message.body)
yield payload
await message.ack()
except json.JSONDecodeError:
logger.error(f"Invalid JSON in message: {message.body}")
await message.nack()
except asyncio.CancelledError:
logger.info("Consumer cancelled, shutting down gracefully")
raise
Implementation Guide
Step 1: Create Your Backend Class
Create a new Python package with your backend implementation:
# your_package/backends/your_backend.py
import asyncio
import json
import logging
from typing import Any, AsyncIterator, Dict, Optional
from slack_mcp_plugin.backends.base.protocol import QueueBackend
logger = logging.getLogger(__name__)
class YourMQBackend(QueueBackend):
"""Your custom message queue backend implementation."""
def __init__(self, host: str, port: int, username: str, password: str):
self.host = host
self.port = port
self.username = username
self.password = password
self._connection = None
@classmethod
def from_env(cls) -> "YourMQBackend":
"""Create instance from environment variables."""
# Implementation here
pass
async def connect(self):
"""Establish connection to your message queue."""
# Implementation here
pass
async def disconnect(self):
"""Close connection to your message queue."""
# Implementation here
pass
async def publish(self, key: str, payload: Dict[str, Any]) -> None:
"""Publish message implementation."""
# Implementation here
pass
async def consume(self, *, group: Optional[str] = None) -> AsyncIterator[Dict[str, Any]]:
"""Consume messages implementation."""
# Implementation here
pass
Step 2: Configure Entry Points
Add the entry point to your pyproject.toml
:
[project.entry-points."slack_mcp.backends.queue"]
your_backend = "your_package.backends.your_backend:YourMQBackend"
Step 3: Environment Configuration
Document the required environment variables for your backend:
# Your backend configuration
export QUEUE_BACKEND="your_backend"
export MQ_HOST="localhost"
export MQ_PORT="5672"
export MQ_USERNAME="user"
export MQ_PASSWORD="password"
export MQ_VHOST="/"
export MQ_EXCHANGE="slack_events"
Step 4: Error Handling Best Practices
Implement robust error handling:
import asyncio
from contextlib import asynccontextmanager
class YourMQBackend(QueueBackend):
async def __aenter__(self):
"""Async context manager entry."""
await self.connect()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit."""
await self.disconnect()
@asynccontextmanager
async def _handle_connection_errors(self):
"""Handle connection errors with retry logic."""
max_retries = 3
retry_delay = 5
for attempt in range(max_retries):
try:
yield
break
except ConnectionError as e:
if attempt == max_retries - 1:
raise
logger.warning(f"Connection failed, retrying in {retry_delay}s: {e}")
await asyncio.sleep(retry_delay)
Testing Your Implementation
Unit Testing
Create comprehensive tests for your backend:
import pytest
import asyncio
from unittest.mock import AsyncMock, MagicMock
from your_package.backends.your_backend import YourMQBackend
class TestYourMQBackend:
@pytest.fixture
async def backend(self):
backend = YourMQBackend(
host="localhost",
port=5672,
username="test",
password="test"
)
await backend.connect()
yield backend
await backend.disconnect()
async def test_publish(self, backend):
"""Test message publishing."""
payload = {"event": "message", "data": {"text": "Hello"}}
await backend.publish("test.key", payload)
# Add assertions based on your implementation
async def test_consume(self, backend):
"""Test message consumption."""
messages = []
async for message in backend.consume():
messages.append(message)
if len(messages) >= 1:
break
assert len(messages) == 1
# Add more assertions
Integration Testing
Test integration with the Slack MCP server:
import os
import pytest
from slack_mcp_plugin.backends import get_backend
@pytest.mark.integration
async def test_backend_integration():
"""Test backend integration with Slack MCP server."""
os.environ["QUEUE_BACKEND"] = "your_backend"
os.environ["MQ_HOST"] = "localhost"
# Set other required env vars
backend = get_backend()
assert isinstance(backend, YourMQBackend)
# Test publish/consume cycle
test_payload = {"event": "app_mention", "data": {"text": "test"}}
await backend.publish("slack.app_mention", test_payload)
async for message in backend.consume():
assert message == test_payload
break
Common Patterns and Examples
Pattern 1: Connection Pooling
For backends that support connection pooling:
class PooledMQBackend(QueueBackend):
def __init__(self, pool_size: int = 10):
self.pool_size = pool_size
self._pool = None
async def connect(self):
self._pool = await create_connection_pool(
size=self.pool_size,
host=self.host,
port=self.port
)
async def publish(self, key: str, payload: Dict[str, Any]) -> None:
async with self._pool.acquire() as conn:
await conn.publish(key, payload)
Pattern 2: Message Serialization
Custom serialization for complex payloads:
import pickle
import base64
class SerializingMQBackend(QueueBackend):
def _serialize(self, payload: Dict[str, Any]) -> str:
"""Serialize payload to string."""
pickled = pickle.dumps(payload)
return base64.b64encode(pickled).decode('utf-8')
def _deserialize(self, data: str) -> Dict[str, Any]:
"""Deserialize string to payload."""
pickled = base64.b64decode(data.encode('utf-8'))
return pickle.loads(pickled)
Pattern 3: Dead Letter Queues
Handle failed message processing:
class DLQSupportedBackend(QueueBackend):
async def consume(self, *, group: Optional[str] = None) -> AsyncIterator[Dict[str, Any]]:
max_retries = 3
async for message in self._consume_raw():
retry_count = message.headers.get('retry_count', 0)
try:
yield self._deserialize(message.body)
await message.ack()
except Exception as e:
if retry_count >= max_retries:
# Send to dead letter queue
await self._send_to_dlq(message, str(e))
await message.ack()
else:
# Retry
await self._retry_message(message, retry_count + 1)
await message.nack()
Deployment and Distribution
Package Structure
Recommended package structure:
your-mq-backend/
├── pyproject.toml
├── README.md
├── src/
│ └── your_package/
│ ├── __init__.py
│ └── backends/
│ ├── __init__.py
│ └── your_backend.py
├── tests/
│ ├── __init__.py
│ ├── test_backend.py
│ └── test_integration.py
└── docs/
└── configuration.md
Publishing to PyPI
- Build your package:
python -m build
- Upload to PyPI:
python -m twine upload dist/*
- Users can then install:
pip install your-slack-mcp-backend
Troubleshooting
Common Issues
-
Backend Not Found
- Verify entry point configuration in
pyproject.toml
- Ensure package is installed in the same environment as Slack MCP server
- Verify entry point configuration in
-
Connection Errors
- Check network connectivity
- Verify credentials and permissions
- Implement proper retry logic
-
Message Loss
- Ensure proper acknowledgment handling
- Implement message persistence if required
- Use transactions for critical operations
-
Performance Issues
- Implement connection pooling
- Use async/await properly
- Consider message batching for high throughput
Debugging
Enable debug logging:
import logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("your_package.backends")
Use environment variables for debugging:
export SLACK_MCP_DEBUG=true
export QUEUE_BACKEND_DEBUG=true
Reference Implementation
See the included memory backend (src/memory.py
) for a complete reference implementation that demonstrates all required patterns and best practices.