Memory Backend
`abe/backends/queue/service/memory.py` ships with the library as a development-only backend. It implements the `QueueBackend` protocol using a process-local `asyncio.Queue`.
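For orientation, here is a rough sketch of the shape the `QueueBackend` contract takes from the outside, inferred from the implementation outline further down. The type aliases are illustrative placeholders; the authoritative definitions live in the library's own protocol module.

```python
# Approximate shape of the QueueBackend contract, inferred from the
# implementation outline below -- not the library's canonical definitions.
from typing import Any, AsyncIterator, Protocol

QueueKey = str                 # routing/topic key (placeholder alias)
QueuePayload = dict[str, Any]  # message body (placeholder alias)
QueueMessage = dict[str, Any]  # what consumers receive (placeholder alias)
ConsumerGroup = str | None     # optional consumer-group name (placeholder alias)


class QueueBackend(Protocol):
    @classmethod
    def from_env(cls) -> "QueueBackend":
        """Build a backend instance from environment configuration."""
        ...

    async def publish(self, key: QueueKey, payload: QueuePayload) -> None:
        """Enqueue a payload under the given key."""
        ...

    def consume(self, *, group: ConsumerGroup = None) -> AsyncIterator[QueueMessage]:
        """Yield messages; concrete backends implement this as an async generator."""
        ...
```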
When to use it
- Local development and unit tests where installing a real broker is unnecessary (see the test sketch after this list).
- CI pipelines that want a lightweight fallback when no providers are installed.
- Demonstrations of how to implement `QueueBackend` without external dependencies.
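As a concrete illustration of the unit-test case, a minimal sketch follows. The import path is inferred from the module path above, and the `pytest-asyncio` marker is an assumption about the test runner.

```python
# Hypothetical test sketch. Assumes MemoryBackend is importable from the module
# path shown above and that pytest-asyncio (or similar) runs async tests.
import pytest

from abe.backends.queue.service.memory import MemoryBackend


@pytest.mark.asyncio
async def test_publish_then_consume_roundtrip() -> None:
    backend = MemoryBackend()
    await backend.publish("user.created", {"id": 42})

    # Pull exactly one message off the in-memory queue, then stop iterating.
    async for message in backend.consume():
        assert message == {"id": 42}
        break
```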
Limitations
- Messages live in memory only; they disappear when the process exits.
- There is no cross-process visibility; publishers and consumers must run in the same interpreter.
- Consumer groups are ignored.
Implementation outline
abe/backends/queue/service/memory.py

```python
import asyncio
import logging
import warnings
from typing import Any, AsyncIterator

# QueueBackend, QueueKey, QueuePayload, QueueMessage and ConsumerGroup come
# from the library's queue protocol definitions.

logger = logging.getLogger(__name__)


class MemoryBackend(QueueBackend):
    # Class-level queue: every instance in the process shares the same buffer.
    _queue: asyncio.Queue[tuple[str, dict[str, Any]]] = asyncio.Queue()

    @classmethod
    def from_env(cls) -> "MemoryBackend":
        warnings.warn("Memory backend is for development/testing only", UserWarning)
        return cls()

    async def publish(self, key: QueueKey, payload: QueuePayload) -> None:
        await self._queue.put((key, payload))

    async def consume(self, *, group: ConsumerGroup = None) -> AsyncIterator[QueueMessage]:
        while True:
            _, payload = await self._queue.get()
            try:
                self._queue.task_done()
            except ValueError:
                logger.warning("task_done() called too many times")
            yield payload
```
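Because `_queue` is a class attribute, every `MemoryBackend` instance in the process shares the same buffer, which is what lets a separately constructed publisher and consumer see each other's messages. A quick sketch, assuming the outline above matches the shipped implementation and the import path inferred from the file name:

```python
# Sketch: two independently constructed backends share the class-level queue.
import asyncio

from abe.backends.queue.service.memory import MemoryBackend  # assumed import path


async def main() -> None:
    producer = MemoryBackend()
    consumer = MemoryBackend()

    await producer.publish("demo", {"hello": "world"})

    async for message in consumer.consume():
        print(message)  # {'hello': 'world'}
        break


asyncio.run(main())
```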
Behavioural notes
- `task_done()` calls are wrapped in exception handling to avoid double-count errors.
- Cancellation is respected: if `consume()` is cancelled mid-await, the backend logs the situation and ensures the queue accounting remains consistent (as sketched below).
- Any unexpected exception is logged and re-raised so callers can act accordingly.
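To show what cancellation looks like from the caller's side, here is a sketch that runs the consumer as a background task and cancels it; the `handle` coroutine and the import path are illustrative assumptions.

```python
# Sketch: driving consume() in a background task and cancelling it cleanly.
import asyncio

from abe.backends.queue.service.memory import MemoryBackend  # assumed import path


async def handle(message: dict) -> None:
    # Placeholder for whatever processing the application performs.
    print("got", message)


async def main() -> None:
    backend = MemoryBackend()

    async def run_consumer() -> None:
        async for message in backend.consume():
            await handle(message)

    task = asyncio.create_task(run_consumer())
    await backend.publish("demo", {"n": 1})
    await asyncio.sleep(0.1)  # give the consumer a chance to pick the message up

    task.cancel()  # cancellation propagates into the await inside consume()
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: the consumer shut down cleanly


asyncio.run(main())
```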
Recommended usage patterns
- Pair with `AsyncLoopConsumer` during internal testing.
- Avoid promoting it to production: set `QUEUE_BACKEND` explicitly in staging/production to ensure a real provider is used.
- When writing tests, consider clearing the queue between test cases to guarantee isolation (e.g., drain the queue in teardown, as in the sketch below).
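One way to implement that isolation is an autouse fixture that drains the shared queue after each test. This sketch assumes the class-level `_queue` attribute from the outline above and uses an illustrative fixture name.

```python
# Hypothetical pytest fixture that drains the shared in-memory queue after
# each test so messages cannot leak between test cases.
import pytest

from abe.backends.queue.service.memory import MemoryBackend  # assumed import path


@pytest.fixture(autouse=True)
def drain_memory_queue():
    yield
    queue = MemoryBackend._queue  # class-level queue from the outline above
    while not queue.empty():
        queue.get_nowait()
        queue.task_done()
```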