Loader & Discovery
load_backend() in abe/backends/queue/loader.py is responsible for resolving the concrete backend implementation that your application will use at runtime.
Entry point groupโ
Backends register themselves under the abe.backends.message_queue entry-point group. When you install a provider package (for example abe-backend-redis), its pyproject.toml must expose a class with a from_env() classmethod:
[project.entry-points."abe.backends.message_queue"]
redis = "abe_backend_redis.backend:RedisBackend"
The loader inspects this group via importlib.metadata.entry_points().
Resolution orderโ
- Environment override โ If
QUEUE_BACKENDis set, the loader looks for a matching entry-point name and instantiates it. - Auto-select โ Otherwise, the loader returns the first registered backend whose entry-point name is not
memory. - Fallback โ If no providers are installed, it returns the in-repo
MemoryBackendand emits a warning.
abe/backends/message_queue/loader.py
from importlib.metadata import entry_points
BACKEND_ENTRY_POINT_GROUP = "abe.backends.message_queue"
requested_backend = os.getenv("QUEUE_BACKEND")
backends = entry_points(group=BACKEND_ENTRY_POINT_GROUP)
Error handlingโ
- Raises
RuntimeErrorwhenQUEUE_BACKENDreferences a name that is not registered. - Emits warnings (
warnings.warn) when no providers are available or when the fallback memory backend is used. - Uses
typing.castto narrow the type offrom_env()returns, ensuring static type checkers see aMessageQueueBackend.
Configuration contractโ
Providers must implement a from_env() classmethod that reads configuration from environment variables or configuration files. Recommended pattern:
from abe.types import (
MessageQueueBackendProtocol,
MessageQueueKey,
MessageQueuePayload,
MessageQueueMessage,
ConsumerGroup,
)
class MyBackend(MessageQueueBackendProtocol):
@classmethod
def from_env(cls) -> "MyBackend":
settings = MyConfig.load()
return cls(settings)
async def publish(self, key: MessageQueueKey, payload: MessageQueuePayload) -> None:
...
async def consume(
self,
*,
group: ConsumerGroup = None,
) -> AsyncIterator[MessageQueueMessage]:
...
If required settings are missing, raise a descriptive exception so deployment tooling can surface the misconfiguration quickly.
Testing tipsโ
- Patch
importlib.metadata.entry_pointsto inject fake entry points during unit tests. - Use environment variable overrides (
monkeypatch.setenv) to drive specific code paths. - Validate the fallback behaviour by clearing providers and inspecting the warning emitted when
MemoryBackendis returned.