Version: 0.0.1

Memory Backend

abe/backends/queue/service/memory.py ships with the library as a development-only backend. It implements the QueueBackend protocol using a process-local asyncio.Queue.

When to use it

  • Local development and unit tests where installing a real broker is unnecessary.
  • CI pipelines that want a lightweight fallback when no providers are installed.
  • Demonstrations of how to implement QueueBackend without external dependencies.

Limitations

  • Messages live in memory only; they disappear when the process exits.
  • There is no cross-process visibility; publishers and consumers must run in the same interpreter.
  • Consumer groups are ignored: consume() accepts a group argument but does not use it.
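The consumer-group limitation can be illustrated with a small sketch. It uses a bare asyncio.Queue and a hypothetical consume_into helper rather than the library class, and the group names are made up. Because the group label is never consulted, two consumers in different "groups" compete for the same messages, so each message reaches exactly one of them instead of being delivered once per group.

```python
import asyncio


async def demo() -> tuple:
    # A plain asyncio.Queue stands in for the backend's internal queue (assumption).
    queue: asyncio.Queue = asyncio.Queue()

    async def consume_into(group: str, sink: list) -> None:
        # `group` is accepted but never used, mirroring the limitation above.
        while True:
            _, payload = await queue.get()
            queue.task_done()
            sink.append(payload["n"])

    billing: list = []
    audit: list = []
    tasks = [
        asyncio.create_task(consume_into("billing", billing)),
        asyncio.create_task(consume_into("audit", audit)),
    ]

    for n in range(4):
        await queue.put(("orders", {"n": n}))

    # Wait until every published item has been taken off the queue.
    await queue.join()

    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return billing, audit


billing, audit = asyncio.run(demo())
# Four messages were delivered in total, not four per group.
print(len(billing) + len(audit))
```

How the messages are split between the two consumers depends on scheduling; only the total is fixed.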

Implementation outline

abe/backends/queue/service/memory.py
class MemoryBackend(QueueBackend):
    _queue: asyncio.Queue[tuple[str, dict[str, Any]]] = asyncio.Queue()

    @classmethod
    def from_env(cls) -> "MemoryBackend":
        warnings.warn("Memory backend is for development/testing only", UserWarning)
        return cls()

    async def publish(self, key: QueueKey, payload: QueuePayload) -> None:
        await self._queue.put((key, payload))

    async def consume(self, *, group: ConsumerGroup = None) -> AsyncIterator[QueueMessage]:
        while True:
            _, payload = await self._queue.get()
            try:
                self._queue.task_done()
            except ValueError:
                logger.warning("task_done() called too many times")
            yield payload
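A publish/consume round trip might look like the sketch below. SketchMemoryBackend is a hypothetical stand-in that mirrors the outline so the example runs without the library installed; it creates its queue per instance and omits the QueueBackend base class, the type aliases, and the logging. The "orders" key and the payloads are illustrative.

```python
import asyncio
from typing import AsyncIterator


class SketchMemoryBackend:
    """Hypothetical stand-in mirroring the outline; not the library class."""

    def __init__(self) -> None:
        # Per-instance queue keeps this sketch isolated between runs.
        self._queue: asyncio.Queue = asyncio.Queue()

    async def publish(self, key: str, payload: dict) -> None:
        await self._queue.put((key, payload))

    async def consume(self, *, group=None) -> AsyncIterator[dict]:
        while True:
            _, payload = await self._queue.get()
            self._queue.task_done()
            yield payload


async def round_trip() -> list:
    backend = SketchMemoryBackend()
    await backend.publish("orders", {"id": 1})
    await backend.publish("orders", {"id": 2})

    received = []
    stream = backend.consume()
    async for payload in stream:
        received.append(payload)
        if len(received) == 2:
            break  # consume() never ends on its own, so stop explicitly
    await stream.aclose()  # close the suspended generator cleanly
    return received


result = asyncio.run(round_trip())
print(result)
```

Since consume() loops forever, the caller decides when to stop, either by breaking out of the loop as here or by cancelling the task that drives it.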

Behavioural notes

  • task_done() is wrapped in a try/except ValueError, so calling it more often than items were retrieved logs a warning instead of raising.
  • Cancellation is respected: if consume() is cancelled mid-await, the backend logs the situation and ensures the queue accounting remains consistent.
  • Any unexpected exception is logged and re-raised so callers can act accordingly.
  • Pair with AsyncLoopConsumer during internal testing.
  • Avoid promoting it to production: set QUEUE_BACKEND explicitly in staging/production to ensure a real provider is used.
  • When writing tests, consider clearing the queue between test cases to guarantee isolation (e.g., drain the queue in teardown).
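Draining the queue in teardown could be done with a helper along these lines. The drain function is hypothetical and not part of the library; it works on any asyncio.Queue. The outline declares _queue on the class, which suggests the queue is shared across instances, so a test fixture might call drain(MemoryBackend._queue) after each test. That relies on a private attribute and should be treated as an assumption.

```python
import asyncio


def drain(queue: asyncio.Queue) -> int:
    """Discard every pending item without blocking; return how many were dropped."""
    dropped = 0
    while True:
        try:
            queue.get_nowait()
        except asyncio.QueueEmpty:
            return dropped
        # Keep the unfinished-task counter in step with the items removed.
        queue.task_done()
        dropped += 1


async def demo() -> tuple:
    queue: asyncio.Queue = asyncio.Queue()
    await queue.put(("orders", {"n": 1}))
    await queue.put(("orders", {"n": 2}))
    dropped = drain(queue)
    return dropped, queue.empty()


dropped, is_empty = asyncio.run(demo())
print(dropped, is_empty)
```

The helper is synchronous on purpose: get_nowait() never waits, so it can run from an ordinary teardown hook as well as from async test code.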