Version: 0.0.1

Loader & Discovery

load_backend() in abe/backends/message_queue/loader.py resolves the concrete backend implementation that your application uses at runtime.

Entry point group

Backends register themselves under the abe.backends.message_queue entry-point group. When you install a provider package (for example abe-backend-redis), its pyproject.toml must declare an entry point in that group. The entry point must reference a class that exposes a from_env() classmethod:

[project.entry-points."abe.backends.message_queue"]
redis = "abe_backend_redis.backend:RedisBackend"

The loader inspects this group via importlib.metadata.entry_points().

Resolution order

  1. Environment override – If QUEUE_BACKEND is set, the loader looks for a matching entry-point name and instantiates it.
  2. Auto-select – Otherwise, the loader returns the first registered backend whose entry-point name is not memory.
  3. Fallback – If no providers are installed, it returns the in-repo MemoryBackend and emits a warning.
abe/backends/message_queue/loader.py
import os
from importlib.metadata import entry_points

BACKEND_ENTRY_POINT_GROUP = "abe.backends.message_queue"

requested_backend = os.getenv("QUEUE_BACKEND")
backends = entry_points(group=BACKEND_ENTRY_POINT_GROUP)

Error handling

  • Raises RuntimeError when QUEUE_BACKEND references a name that is not registered.
  • Emits warnings (warnings.warn) when no providers are available or when the fallback memory backend is used.
  • Uses typing.cast to narrow the return value of from_env(), so static type checkers see a MessageQueueBackend.
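The typing.cast point can be illustrated with a small sketch. Entry points load as untyped objects, so the cast only informs the type checker and performs no runtime check. The MessageQueueBackend protocol below is a simplified stand-in with an assumed publish signature, and instantiate and DemoBackend are hypothetical names.

```python
from typing import Any, Protocol, cast


class MessageQueueBackend(Protocol):
    """Simplified stand-in for the real backend type (assumed signature)."""

    async def publish(self, key: str, payload: bytes) -> None: ...


def instantiate(backend_cls: Any) -> MessageQueueBackend:
    # cast() returns its argument unchanged at runtime; it only tells
    # static type checkers to treat the value as a MessageQueueBackend.
    return cast(MessageQueueBackend, backend_cls.from_env())


class DemoBackend:
    """Hypothetical provider class used only for this example."""

    @classmethod
    def from_env(cls) -> "DemoBackend":
        return cls()

    async def publish(self, key: str, payload: bytes) -> None:
        return None


backend = instantiate(DemoBackend)
```

Because nothing is verified at runtime, a provider that does not actually satisfy the interface would only fail later, when a missing method is called.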

Configuration contract

Providers must implement a from_env() classmethod that reads configuration from environment variables or configuration files. Recommended pattern:

from collections.abc import AsyncIterator

from abe.types import (
    MessageQueueBackendProtocol,
    MessageQueueKey,
    MessageQueuePayload,
    MessageQueueMessage,
    ConsumerGroup,
)


class MyBackend(MessageQueueBackendProtocol):
    @classmethod
    def from_env(cls) -> "MyBackend":
        # MyConfig is a placeholder for the provider's own settings loader.
        settings = MyConfig.load()
        return cls(settings)

    async def publish(self, key: MessageQueueKey, payload: MessageQueuePayload) -> None:
        ...

    async def consume(
        self,
        *,
        group: ConsumerGroup = None,
    ) -> AsyncIterator[MessageQueueMessage]:
        ...

If required settings are missing, raise a descriptive exception so deployment tooling can surface the misconfiguration quickly.
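One way to produce such an exception is to collect every missing variable before raising, so a single deployment attempt reports all problems at once. The sketch below is an assumption-laden example: BackendConfigError, the MyConfig fields, and the MY_BACKEND_* variable names are all hypothetical.

```python
import os
from dataclasses import dataclass


class BackendConfigError(RuntimeError):
    """Raised when required backend settings are absent (hypothetical)."""


@dataclass(frozen=True)
class MyConfig:
    url: str
    stream: str

    # Maps each field to the environment variable it is read from.
    # These variable names are illustrative only.
    ENV_VARS = {"url": "MY_BACKEND_URL", "stream": "MY_BACKEND_STREAM"}

    @classmethod
    def load(cls) -> "MyConfig":
        missing = [env for env in cls.ENV_VARS.values() if not os.getenv(env)]
        if missing:
            # Name every missing variable so the fix is obvious from the log.
            raise BackendConfigError(
                "Missing required environment variables: " + ", ".join(missing)
            )
        return cls(**{field: os.environ[env] for field, env in cls.ENV_VARS.items()})
```

ENV_VARS has no type annotation, so the dataclass machinery treats it as a plain class attribute rather than a field.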

Testing tips

  • Patch importlib.metadata.entry_points to inject fake entry points during unit tests.
  • Use environment variable overrides (monkeypatch.setenv) to drive specific code paths.
  • Validate the fallback behaviour by clearing providers and inspecting the warning emitted when MemoryBackend is returned.
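The first two tips might look like this in practice. To stay self-contained, the sketch tests a tiny stand-in (resolve_requested) instead of the real load_backend(), and it uses unittest.mock from the standard library; with pytest, monkeypatch.setattr and monkeypatch.setenv play the same roles. FakeBackend and fake_entry_point are hypothetical helpers.

```python
import os
from importlib import metadata
from types import SimpleNamespace
from unittest import mock

GROUP = "abe.backends.message_queue"


class FakeBackend:
    @classmethod
    def from_env(cls) -> "FakeBackend":
        return cls()


def fake_entry_point(name, backend_cls):
    # Provides only what the stand-in loader touches: .name and .load().
    return SimpleNamespace(name=name, load=lambda: backend_cls)


def resolve_requested():
    """Stand-in for the QUEUE_BACKEND override branch of the loader."""
    requested = os.environ["QUEUE_BACKEND"]
    for ep in metadata.entry_points(group=GROUP):
        if ep.name == requested:
            return ep.load().from_env()
    raise RuntimeError(f"unknown backend: {requested}")


def test_override_selects_named_backend():
    eps = [fake_entry_point("fake", FakeBackend)]
    with mock.patch.object(metadata, "entry_points", return_value=eps), \
            mock.patch.dict(os.environ, {"QUEUE_BACKEND": "fake"}):
        assert isinstance(resolve_requested(), FakeBackend)


def test_unknown_name_raises():
    with mock.patch.object(metadata, "entry_points", return_value=[]), \
            mock.patch.dict(os.environ, {"QUEUE_BACKEND": "missing"}):
        try:
            resolve_requested()
        except RuntimeError:
            return
        raise AssertionError("expected RuntimeError")
```

One caveat when adapting this to the real loader: if the loader module binds the function with from importlib.metadata import entry_points, the patch most likely needs to target that name inside the loader module, since patching importlib.metadata afterwards does not rebind a name that was already imported.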