API References

This section documents the abe-kafka backend API and configuration.

Module and entry point

  • Module: abe_plugin.backends.message_queue.service.abe_kafka
  • Class: KafkaMessageQueueBackend
  • Entry point: [project.entry-points."abe.backends.message_queue.service"].kafka
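As a sketch, the entry point above would typically be declared in a project's pyproject.toml like this (the exact object path after the colon is an assumption, not confirmed by this page):

```toml
[project.entry-points."abe.backends.message_queue.service"]
kafka = "abe_plugin.backends.message_queue.service.abe_kafka:KafkaMessageQueueBackend"
```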

Public API

from_env() -> KafkaMessageQueueBackend

Builds a configured backend instance from environment variables.

| Variable | Required | Type / Values | Default | Notes |
| --- | --- | --- | --- | --- |
| KAFKA_BOOTSTRAP_SERVERS | Yes | Comma-separated host:port list | — | e.g. host1:9092,host2:9092 |
| KAFKA_SECURITY_PROTOCOL | No | PLAINTEXT | PLAINTEXT | Also SASL_PLAINTEXT, SASL_SSL, SSL |
| KAFKA_SASL_MECHANISM | No | e.g. PLAIN | — | Use with KAFKA_SASL_USERNAME/KAFKA_SASL_PASSWORD |
| KAFKA_SASL_USERNAME | No | string | — | SASL username |
| KAFKA_SASL_PASSWORD | No | string | — | SASL password |
| KAFKA_SSL_CAFILE | No | file path | — | SSL CA file |
| KAFKA_SSL_CERTFILE | No | file path | — | SSL client cert |
| KAFKA_SSL_KEYFILE | No | file path | — | SSL client key |
| KAFKA_CLIENT_ID | No | string | — | Passed to Kafka clients |
| KAFKA_TOPIC_PATTERN | No | regex | .* | Consumer subscribes by regex |
| KAFKA_CONSUMER_GROUP_PREFIX | No | string | abe | Group id <prefix>_<group> or <prefix>_default |
| KAFKA_ENABLE_AUTO_COMMIT | No | true/false | true | Consumer auto commit |
| KAFKA_AUTO_OFFSET_RESET | No | latest/earliest | latest | Start offset when no committed offset |
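To illustrate how these variables and their defaults combine, here is a minimal sketch of the kind of parsing from_env() performs (the function name and returned keys are hypothetical, not the actual implementation):

```python
import os

def read_kafka_env() -> dict:
    # Hypothetical sketch of the documented variables and defaults;
    # not the real from_env() body.
    servers = os.environ["KAFKA_BOOTSTRAP_SERVERS"]  # required, comma-separated
    return {
        "bootstrap_servers": [s.strip() for s in servers.split(",")],
        "security_protocol": os.environ.get("KAFKA_SECURITY_PROTOCOL", "PLAINTEXT"),
        "topic_pattern": os.environ.get("KAFKA_TOPIC_PATTERN", ".*"),
        "group_prefix": os.environ.get("KAFKA_CONSUMER_GROUP_PREFIX", "abe"),
        "enable_auto_commit": os.environ.get("KAFKA_ENABLE_AUTO_COMMIT", "true").lower() == "true",
        "auto_offset_reset": os.environ.get("KAFKA_AUTO_OFFSET_RESET", "latest"),
    }
```

A missing KAFKA_BOOTSTRAP_SERVERS raises KeyError here; the real backend may report the error differently.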

publish(key: str, payload: Dict[str, Any]) -> None

Publishes a JSON-serialised message. Delivery is confirmed by waiting on future.get(timeout=30) in a background thread. Non-serialisable payloads raise ValueError.
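The serialisation contract can be sketched as follows (the helper name is hypothetical; only the JSON-plus-ValueError behaviour comes from the description above):

```python
import json

def to_message_bytes(payload: dict) -> bytes:
    # JSON-serialise the payload, mirroring the documented contract:
    # non-serialisable payloads raise ValueError.
    try:
        return json.dumps(payload).encode("utf-8")
    except TypeError as exc:
        raise ValueError(f"payload is not JSON-serialisable: {exc}") from exc
```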

consume(*, group: Optional[str] = None) -> AsyncIterator[Dict[str, Any]]

Subscribes via regex from KAFKA_TOPIC_PATTERN. Consumer group id is "<prefix>_<group>" or "<prefix>_default". Yields decoded JSON payloads; invalid JSON is logged and skipped.
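The group-id derivation described above amounts to (standalone sketch, not the library's code):

```python
from typing import Optional

def group_id(prefix: str = "abe", group: Optional[str] = None) -> str:
    # "<prefix>_<group>" when a group is given, otherwise "<prefix>_default".
    return f"{prefix}_{group}" if group else f"{prefix}_default"
```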

close() -> None

Flushes and closes the underlying Kafka producer and consumer.

Configuration mapping

See detailed mapping in the Kafka backend page: Kafka Backend

Usage example

```python
import asyncio
from abe_plugin.backends.message_queue.service.abe_kafka import KafkaMessageQueueBackend

async def main():
    b = KafkaMessageQueueBackend.from_env()
    await b.publish("demo-topic", {"hello": "world"})
    # Consume one message, then stop.
    async for m in b.consume(group="g1"):
        print(m)
        break
    await b.close()

asyncio.run(main())
```

Troubleshooting

  • Testcontainers may return PLAINTEXT://host:port; abe-kafka normalises to host:port.
  • If topics are not auto-created on your broker, create them or publish before consuming with auto_offset_reset=earliest.
  • On macOS, ensure advertised listeners resolve from the client (IPv4/IPv6).
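The first normalisation step can be sketched like this (function name is hypothetical; the behaviour matches the bullet above):

```python
def normalise_bootstrap(server: str) -> str:
    # Drop an optional "<PROTOCOL>://" prefix, e.g. Testcontainers'
    # "PLAINTEXT://host:port", leaving plain "host:port".
    return server.split("://", 1)[-1]
```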

For contributors

For repository tests and CI workflows, see:

  • Development workflow: ../../development/workflow.mdx
  • CI/CD details: ../../development/ci-cd/index.mdx