Skip to main content
Version: Next

Getting Started with Slack MCP Server Message Queue Plugin Development

This guide will walk you through creating your own message queue backend plugin for the Slack MCP Server. In just a few steps, you'll have a working plugin that can be distributed and used by others in the Slack MCP ecosystem.

What You'll Build

By following this guide, you'll create:

Custom Queue Backend: A message queue implementation that follows the QueueBackend protocol
Distributable Package: A Python package that can be installed via pip install
Plugin Integration: Seamless integration with existing Slack MCP Server installations
Environment Configuration: Support for configuration through environment variables
Production Ready: Complete with tests, documentation, and CI/CD workflows

Understanding the Plugin System

The Slack MCP Server uses a sophisticated plugin-based architecture for message queue backends. Here's how it works:

Plugin Discovery Mechanism

The Slack MCP Server discovers plugins using Python's entry points system:

# In your pyproject.toml
[project.entry-points."slack_mcp.backends.queue"]
redis = "slack_mcp_redis.backend:RedisQueueBackend"
rabbitmq = "slack_mcp_rabbitmq.backend:RabbitMQBackend"
memory = "slack_mcp_memory.backend:MemoryQueueBackend"
your_plugin = "your_package.module:YourQueueBackend"

Plugin Lifecycle

  1. Discovery: MCP Server scans for plugins with the slack_mcp.backends.queue entry point
  2. Selection: Based on QUEUE_BACKEND environment variable
  3. Instantiation: Calls YourQueueBackend.from_env() to create the instance
  4. Operation: Uses publish() and consume() methods for message handling

Queue Backend Protocol

Your plugin must implement the QueueBackend protocol:

from abc import ABC, abstractmethod
from typing import Any, AsyncIterator, Dict, Optional

class QueueBackend(ABC):
"""Protocol for message queue backend implementations."""

@classmethod
@abstractmethod
def from_env(cls) -> "QueueBackend":
"""Create instance from environment variables.

This method should read configuration from environment variables
and return a configured instance of your queue backend.

Returns:
Configured queue backend instance

Raises:
ValueError: If required environment variables are missing
ConnectionError: If unable to connect to the queue service
"""
pass

@abstractmethod
async def publish(self, key: str, payload: Dict[str, Any]) -> None:
"""Publish a message to the queue.

Args:
key: Queue key/topic/channel to publish to
payload: Message data to publish

Raises:
PublishError: If message cannot be published
ConnectionError: If connection to queue is lost
"""
pass

@abstractmethod
async def consume(self, key: str) -> AsyncIterator[Dict[str, Any]]:
"""Consume messages from the queue.

Args:
key: Queue key/topic/channel to consume from

Yields:
Message data from the queue

Raises:
ConsumeError: If unable to consume messages
ConnectionError: If connection to queue is lost
"""
pass

Implementation Patterns

1. Basic Queue Backend Structure

Here's a template for implementing your queue backend:

"""
Your Queue Backend Implementation

This module provides a YOUR-QUEUE implementation of the QueueBackend protocol
for the Slack MCP Server.
"""

import asyncio
import json
import logging
import os
from typing import Any, AsyncIterator, Dict, Optional
from slack_mcp_plugin.backends.base.protocol import QueueBackend

logger = logging.getLogger(__name__)


class YourQueueBackend(QueueBackend):
"""YOUR-QUEUE implementation of QueueBackend."""

def __init__(
self,
connection_url: str,
username: Optional[str] = None,
password: Optional[str] = None,
**kwargs
):
"""Initialize the YOUR-QUEUE backend.

Args:
connection_url: Connection string for YOUR-QUEUE service
username: Authentication username (optional)
password: Authentication password (optional)
**kwargs: Additional configuration options
"""
self.connection_url = connection_url
self.username = username
self.password = password
self.config = kwargs
self._client = None
self._connected = False

@classmethod
def from_env(cls) -> "YourQueueBackend":
"""Create instance from environment variables.

Expected environment variables:
- YOUR_QUEUE_URL: Connection URL for YOUR-QUEUE service
- YOUR_QUEUE_USERNAME: Authentication username (optional)
- YOUR_QUEUE_PASSWORD: Authentication password (optional)
- YOUR_QUEUE_SSL: Enable SSL connection (optional)
- YOUR_QUEUE_TIMEOUT: Connection timeout in seconds (optional)

Returns:
Configured YourQueueBackend instance

Raises:
ValueError: If required environment variables are missing
"""
connection_url = os.environ.get("YOUR_QUEUE_URL")
if not connection_url:
raise ValueError("YOUR_QUEUE_URL environment variable is required")

