
Kafka Message Queue Backend for Abstract Backend

Welcome to abe-kafka — a Kafka-based message queue component that integrates with the abstract-backend project.

What is abe-kafka?

This project provides a production-ready Kafka-backed message queue implementation that integrates seamlessly with the abstract-backend event processing system. It enables developers to quickly deliver queue-enabled services as distributable Python packages that can be installed and reused without additional integration friction.

Why abe-kafka?

abe-kafka is a focused Kafka backend for Abstract Backend:

  • Provides a Kafka-powered MessageQueueBackend via kafka-python.
  • Simple env-based configuration with KafkaMessageQueueBackend.from_env().
  • JSON serialization/deserialization with robust error handling.
  • Production-ready tests: unit, Testcontainers, and SASL/SSL against real clusters.

abe-kafka at a glance

  • Implements MessageQueueBackend using kafka-python.
  • JSON payloads; producer serializes, consumer deserializes and skips invalid JSON.
  • Environment-driven config with from_env().
  • Consumer subscribes via regex pattern (topic routing) and auto-prefixed group ids.
  • Safe async integration: blocking Kafka calls are run in threads via asyncio.to_thread.
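The last point — running blocking kafka-python calls in threads — can be sketched as follows. This is a minimal illustration of the asyncio.to_thread pattern using a stand-in blocking function, not the actual abe-kafka producer code:

```python
import asyncio
import json
import time

def blocking_send(topic: str, payload: dict) -> str:
    """Stand-in for a blocking kafka-python producer call."""
    data = json.dumps(payload).encode("utf-8")  # JSON serialization, as abe-kafka does
    time.sleep(0.1)  # simulate network I/O that would otherwise block the event loop
    return f"{topic}:{len(data)} bytes"

async def publish(topic: str, payload: dict) -> str:
    # Run the blocking call in a worker thread so the event loop stays responsive.
    return await asyncio.to_thread(blocking_send, topic, payload)

result = asyncio.run(publish("demo-topic", {"hello": "world"}))
print(result)  # demo-topic:18 bytes
```

Because the blocking call runs in a thread, other coroutines keep making progress while Kafka I/O is in flight.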

Install and enable

pip install abe-kafka

# Select the kafka backend via entry point
export MESSAGE_QUEUE_BACKEND=kafka

# Minimal config
export KAFKA_BOOTSTRAP_SERVERS=localhost:9092

Configuration (env vars)

| Variable | Required | Type / Values | Default | Notes |
| --- | --- | --- | --- | --- |
| KAFKA_BOOTSTRAP_SERVERS | Yes | Comma-separated host:port list | — | e.g. host1:9092,host2:9092 |
| KAFKA_SECURITY_PROTOCOL | No | PLAINTEXT, SASL_PLAINTEXT, SASL_SSL, SSL | PLAINTEXT | |
| KAFKA_SASL_MECHANISM | No | e.g. PLAIN | — | Set with KAFKA_SASL_USERNAME/KAFKA_SASL_PASSWORD |
| KAFKA_SASL_USERNAME | No | string | — | SASL username |
| KAFKA_SASL_PASSWORD | No | string | — | SASL password |
| KAFKA_SSL_CAFILE | No | file path | — | SSL CA file |
| KAFKA_SSL_CERTFILE | No | file path | — | SSL client cert |
| KAFKA_SSL_KEYFILE | No | file path | — | SSL client key |
| KAFKA_CLIENT_ID | No | string | — | Passed to Kafka clients |
| KAFKA_TOPIC_PATTERN | No | regex | .* | Consumer subscribes by regex |
| KAFKA_CONSUMER_GROUP_PREFIX | No | string | abe | Group id becomes <prefix>_<group> or <prefix>_default |
| KAFKA_ENABLE_AUTO_COMMIT | No | true/false | true | Consumer auto commit |
| KAFKA_AUTO_OFFSET_RESET | No | latest/earliest | latest | Consumer start offset when no committed offset |

Quick usage example

import asyncio
from abe_plugin.backends.message_queue.service.abe_kafka import KafkaMessageQueueBackend

async def main():
    backend = KafkaMessageQueueBackend.from_env()
    await backend.publish("demo-topic", {"hello": "world"})
    async for msg in backend.consume(group="g1"):
        print("received:", msg)
        break
    await backend.close()

asyncio.run(main())

Implementation details

  • Class: KafkaMessageQueueBackend
    • Producer: kafka.KafkaProducer with JSON serializer
    • Consumer: kafka.KafkaConsumer with subscribe(pattern=...)
    • publish() confirms delivery by awaiting future.get(timeout=30) in a thread
    • consume() polls in a thread and yields decoded JSON payloads
    • _with_retry[T] provides bounded exponential backoff for transient failures
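The bounded exponential backoff behind _with_retry can be sketched like this. The attempt count and delays here are illustrative assumptions, not abe-kafka's actual parameters:

```python
import time

def with_retry(fn, *, attempts: int = 4, base_delay: float = 0.05, max_delay: float = 1.0):
    """Sketch of bounded exponential backoff (illustrative defaults)."""
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of retries: propagate the last error
            # Delay doubles each attempt but is capped at max_delay.
            time.sleep(min(base_delay * 2 ** attempt, max_delay))

calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient broker error")
    return "ok"

print(with_retry(flaky))  # ok (succeeds on the third attempt)
```

Capping the delay keeps worst-case latency bounded while still spacing out retries against a struggling broker.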

For contributors

If you plan to develop or contribute, see the Development section for testing, CI, and coding standards:

  • Development overview: ../../development/index.mdx
  • Workflow and running tests: ../../development/workflow.mdx
  • CI/CD details: ../../development/ci-cd/index.mdx

Compatibility

  • Python: 3.12, 3.13
  • kafka-python: >=2.0,<3

Troubleshooting

  • Testcontainers may return PLAINTEXT://host:port; we normalize to host:port.
  • If topic auto-create is disabled, pre-create topics or use auto_offset_reset=earliest with a new group.
  • On macOS, ensure broker advertised.listeners matches your client address (e.g., localhost vs IPv6).
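The address normalization mentioned in the first point can be sketched as follows (hypothetical helper name; the project's internal normalization may differ):

```python
def normalize_bootstrap(address: str) -> str:
    """Strip a listener-protocol prefix like PLAINTEXT:// if present
    (hypothetical helper illustrating the normalization described above)."""
    _, sep, rest = address.partition("://")
    return rest if sep else address

print(normalize_bootstrap("PLAINTEXT://localhost:9092"))  # localhost:9092
print(normalize_bootstrap("localhost:9092"))              # localhost:9092
```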