API References
This section documents the abe-kafka backend API and configuration.
Module and entry point
- Module: abe_plugin.backends.message_queue.service.abe_kafka
- Class: KafkaMessageQueueBackend
- Entry point: [project.entry-points."abe.backends.message_queue.service"].kafka
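As a rough illustration of how a host application might look up the backend through the entry point group above, the sketch below uses the standard library's importlib.metadata. The helper name find_backend is hypothetical and not part of abe-kafka; only the group name and the entry point name kafka come from this page.

```python
from importlib.metadata import entry_points


def find_backend(name, group="abe.backends.message_queue.service"):
    """Hypothetical helper: return the matching entry point, or None."""
    eps = entry_points()
    if hasattr(eps, "select"):
        # Python 3.10+ exposes a selectable collection.
        candidates = eps.select(group=group)
    else:
        # Python 3.8/3.9 return a plain dict keyed by group name.
        candidates = eps.get(group, [])
    for ep in candidates:
        if ep.name == name:
            return ep
    return None


ep = find_backend("kafka")
if ep is not None:
    backend_cls = ep.load()  # expected to be KafkaMessageQueueBackend
```

If the package is not installed, find_backend returns None and nothing is loaded.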
Public API
from_env() -> KafkaMessageQueueBackend
Builds a configured backend instance from environment variables.
| Variable | Required | Type / Values | Default | Notes |
|---|---|---|---|---|
| KAFKA_BOOTSTRAP_SERVERS | Yes | Comma-separated host:port list | (none) | e.g. host1:9092,host2:9092 |
| KAFKA_SECURITY_PROTOCOL | No | PLAINTEXT | PLAINTEXT | Also SASL_PLAINTEXT, SASL_SSL, SSL |
| KAFKA_SASL_MECHANISM | No | e.g. PLAIN | (none) | Use with KAFKA_SASL_USERNAME/KAFKA_SASL_PASSWORD |
| KAFKA_SASL_USERNAME | No | string | (none) | SASL username |
| KAFKA_SASL_PASSWORD | No | string | (none) | SASL password |
| KAFKA_SSL_CAFILE | No | file path | (none) | SSL CA file |
| KAFKA_SSL_CERTFILE | No | file path | (none) | SSL client cert |
| KAFKA_SSL_KEYFILE | No | file path | (none) | SSL client key |
| KAFKA_CLIENT_ID | No | string | (none) | Passed to Kafka clients |
| KAFKA_TOPIC_PATTERN | No | regex | .* | Consumer subscribes by regex |
| KAFKA_CONSUMER_GROUP_PREFIX | No | string | abe | Group id <prefix>_<group> or <prefix>_default |
| KAFKA_ENABLE_AUTO_COMMIT | No | true/false | true | Consumer auto commit |
| KAFKA_AUTO_OFFSET_RESET | No | latest/earliest | latest | Start offset when no committed offset |
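To make the defaults in the table concrete, here is a minimal sketch of reading a few of these variables into a settings dict. It is not the implementation of from_env(): the helper name load_kafka_settings, the dict keys, and the case-insensitive parsing of true/false are assumptions made for illustration.

```python
import os
from typing import Any, Dict, Mapping, Optional


def load_kafka_settings(env: Optional[Mapping[str, str]] = None) -> Dict[str, Any]:
    """Hypothetical helper: apply the documented defaults to an env mapping."""
    env = os.environ if env is None else env
    servers = env.get("KAFKA_BOOTSTRAP_SERVERS", "").strip()
    if not servers:
        # The only variable marked as required in the table.
        raise ValueError("KAFKA_BOOTSTRAP_SERVERS is required")
    return {
        "bootstrap_servers": [s.strip() for s in servers.split(",") if s.strip()],
        "security_protocol": env.get("KAFKA_SECURITY_PROTOCOL", "PLAINTEXT"),
        "topic_pattern": env.get("KAFKA_TOPIC_PATTERN", ".*"),
        "group_prefix": env.get("KAFKA_CONSUMER_GROUP_PREFIX", "abe"),
        # Assumption: "true" in any letter case enables auto commit.
        "enable_auto_commit": env.get("KAFKA_ENABLE_AUTO_COMMIT", "true").lower() == "true",
        "auto_offset_reset": env.get("KAFKA_AUTO_OFFSET_RESET", "latest"),
    }


settings = load_kafka_settings({"KAFKA_BOOTSTRAP_SERVERS": "host1:9092,host2:9092"})
```

Passing an explicit mapping instead of relying on os.environ keeps the sketch easy to exercise in tests.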
publish(key: str, payload: Dict[str, Any]) -> None
Publishes a JSON-serialised message. Delivery is confirmed by waiting on future.get(timeout=30) in a background thread. Non-serialisable payloads raise ValueError.
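The pattern described above (serialise to JSON, then wait on a blocking future off the event loop) can be sketched without a broker. FakeSendFuture is a stand-in for whatever future the producer returns, and serialise and publish_sketch are hypothetical names; the backend's actual internals may differ.

```python
import asyncio
import json
from typing import Any, Dict


class FakeSendFuture:
    """Stand-in for the future returned by a producer's send call."""

    def __init__(self, value: bytes) -> None:
        self._value = value

    def get(self, timeout: float) -> int:
        # A real future would block here until the broker acknowledges.
        return len(self._value)


def serialise(payload: Dict[str, Any]) -> bytes:
    """Encode the payload as UTF-8 JSON, mapping TypeError to ValueError."""
    try:
        return json.dumps(payload).encode("utf-8")
    except TypeError as exc:
        raise ValueError(f"payload is not JSON-serialisable: {exc}") from exc


async def publish_sketch(payload: Dict[str, Any]) -> int:
    future = FakeSendFuture(serialise(payload))
    loop = asyncio.get_running_loop()
    # Run the blocking get() in the default thread pool so the loop stays free.
    return await loop.run_in_executor(None, lambda: future.get(timeout=30))


acked_bytes = asyncio.run(publish_sketch({"hello": "world"}))
```

Keeping the blocking wait in a worker thread means other coroutines can keep running while delivery is pending.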
consume(*, group: Optional[str] = None) -> AsyncIterator[Dict[str, Any]]
Subscribes via regex from KAFKA_TOPIC_PATTERN. Consumer group id is "<prefix>_<group>" or "<prefix>_default".
Yields decoded JSON payloads; invalid JSON is logged and skipped.
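The group id rule and the skip-on-invalid-JSON behaviour can be illustrated in isolation. The function names below are hypothetical, and treating an empty group string the same as None is an assumption; only the "<prefix>_<group>" / "<prefix>_default" format and the log-and-skip behaviour come from the description above.

```python
import json
import logging
from typing import Any, Dict, Iterable, Iterator, Optional

logger = logging.getLogger("abe_kafka_sketch")


def group_id(prefix: str, group: Optional[str]) -> str:
    """Build the consumer group id from the prefix and an optional group."""
    return f"{prefix}_{group}" if group else f"{prefix}_default"


def decode_payloads(raw_values: Iterable[bytes]) -> Iterator[Dict[str, Any]]:
    """Yield decoded JSON payloads; log and skip anything that fails to parse."""
    for raw in raw_values:
        try:
            yield json.loads(raw.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError):
            logger.warning("skipping invalid JSON message: %r", raw)


decoded = list(decode_payloads([b'{"a": 1}', b"not json", b'{"b": 2}']))
```

With the default prefix, consume(group="g1") would correspond to the group id abe_g1, and consume() to abe_default.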
close() -> None
Flushes and closes the underlying Kafka producer and consumer.
Configuration mapping
For the detailed mapping, see the Kafka Backend page.
Usage example
import asyncio

from abe_plugin.backends.message_queue.service.abe_kafka import KafkaMessageQueueBackend


async def main():
    # Build the backend from the KAFKA_* environment variables.
    b = KafkaMessageQueueBackend.from_env()
    await b.publish("demo-topic", {"hello": "world"})
    # Read a single message, then stop consuming.
    async for m in b.consume(group="g1"):
        print(m)
        break
    await b.close()


asyncio.run(main())
Troubleshooting
- Testcontainers may return PLAINTEXT://host:port; abe-kafka normalises to host:port.
- If topics are not auto-created on your broker, create them or publish before consuming with auto_offset_reset=earliest.
- On macOS, ensure advertised listeners resolve from the client (IPv4/IPv6).
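The normalisation mentioned in the first item can be pictured roughly as follows. This is a sketch of the idea, not abe-kafka's code: normalise_bootstrap is a hypothetical name, and stripping any scheme (not only PLAINTEXT) is an assumption.

```python
def normalise_bootstrap(servers: str) -> str:
    """Strip a leading 'SCHEME://' from each comma-separated entry."""
    cleaned = []
    for entry in servers.split(","):
        entry = entry.strip()
        if "://" in entry:
            # Keep only the host:port part after the scheme.
            entry = entry.split("://", 1)[1]
        if entry:
            cleaned.append(entry)
    return ",".join(cleaned)


print(normalise_bootstrap("PLAINTEXT://localhost:9093"))
```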
For contributors
For repository tests and CI workflows, see:
- Development workflow: ../../development/workflow.mdx
- CI/CD details: ../../development/ci-cd/index.mdx