# Kafka Message Queue Backend for Abstract Backend
Welcome to abe-kafka — a Kafka-based message queue component that integrates with the abstract-backend project.
## What is abe-kafka?
This project provides a production-ready Kafka-backed message queue implementation that integrates seamlessly with the abstract-backend event processing system. It enables developers to quickly deliver queue-enabled services as distributable Python packages that can be installed and reused without additional integration friction.
## Why abe-kafka?
abe-kafka is a focused Kafka backend for Abstract Backend:
- Provides a Kafka-powered `MessageQueueBackend` via `kafka-python`.
- Simple env-based configuration with `KafkaMessageQueueBackend.from_env()`.
- JSON serialization/deserialization with robust error handling.
- Production-ready tests: unit, Testcontainers, and SASL/SSL against real clusters.
## abe-kafka at a glance
- Implements `MessageQueueBackend` using `kafka-python`.
- JSON payloads; the producer serializes, the consumer deserializes and skips invalid JSON.
- Environment-driven config with `from_env()`.
- Consumer subscribes via regex pattern (topic routing) and auto-prefixed group ids.
- Safe async integration: blocking Kafka calls are run in threads via `asyncio.to_thread`.
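To illustrate the last point, blocking `kafka-python` calls can be moved off the event loop with `asyncio.to_thread`. This is a minimal sketch of the pattern, not abe-kafka's actual internals; the helper names and the fake producer in the usage note are illustrative:

```python
import asyncio
import json


def blocking_send(producer, topic, payload):
    # kafka-python's send() returns a future; get() blocks until the
    # broker acknowledges delivery (or the timeout expires).
    future = producer.send(topic, json.dumps(payload).encode("utf-8"))
    return future.get(timeout=30)


async def publish(producer, topic, payload):
    # asyncio.to_thread runs the blocking call in a worker thread,
    # so the event loop stays responsive while Kafka does I/O.
    return await asyncio.to_thread(blocking_send, producer, topic, payload)
```

The same pattern applies on the consumer side: poll in a thread, yield decoded payloads back on the loop.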
## Install and enable

```bash
pip install abe-kafka

# Select the kafka backend via entry point
export MESSAGE_QUEUE_BACKEND=kafka

# Minimal config
export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
```
## Configuration (env vars)

| Variable | Required | Type / Values | Default | Notes |
|---|---|---|---|---|
| `KAFKA_BOOTSTRAP_SERVERS` | Yes | Comma-separated `host:port` list | — | e.g. `host1:9092,host2:9092` |
| `KAFKA_SECURITY_PROTOCOL` | No | `PLAINTEXT` | `PLAINTEXT` | Also supports `SASL_PLAINTEXT`, `SASL_SSL`, `SSL` |
| `KAFKA_SASL_MECHANISM` | No | e.g. `PLAIN` | — | Set with `KAFKA_SASL_USERNAME`/`KAFKA_SASL_PASSWORD` |
| `KAFKA_SASL_USERNAME` | No | string | — | SASL username |
| `KAFKA_SASL_PASSWORD` | No | string | — | SASL password |
| `KAFKA_SSL_CAFILE` | No | file path | — | SSL CA file |
| `KAFKA_SSL_CERTFILE` | No | file path | — | SSL client cert |
| `KAFKA_SSL_KEYFILE` | No | file path | — | SSL client key |
| `KAFKA_CLIENT_ID` | No | string | — | Passed to Kafka clients |
| `KAFKA_TOPIC_PATTERN` | No | regex | `.*` | Consumer subscribes by regex |
| `KAFKA_CONSUMER_GROUP_PREFIX` | No | string | `abe` | Group id becomes `<prefix>_<group>` or `<prefix>_default` |
| `KAFKA_ENABLE_AUTO_COMMIT` | No | `true`/`false` | `true` | Consumer auto commit |
| `KAFKA_AUTO_OFFSET_RESET` | No | `latest`/`earliest` | `latest` | Consumer start offset when no committed offset |
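As a rough sketch of how such env-driven configuration might be assembled, the snippet below maps a few of the variables above onto `kafka-python` client kwargs and builds the prefixed group id. The function names here are hypothetical; the real parsing lives in `KafkaMessageQueueBackend.from_env()`:

```python
import os


def kafka_kwargs_from_env():
    # Hypothetical helper: translate the env vars from the table above
    # into kafka-python client keyword arguments (subset shown).
    cfg = {
        "bootstrap_servers": os.environ["KAFKA_BOOTSTRAP_SERVERS"].split(","),
        "security_protocol": os.environ.get("KAFKA_SECURITY_PROTOCOL", "PLAINTEXT"),
        "client_id": os.environ.get("KAFKA_CLIENT_ID"),
    }
    if os.environ.get("KAFKA_SASL_MECHANISM"):
        cfg.update(
            sasl_mechanism=os.environ["KAFKA_SASL_MECHANISM"],
            sasl_plain_username=os.environ.get("KAFKA_SASL_USERNAME"),
            sasl_plain_password=os.environ.get("KAFKA_SASL_PASSWORD"),
        )
    return cfg


def group_id(group=None):
    # Group id becomes <prefix>_<group>, or <prefix>_default when
    # no group is supplied (prefix defaults to "abe").
    prefix = os.environ.get("KAFKA_CONSUMER_GROUP_PREFIX", "abe")
    return f"{prefix}_{group or 'default'}"
```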
## Quick usage example

```python
import asyncio

from abe_plugin.backends.message_queue.service.abe_kafka import KafkaMessageQueueBackend


async def main():
    backend = KafkaMessageQueueBackend.from_env()
    await backend.publish("demo-topic", {"hello": "world"})
    async for msg in backend.consume(group="g1"):
        print("received:", msg)
        break
    await backend.close()


asyncio.run(main())
```
## Implementation details

- Class: `KafkaMessageQueueBackend`
- Producer: `kafka.KafkaProducer` with JSON serializer
- Consumer: `kafka.KafkaConsumer` with `subscribe(pattern=...)`
- `publish()` confirms delivery by awaiting `future.get(timeout=30)` in a thread
- `consume()` polls in a thread and yields decoded JSON payloads
- `_with_retry[T]` provides bounded exponential backoff for transient failures
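A bounded-exponential-backoff helper like `_with_retry` might look roughly like the following. This is a sketch under assumptions: the name `with_retry`, the attempt count, and the base delay are illustrative, not the backend's actual signature:

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def with_retry(fn: Callable[[], T], attempts: int = 5, base_delay: float = 0.1) -> T:
    # Illustrative retry helper: retry fn() on any exception with
    # exponential backoff, re-raising after the final attempt
    # (bounded — it never loops forever).
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))
```

A real implementation would typically narrow the `except` clause to transient Kafka errors rather than catching everything.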
## For contributors
If you plan to develop or contribute, see the Development section for testing, CI, and coding standards:
- Development overview: ../../development/index.mdx
- Workflow and running tests: ../../development/workflow.mdx
- CI/CD details: ../../development/ci-cd/index.mdx
## Compatibility

- Python: 3.12, 3.13
- kafka-python: `>=2.0,<3`
## Troubleshooting

- Testcontainers may return `PLAINTEXT://host:port`; we normalize to `host:port`.
- If topic auto-create is disabled, pre-create topics or use `auto_offset_reset=earliest` with a new group.
- On macOS, ensure broker `advertised.listeners` matches your client address (e.g., localhost vs IPv6).
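The bootstrap-address normalization mentioned above can be as simple as stripping any `<protocol>://` prefix; a sketch (the helper name is illustrative, not abe-kafka's API):

```python
def normalize_bootstrap(addr: str) -> str:
    # Testcontainers can hand back "PLAINTEXT://host:port", while
    # kafka-python expects a bare "host:port". Strip the protocol
    # prefix if present; addresses without one pass through unchanged.
    return addr.rsplit("://", 1)[-1]
```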