Run abe-kafka
This guide shows how to run the abe-kafka backend locally for quick validation and testing.
1) Configure environment
- macOS/Linux (bash/zsh)
# Minimal (PLAINTEXT)
export MESSAGE_QUEUE_BACKEND=kafka
export KAFKA_BOOTSTRAP_SERVERS=localhost:9092

# SASL/PLAIN
export KAFKA_SECURITY_PROTOCOL=SASL_PLAINTEXT
export KAFKA_SASL_MECHANISM=PLAIN
export KAFKA_SASL_USERNAME=my-user
export KAFKA_SASL_PASSWORD=my-pass

# SSL
export KAFKA_SECURITY_PROTOCOL=SSL
export KAFKA_SSL_CAFILE=/etc/ssl/ca.pem
export KAFKA_SSL_CERTFILE=/etc/ssl/cert.pem
export KAFKA_SSL_KEYFILE=/etc/ssl/key.pem

- Windows (PowerShell)
# Minimal (PLAINTEXT)
$env:MESSAGE_QUEUE_BACKEND = "kafka"
$env:KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"
# SASL/PLAIN
$env:KAFKA_SECURITY_PROTOCOL = "SASL_PLAINTEXT"
$env:KAFKA_SASL_MECHANISM = "PLAIN"
$env:KAFKA_SASL_USERNAME = "my-user"
$env:KAFKA_SASL_PASSWORD = "my-pass"
# SSL
$env:KAFKA_SECURITY_PROTOCOL = "SSL"
$env:KAFKA_SSL_CAFILE = "/etc/ssl/ca.pem"
$env:KAFKA_SSL_CERTFILE = "/etc/ssl/cert.pem"
$env:KAFKA_SSL_KEYFILE = "/etc/ssl/key.pem"
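
Before starting the backend, it can help to confirm that the variables for the chosen security protocol are all set. The following is a minimal sketch in plain Python, not part of abe-kafka: `kafka_settings_from_env` is a hypothetical helper, and it assumes that every variable listed in a group above is required for that protocol (in practice the SSL certificate and key files may be optional, depending on the broker).

```python
import os

# Variable groups taken from the configuration step above.
REQUIRED = ("MESSAGE_QUEUE_BACKEND", "KAFKA_BOOTSTRAP_SERVERS")
SASL_VARS = ("KAFKA_SASL_MECHANISM", "KAFKA_SASL_USERNAME", "KAFKA_SASL_PASSWORD")
SSL_VARS = ("KAFKA_SSL_CAFILE", "KAFKA_SSL_CERTFILE", "KAFKA_SSL_KEYFILE")


def kafka_settings_from_env(env=None):
    """Hypothetical helper: collect the Kafka variables and report missing ones."""
    env = os.environ if env is None else env
    # Assumption: an unset protocol means PLAINTEXT, as in the minimal setup.
    protocol = env.get("KAFKA_SECURITY_PROTOCOL", "PLAINTEXT")
    needed = list(REQUIRED)
    if protocol.startswith("SASL"):  # SASL_PLAINTEXT or SASL_SSL
        needed += SASL_VARS
    if protocol.endswith("SSL"):  # SSL or SASL_SSL
        needed += SSL_VARS
    missing = [name for name in needed if not env.get(name)]
    if missing:
        raise ValueError("missing environment variables: " + ", ".join(missing))
    settings = {name: env[name] for name in needed}
    settings["KAFKA_SECURITY_PROTOCOL"] = protocol
    return settings
```

Calling `kafka_settings_from_env()` with no argument reads `os.environ`; passing a plain dict instead makes the check easy to try without touching the shell.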
2) Publish and consume a sample message
import asyncio

from abe_plugin.backends.message_queue.service.abe_kafka import KafkaMessageQueueBackend


async def main():
    # Build the backend from the environment variables set in step 1.
    backend = KafkaMessageQueueBackend.from_env()
    await backend.publish("demo-topic", {"hello": "world"})
    # Read a single message, then stop consuming.
    async for msg in backend.consume(group="g1"):
        print("received:", msg)
        break
    await backend.close()


asyncio.run(main())
Tips
- If Testcontainers returns `PLAINTEXT://host:port`, the backend normalizes to `host:port`.
- Use `auto_offset_reset=earliest` with a fresh group to consume from offset 0.
- Ensure broker `advertised.listeners` is reachable from your client (localhost vs container network).
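
To illustrate the first tip, here is a rough sketch of what stripping the scheme from a bootstrap address can look like. It is not the backend's actual code: `normalize_bootstrap_servers` is a hypothetical name, and the handling of comma-separated lists is an assumption added for illustration.

```python
def normalize_bootstrap_servers(value):
    """Hypothetical helper: drop a leading scheme such as PLAINTEXT:// from each entry."""
    hosts = []
    for entry in value.split(","):
        entry = entry.strip()
        if "://" in entry:
            # Keep only the host:port part after the scheme.
            entry = entry.split("://", 1)[1]
        if entry:
            hosts.append(entry)
    return ",".join(hosts)
```

Addresses that are already in `host:port` form pass through unchanged, so applying the function twice is harmless.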
For contributors
For test suites and advanced workflows, see:
- Development workflow: ../../development/workflow.mdx