Skip to main content
Version: Next

Install abe-kafka

This page shows how to install and enable the Kafka backend in your environment.

Prerequisites (summary)

  • Python 3.12 or 3.13
  • uv (recommended) for fast, reproducible environments
  • Optional: Docker (for Testcontainers or Docker Compose)
  • See full details: Requirements

1) Install the package

uv add abe-kafka

2) Enable the Kafka backend

Set the entry-point backend name and minimal configuration:

export MESSAGE_QUEUE_BACKEND=kafka
export KAFKA_BOOTSTRAP_SERVERS=localhost:9092

Optional security configuration:

# SASL/PLAIN
export KAFKA_SECURITY_PROTOCOL=SASL_PLAINTEXT
export KAFKA_SASL_MECHANISM=PLAIN
export KAFKA_SASL_USERNAME=my-user
export KAFKA_SASL_PASSWORD=my-pass

# SSL
export KAFKA_SECURITY_PROTOCOL=SSL
export KAFKA_SSL_CAFILE=/etc/ssl/ca.pem
export KAFKA_SSL_CERTFILE=/etc/ssl/cert.pem
export KAFKA_SSL_KEYFILE=/etc/ssl/key.pem

2.5) Start a local Kafka broker (quick options)

Choose one of the following for a local Kafka to try abe-kafka quickly.

Option A: Docker Compose (single-node KRaft, PLAINTEXT)

docker-compose.yml
services:
kafka:
image: bitnami/kafka:3.7
container_name: kafka
ports:
- "9092:9092"
environment:
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=broker,controller
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- ALLOW_PLAINTEXT_LISTENER=yes
docker compose up -d
export KAFKA_BOOTSTRAP_SERVERS=localhost:9092

Stop it when done:

docker compose down -v

Notes

  • Uses PLAINTEXT for simplicity. For SASL/SSL, use a secured Compose or a managed cluster.
  • On macOS/Linux with Docker Desktop, ADVERTISED_LISTENERS=localhost:9092 works from the host.

Option B: Testcontainers (auto-start in tests)

Running the integration test will automatically start a Kafka container and verify round-trip:

uv run pytest -q -v -k test_publish_and_consume_roundtrip_with_testcontainers_threads test/integration_test/backend/queue/service/test_kafka.py

4) Quick smoke test

import asyncio
from abe_plugin.backends.message_queue.service.abe_kafka import KafkaMessageQueueBackend

async def main():
b = KafkaMessageQueueBackend.from_env()
await b.publish("demo-topic", {"hello": "world"})
async for m in b.consume(group="g1"):
print(m)
break
await b.close()

asyncio.run(main())

For contributors

If you plan to develop or contribute, see the Development section for:

  • Environment and tools: ../../development/requirements.mdx
  • Workflow and running tests: ../../development/workflow.mdx
  • CI/CD details: ../../development/ci-cd/index.mdx