username = os.environ.get("YOUR_QUEUE_USERNAME")
password = os.environ.get("YOUR_QUEUE_PASSWORD")

# Optional configuration
config = {}
if ssl_enabled := os.environ.get("YOUR_QUEUE_SSL"):
config["ssl"] = ssl_enabled.lower() == "true"
if timeout := os.environ.get("YOUR_QUEUE_TIMEOUT"):
config["timeout"] = int(timeout)

return cls(connection_url, username, password, **config)

async def _ensure_connected(self) -> None:
"""Ensure connection to the queue service."""
if not self._connected:
await self._connect()

async def _connect(self) -> None:
"""Establish connection to YOUR-QUEUE service."""
try:
# Initialize your queue client here
# self._client = YourQueueClient(
# url=self.connection_url,
# username=self.username,
# password=self.password,
# **self.config
# )
# await self._client.connect()
self._connected = True
logger.info("Connected to YOUR-QUEUE service")
except Exception as e:
logger.error(f"Failed to connect to YOUR-QUEUE: {e}")
raise ConnectionError(f"Unable to connect to YOUR-QUEUE: {e}")

async def publish(self, key: str, payload: Dict[str, Any]) -> None:
"""Publish a message to YOUR-QUEUE."""
await self._ensure_connected()

try:
message = json.dumps(payload)
# Implement your publish logic here
# await self._client.publish(key, message)
logger.debug(f"Published message to {key}: {payload}")
except Exception as e:
logger.error(f"Failed to publish message to {key}: {e}")
raise PublishError(f"Unable to publish message: {e}")

async def consume(self, key: str) -> AsyncIterator[Dict[str, Any]]:
"""Consume messages from YOUR-QUEUE."""
await self._ensure_connected()

try:
# Implement your consume logic here
# async for message in self._client.consume(key):
# try:
# payload = json.loads(message)
# yield payload
# except json.JSONDecodeError as e:
# logger.error(f"Invalid JSON in message from {key}: {e}")
# continue

# For demonstration, yield empty dict
yield {}
except Exception as e:
logger.error(f"Failed to consume messages from {key}: {e}")
raise ConsumeError(f"Unable to consume messages: {e}")

async def close(self) -> None:
"""Close connection to YOUR-QUEUE service."""
if self._client and self._connected:
await self._client.close()
self._connected = False
logger.info("Disconnected from YOUR-QUEUE service")


# Custom exception classes
class PublishError(Exception):
"""Raised when message publishing fails."""
pass


class ConsumeError(Exception):
"""Raised when message consumption fails."""
pass

2. Environment Configuration Best Practices

Create a comprehensive configuration system:

import os
from dataclasses import dataclass
from typing import Optional

@dataclass
class QueueConfig:
"""Configuration for YOUR-QUEUE backend."""

# Required settings
url: str

# Authentication
username: Optional[str] = None
password: Optional[str] = None

# Connection settings
ssl_enabled: bool = False
timeout: int = 30
max_retries: int = 3
retry_delay: float = 1.0

# Queue-specific settings
queue_durable: bool = True
message_ttl: Optional[int] = None
max_length: Optional[int] = None

@classmethod
def from_env(cls) -> "QueueConfig":
"""Load configuration from environment variables."""
return cls(
url=os.environ.get("YOUR_QUEUE_URL", ""),
username=os.environ.get("YOUR_QUEUE_USERNAME"),
password=os.environ.get("YOUR_QUEUE_PASSWORD"),
ssl_enabled=os.environ.get("YOUR_QUEUE_SSL", "false").lower() == "true",
timeout=int(os.environ.get("YOUR_QUEUE_TIMEOUT", "30")),
max_retries=int(os.environ.get("YOUR_QUEUE_MAX_RETRIES", "3")),
retry_delay=float(os.environ.get("YOUR_QUEUE_RETRY_DELAY", "1.0")),
queue_durable=os.environ.get("YOUR_QUEUE_DURABLE", "true").lower() == "true",
message_ttl=int(ttl) if (ttl := os.environ.get("YOUR_QUEUE_MESSAGE_TTL")) else None,
max_length=int(length) if (length := os.environ.get("YOUR_QUEUE_MAX_LENGTH")) else None,
)

def validate(self) -> None:
"""Validate configuration settings."""
if not self.url:
raise ValueError("YOUR_QUEUE_URL is required")

if self.timeout <= 0:
raise ValueError("Timeout must be positive")

if self.max_retries < 0:
raise ValueError("Max retries cannot be negative")

3. Error Handling and Resilience

Implement robust error handling:

import asyncio
from functools import wraps
from typing import Callable, TypeVar

