Event Handler & Queue Architecture
This document provides comprehensive architectural coverage of the Slack MCP server's event handling system, including the queue-based architecture that decouples event reception from event processing, enabling scalable and flexible event handling patterns.
Architecture Overview
The event handling system consists of four main components that work together to provide a robust, scalable event processing pipeline:
Component Responsibilities
- Webhook Server (
slack_mcp/webhook/server.py) - Receives events from Slack and publishes them to a queue - Queue Backend (from abstract-backend library) - Stores and distributes events to consumers
- Event Consumer (
slack_mcp/webhook/event/consumer.py) - Consumes events from the queue and routes them to handlers - Event Handlers (
slack_mcp/webhook/event/handler/) - Process individual Slack events
Event Flow Architecture
The event processing flow is designed for reliability and scalability:
- Reception: Slack sends events to the webhook server endpoint (
/slack/events) - Validation: The server verifies the request signature and handles URL verification
- Publishing: Events are published to the configured queue backend with topic
slack_events - Consumption:
SlackEventConsumerreads events from the queue - Routing: Events are routed to appropriate handlers based on type and subtype
- Processing: Handler methods process the events according to your business logic
Message Queue Backend System Architecture
The queue backend abstraction layer has been migrated to the abstract-backend library. This library provides:
- Unified Protocol Interface:
MessageQueueBackendprotocol for all queue implementations - Built-in Backends: Memory, Redis, Kafka, and more
- Plugin System: Easy development and registration of custom backends
- Backend Discovery: Automatic loading via entry points
For detailed information about:
- Queue backend protocol and interface design
- Built-in backend implementations (Memory, Redis, Kafka)
- Backend selection and configuration
- Developing custom queue backend plugins
- Backend-specific environment variables
Please refer to the abstract-backend documentation
Quick Usage Example
# Import from abstract-backend library
from abe.backends.message_queue.base.protocol import MessageQueueBackend
from abe.backends.message_queue.loader import load_backend
# Load backend from environment
backend: MessageQueueBackend = load_backend()
# Publish events
await backend.publish(key="slack_events", payload=event_data)
# Consume events
async for message in backend.consume(group="my-consumer-group"):
process_message(message)
Consumer System Architecture
Base Consumer Design
The base consumer classes (AsyncLoopConsumer) have been migrated to the abstract-backend library.
For consumer implementation details, refer to the abstract-backend consumer documentation.
# Import from abstract-backend library
from abe.backends.message_queue.consumer import AsyncLoopConsumer
from abe.backends.message_queue.base.protocol import MessageQueueBackend
class CustomConsumer(AsyncLoopConsumer):
def __init__(self, backend: MessageQueueBackend):
super().__init__(backend)
async def process_message(self, message: Dict[str, Any]) -> None:
# Implement your message processing logic
print(f"Processing: {message}")
Slack Event Consumer
The SlackEventConsumer extends the base consumer for Slack-specific event handling:
consumer = SlackEventConsumer(
backend=backend,
handler=your_handler,
group="my_consumer_group" # For load balancing
)
Event Handler Architecture
Handler Protocol Design
All event handlers must implement the EventHandler protocol for consistent routing and processing.
OO-Style Handler Architecture
The BaseSlackEventHandler provides a class-based approach:
from slack_mcp.webhook.event.handler.base import BaseSlackEventHandler
class MyEventHandler(BaseSlackEventHandler):
async def on_message(self, event: Dict[str, Any]) -> None:
"""Handle message events."""
text = event.get('text', '')
channel = event.get('channel')
print(f"Channel message in {channel}: {text}")
async def on_reaction_added(self, event: Dict[str, Any]) -> None:
"""Handle reaction_added events."""
reaction = event.get('reaction', '')
print(f"Reaction added: :{reaction}:")
async def on_unknown(self, event: Dict[str, Any]) -> None:
"""Handle unknown event types."""
event_type = event.get('type', 'unknown')
print(f"Unknown event: {event_type}")
Method Naming Convention:
on_{type}()- Handle events by type (e.g.,on_message)on_{type}__{subtype}()- Handle events by type + subtype (e.g.,on_message__channels)
Decorator-Style Handler Architecture
The DecoratorHandler provides a more flexible decorator-based approach:
from slack_mcp.webhook.event.handler.decorator import DecoratorHandler
from slack_mcp.events import SlackEvent
# Create handler instance
handler = DecoratorHandler()
# Attribute-style decorators
@handler.message
async def handle_message(event: Dict[str, Any]) -> None:
text = event.get('text', '')
print(f"Message: {text}")
# Enum-style decorators
@handler(SlackEvent.APP_MENTION)
async def handle_app_mention(event: Dict[str, Any]) -> None:
text = event.get('text', '')
print(f"App mentioned: {text}")
# Subtype handling
@handler("message.channels")
async def handle_channel_message(event: Dict[str, Any]) -> None:
channel = event.get('channel')
print(f"Channel message in: {channel}")
# Wildcard handler (receives all events)
@handler
async def log_all_events(event: Dict[str, Any]) -> None:
event_type = event.get('type')
print(f"Event received: {event_type}")
Advanced Architecture Patterns
Multiple Handler Instances
Design for scalability with multiple isolated handler instances:
# Main business logic handler
business_handler = DecoratorHandler()
@business_handler.message
async def process_business_logic(event):
# Your main application logic
pass
# Analytics handler
analytics_handler = DecoratorHandler()
@analytics_handler.message
async def track_message_metrics(event):
# Track metrics and analytics
pass
# Create separate consumers with consumer groups
business_consumer = SlackEventConsumer(backend, business_handler, group="business")
analytics_consumer = SlackEventConsumer(backend, analytics_handler, group="analytics")
Error Handling Architecture
Individual Handler Error Isolation
@handler.message
async def handle_message_with_error_handling(event):
try:
# Your processing logic
risky_operation(event)
except SpecificError as e:
logger.error(f"Specific error in message handler: {e}")
# Handle gracefully
except Exception as e:
logger.exception(f"Unexpected error in message handler: {e}")
# Don't re-raise - let other handlers continue
Consumer-Level Error Handling
The SlackEventConsumer automatically catches and logs handler exceptions, ensuring that one failing handler doesn't crash the entire consumer.
Handler Priority System
For decorator-style handlers, implement processing priorities:
@handler.message(priority=0) # Called first
async def high_priority_handler(event):
# Critical processing
pass
@handler.message(priority=10) # Called later
async def low_priority_handler(event):
# Optional processing
pass
Configuration Architecture
Environment Variables
QUEUE_BACKEND: Specify the queue backend type (e.g., memory, redis, kafka)SLACK_EVENTS_TOPIC: Topic/key for Slack events in the queue (defaultslack_events)- Backend-specific variables (e.g.,
REDIS_URL,KAFKA_BOOTSTRAP_SERVERS)
Production Deployment Patterns
- Use external queue backends: Redis, Kafka, or cloud-based solutions
- Configure consumer groups: Enable load balancing across instances
- Monitor queue metrics: Track processing latency and error rates
- Implement dead letter queues: Handle permanently failed messages
Best Practices & Design Principles
Separation of Concerns
- Reception: Webhook server handles HTTP and publishes to queue
- Storage: Queue backend manages message persistence and distribution
- Processing: Event handlers focus solely on business logic
Scalability Patterns
- Horizontal scaling: Multiple consumer instances with consumer groups
- Vertical scaling: Process multiple events concurrently within consumers
- Fault tolerance: Graceful error handling and consumer restart capabilities
Testing Strategies
- Unit testing: Test individual handlers in isolation
- Integration testing: Test full event flow with memory backend
- Contract testing: Verify backend protocol compliance
- Error scenarios: Test graceful error handling and recovery
Examples Repository
Complete working examples demonstrating these architectural patterns are available in the /examples/event_handler/ directory:
decorator_handler_example.py- Comprehensive decorator-style usageslack_event_handlers.py- Multiple handler patternsadvanced_slack_handlers.py- Production-ready examples with error handling
These examples demonstrate real-world implementation of the architectural patterns described in this document.