T = TypeVar('T')

def retry_on_failure(max_retries: int = 3, delay: float = 1.0):
"""Decorator for retrying failed operations."""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
@wraps(func)
async def wrapper(*args, **kwargs) -> T:
last_exception = None

for attempt in range(max_retries + 1):
try:
return await func(*args, **kwargs)
except (ConnectionError, TimeoutError) as e:
last_exception = e
if attempt < max_retries:
logger.warning(
f"Attempt {attempt + 1} failed for {func.__name__}: {e}. "
f"Retrying in {delay}s..."
)
await asyncio.sleep(delay * (2 ** attempt)) # Exponential backoff
else:
logger.error(f"All {max_retries + 1} attempts failed for {func.__name__}")
raise last_exception

raise last_exception
return wrapper
return decorator

class YourQueueBackend(QueueBackend):
# ...existing code...

@retry_on_failure(max_retries=3, delay=1.0)
async def publish(self, key: str, payload: Dict[str, Any]) -> None:
"""Publish with automatic retry on failure."""
# Implementation with retry logic
pass

@retry_on_failure(max_retries=3, delay=1.0)
async def _reconnect(self) -> None:
"""Reconnect to the queue service."""
await self._connect()

Integration Examples

Setting Up Your Plugin

Users will integrate your plugin like this:

# Install your plugin
pip install slack-mcp-your-queue-backend

# Configure environment
export QUEUE_BACKEND=your_queue_name
export YOUR_QUEUE_URL=your-queue://localhost:1234
export YOUR_QUEUE_USERNAME=user
export YOUR_QUEUE_PASSWORD=pass

# Run Slack MCP Server
python -m slack_mcp_server

Docker Compose Example

Provide a complete Docker setup:

# docker-compose.yml
version: '3.8'

services:
slack-mcp-server:
image: slack-mcp-server:latest
environment:
- QUEUE_BACKEND=your_queue_name
- YOUR_QUEUE_URL=your-queue://your-queue-service:1234
- YOUR_QUEUE_USERNAME=user
- YOUR_QUEUE_PASSWORD=pass
- SLACK_BOT_TOKEN=${SLACK_BOT_TOKEN}
- SLACK_SIGNING_SECRET=${SLACK_SIGNING_SECRET}
depends_on:
- your-queue-service
ports:
- "8000:8000"

your-queue-service:
image: your-queue:latest
environment:
- YOUR_QUEUE_CONFIG=value
ports:
- "1234:1234"
volumes:
- queue_data:/data

volumes:
queue_data:

Testing Your Implementation

Unit Tests

Create comprehensive tests:

import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from your_package.backend import YourQueueBackend

@pytest.fixture
def queue_backend():
"""Create a test queue backend instance."""
return YourQueueBackend(
connection_url="test://localhost:1234",
username="test",
password="test"
)

@pytest.mark.asyncio
async def test_from_env():
"""Test creating backend from environment variables."""
with patch.dict(os.environ, {
"YOUR_QUEUE_URL": "test://localhost:1234",
"YOUR_QUEUE_USERNAME": "test",
"YOUR_QUEUE_PASSWORD": "test"
}):
backend = YourQueueBackend.from_env()
assert backend.connection_url == "test://localhost:1234"
assert backend.username == "test"

@pytest.mark.asyncio
async def test_publish_success(queue_backend):
"""Test successful message publishing."""
with patch.object(queue_backend, '_client', new=AsyncMock()):
await queue_backend.publish("test-key", {"message": "hello"})
# Add assertions based on your implementation

@pytest.mark.asyncio
async def test_consume_messages(queue_backend):
"""Test message consumption."""
test_messages = [{"message": "hello"}, {"message": "world"}]

with patch.object(queue_backend, '_client', new=AsyncMock()) as mock_client:
mock_client.consume.return_value = iter([json.dumps(msg) for msg in test_messages])

messages = []
async for message in queue_backend.consume("test-key"):
messages.append(message)
if len(messages) >= 2:
break

assert messages == test_messages

@pytest.mark.asyncio
async def test_connection_failure(queue_backend):
"""Test handling of connection failures."""
with patch.object(queue_backend, '_connect', side_effect=ConnectionError("Connection failed")):
with pytest.raises(ConnectionError):
await queue_backend.publish("test-key", {"message": "hello"})

Next Steps

Now that you understand the plugin system:

  1. 📋 Requirements Setup - Install dependencies and tools
  2. 🛠️ Implementation - Create your project from the template
  3. 🔧 How to Run - Test and deploy your plugin
  4. 📚 API Reference - Detailed protocol documentation

Ready to start building? Let's set up your development environment!