Design Patterns
This document outlines the key design patterns used throughout the Engineering AI Agent codebase. Following these patterns ensures consistency, maintainability, and scalability of the system.
Architectural Patterns
Microservices Architecture
The Engineering AI Agent system follows a microservices architecture to enable independent development, deployment, and scaling of components.
Implementation Guidelines:
- Services should be organized around business capabilities
- Each service owns its data storage
- Communication between services should be through well-defined APIs
- Services should be independently deployable
Example Implementation:
```python
# Each service has its own entry point
# api_gateway/main.py
from fastapi import FastAPI, Depends

from .auth import get_current_user
from .routers import rd, pm, qa, sa, sd, sre

app = FastAPI(title="Engineering AI Agent API Gateway")

app.include_router(rd.router, prefix="/api/v1/rd", tags=["RD"])
app.include_router(pm.router, prefix="/api/v1/pm", tags=["PM"])
app.include_router(qa.router, prefix="/api/v1/qa", tags=["QA"])
app.include_router(sa.router, prefix="/api/v1/sa", tags=["SA"])
app.include_router(sd.router, prefix="/api/v1/sd", tags=["SD"])
app.include_router(sre.router, prefix="/api/v1/sre", tags=["SRE"])
```
Event-Driven Architecture
The system uses an event-driven architecture for asynchronous communication between components.
Implementation Guidelines:
- Events should be immutable records of something that happened
- Events should be named in past tense (e.g., TaskCreated, CodeGenerated)
- Consumers should be idempotent to handle duplicate events
- Events should include timestamps and correlation IDs
Example Implementation:
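```python
# events/base.py
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Optional

from pydantic import BaseModel, Field


class Event(BaseModel):
    event_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    event_type: str
    # Timezone-aware UTC timestamp, so events from different hosts compare correctly
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    correlation_id: Optional[str] = None
    data: Dict[str, Any]


# Example event producer
async def publish_event(event: Event) -> None:
    # Publish to a message broker (e.g., RabbitMQ, Kafka).
    # `message_broker` stands in for the project's broker client.
    await message_broker.publish(
        exchange="events",
        routing_key=event.event_type,
        body=event.json().encode(),
    )


# Example event consumer
async def handle_task_created(event_data: dict) -> None:
    event = Event(**event_data)
    # Ensure idempotency: skip events whose event_id has already been processed.
    # `processed_event_ids` is a hypothetical durable store (e.g., a database table).
    if await processed_event_ids.contains(event.event_id):
        return
    # ... process the event ...
    await processed_event_ids.add(event.event_id)
```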
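The idempotency and correlation-ID guidelines can be exercised without a broker. The sketch below assumes an in-memory set for deduplication (a real deployment would need a durable store shared by all consumer instances); `SimpleEvent`, `derive_event`, and `IdempotentConsumer` are hypothetical names, not part of the codebase.

```python
import asyncio
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Awaitable, Callable, Dict, Optional, Set


@dataclass(frozen=True)
class SimpleEvent:
    # frozen=True: fields cannot be reassigned after creation (events are immutable records)
    event_type: str
    data: Dict[str, Any]
    correlation_id: Optional[str] = None
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


def derive_event(cause: SimpleEvent, event_type: str, data: Dict[str, Any]) -> SimpleEvent:
    # A follow-up event keeps the causing event's correlation ID, or starts a
    # chain from the cause's own event_id if it had none
    return SimpleEvent(event_type, data, correlation_id=cause.correlation_id or cause.event_id)


class IdempotentConsumer:
    def __init__(self, handler: Callable[[SimpleEvent], Awaitable[None]]):
        self.handler = handler
        self._processed: Set[str] = set()  # in-memory only; not shared across processes

    async def consume(self, event: SimpleEvent) -> bool:
        # Returns False when the event was already processed (duplicate delivery)
        if event.event_id in self._processed:
            return False
        await self.handler(event)
        # Record only after the handler succeeds, so a failed event can be retried
        self._processed.add(event.event_id)
        return True


if __name__ == "__main__":
    async def log_event(event: SimpleEvent) -> None:
        print(event.event_type, event.data)

    consumer = IdempotentConsumer(log_event)
    created = SimpleEvent("TaskCreated", {"task_id": "123"})
    asyncio.run(consumer.consume(created))
    asyncio.run(consumer.consume(created))  # redelivery: handler is not called again
```

Recording the ID only after the handler succeeds favors at-least-once processing; two concurrent deliveries of the same event could still both pass the check, which a database unique constraint on `event_id` would close.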
Creational Patterns
Factory Method Pattern
Used to create objects without specifying the exact class of object that will be created.
Use Cases:
- Creating role-specific handlers based on message content
- Instantiating the appropriate LLM client based on configuration
- Creating integration connectors based on service type
Example Implementation:
```python
# llm/factory.py
from abc import ABC, abstractmethod
from typing import Any, Dict


class LLMProvider(ABC):
    def __init__(self, **config: Any):
        # Provider-specific settings (e.g., API key, model name); without this
        # constructor, OpenAIProvider(**config) fails for any non-empty config
        self.config = config

    @abstractmethod
    async def generate(self, prompt: str, **kwargs) -> str:
        pass


class OpenAIProvider(LLMProvider):
    async def generate(self, prompt: str, **kwargs) -> str:
        # OpenAI-specific implementation
        pass


class AnthropicProvider(LLMProvider):
    async def generate(self, prompt: str, **kwargs) -> str:
        # Anthropic-specific implementation
        pass


class LLMProviderFactory:
    @staticmethod
    def create_provider(provider_type: str, config: Dict[str, Any]) -> LLMProvider:
        if provider_type == "openai":
            return OpenAIProvider(**config)
        elif provider_type == "anthropic":
            return AnthropicProvider(**config)
        else:
            raise ValueError(f"Unsupported provider type: {provider_type}")
```
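The if/elif chain above has to be edited every time a provider is added. A common variant keeps a registry and lets each provider class register itself with a decorator. This is a sketch under that assumption; `ProviderRegistry` and `EchoProvider` are hypothetical, and `EchoProvider` exists only to make the example runnable.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, Type


class LLMProvider(ABC):
    def __init__(self, **config: Any):
        self.config = config

    @abstractmethod
    async def generate(self, prompt: str, **kwargs: Any) -> str:
        ...


class ProviderRegistry:
    _providers: Dict[str, Type[LLMProvider]] = {}

    @classmethod
    def register(cls, name: str) -> Callable[[Type[LLMProvider]], Type[LLMProvider]]:
        # Class decorator: stores the decorated provider class under `name`
        def decorator(provider_cls: Type[LLMProvider]) -> Type[LLMProvider]:
            cls._providers[name] = provider_cls
            return provider_cls
        return decorator

    @classmethod
    def create_provider(cls, provider_type: str, config: Dict[str, Any]) -> LLMProvider:
        try:
            provider_cls = cls._providers[provider_type]
        except KeyError:
            raise ValueError(f"Unsupported provider type: {provider_type}") from None
        return provider_cls(**config)


@ProviderRegistry.register("echo")
class EchoProvider(LLMProvider):
    # Hypothetical provider for local testing: returns the prompt unchanged
    async def generate(self, prompt: str, **kwargs: Any) -> str:
        return prompt


if __name__ == "__main__":
    provider = ProviderRegistry.create_provider("echo", {"model": "none"})
    print(asyncio.run(provider.generate("hello")))
```

The trade-off is that registration happens at import time, so a provider module must be imported somewhere before its name can be resolved.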
Dependency Injection
Used to achieve Inversion of Control by injecting dependencies rather than creating them internally.
Use Cases:
- Providing database connections to repositories
- Supplying configuration to services
- Injecting mock dependencies for testing
Example Implementation:
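```python
# Using FastAPI's dependency injection system
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession

from .database import get_db  # assumed to yield an AsyncSession, as TaskRepository expects
from .repositories import TaskRepository

router = APIRouter()


class TaskService:
    def __init__(self, task_repository: TaskRepository):
        # The repository is passed in, not constructed here
        self.task_repository = task_repository

    async def list_tasks(self):
        return await self.task_repository.find_all()


# Providers: FastAPI resolves get_db -> get_task_repository -> get_task_service per request.
# A bare Depends() on TaskService would not work, because FastAPI cannot
# build a TaskRepository from request data on its own.
def get_task_repository(db: AsyncSession = Depends(get_db)) -> TaskRepository:
    return TaskRepository(db)


def get_task_service(
    task_repository: TaskRepository = Depends(get_task_repository),
) -> TaskService:
    return TaskService(task_repository)


# In FastAPI route
@router.get("/tasks")
async def get_tasks(task_service: TaskService = Depends(get_task_service)):
    return await task_service.list_tasks()


# For testing: bypass FastAPI and inject a test double directly
def test_task_service():
    mock_repo = MockTaskRepository()  # hypothetical test double with the same methods
    service = TaskService(mock_repo)
    # Test service with mock repository
```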
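Because `TaskService` receives its repository through the constructor, a test needs no database at all. The sketch below assumes the service only needs `find_all()`; `TaskRepositoryProtocol`, `InMemoryTaskRepository`, and `count_open_tasks` are hypothetical names used for illustration.

```python
import asyncio
from typing import Any, Dict, List, Protocol


class TaskRepositoryProtocol(Protocol):
    # The only repository method TaskService needs in this sketch
    async def find_all(self) -> List[Dict[str, Any]]: ...


class TaskService:
    def __init__(self, task_repository: TaskRepositoryProtocol):
        self.task_repository = task_repository

    async def count_open_tasks(self) -> int:
        tasks = await self.task_repository.find_all()
        return sum(1 for task in tasks if task["status"] != "completed")


class InMemoryTaskRepository:
    # Test double: satisfies the protocol structurally, without inheriting from it
    def __init__(self, tasks: List[Dict[str, Any]]):
        self._tasks = tasks

    async def find_all(self) -> List[Dict[str, Any]]:
        return list(self._tasks)


def test_count_open_tasks() -> None:
    repo = InMemoryTaskRepository([
        {"id": "1", "status": "in_progress"},
        {"id": "2", "status": "completed"},
    ])
    service = TaskService(repo)  # inject the fake instead of a real repository
    assert asyncio.run(service.count_open_tasks()) == 1
```

Typing the dependency as a `Protocol` keeps static type checkers satisfied for both the real repository and the fake, since neither has to share a base class.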
Structural Patterns
Adapter Pattern
Used to allow incompatible interfaces to work together.
Use Cases:
- Wrapping external APIs (GitHub, Slack, etc.) with a consistent interface
- Converting between different data formats
- Integrating with different LLM providers
Example Implementation:
```python
# integrations/github/adapter.py
from abc import ABC, abstractmethod
from typing import Any, Dict, List


class VersionControlAdapter(ABC):
    @abstractmethod
    async def create_pull_request(self, title: str, branch: str, base: str, body: str) -> Dict[str, Any]:
        pass

    @abstractmethod
    async def get_commits(self, branch: str, since: str) -> List[Dict[str, Any]]:
        pass


class GitHubAdapter(VersionControlAdapter):
    def __init__(self, client):
        # Assumes an async HTTP client (e.g., httpx.AsyncClient) whose base URL is
        # the repository endpoint, https://api.github.com/repos/{owner}/{repo}
        self.client = client

    async def create_pull_request(self, title: str, branch: str, base: str, body: str) -> Dict[str, Any]:
        # GitHub calls the source branch "head" and the target branch "base"
        response = await self.client.post(
            "/pulls",
            json={
                "title": title,
                "head": branch,
                "base": base,
                "body": body,
            },
        )
        response.raise_for_status()
        return response.json()

    async def get_commits(self, branch: str, since: str) -> List[Dict[str, Any]]:
        # `since` is an ISO 8601 timestamp; params= lets the client URL-encode it
        response = await self.client.get(
            "/commits",
            params={"sha": branch, "since": since},
        )
        response.raise_for_status()
        return response.json()
```
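Adapters are also where data formats get converted. The sketch below normalizes one commit object from GitHub's list-commits response into a provider-neutral record, relying only on the `sha`, `commit.message`, `commit.author.name`, and `commit.author.date` fields; `Commit` and `normalize_github_commit` are hypothetical names, not existing code.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict


@dataclass(frozen=True)
class Commit:
    # Provider-neutral commit record (hypothetical internal model)
    sha: str
    message: str
    author: str
    authored_at: datetime


def normalize_github_commit(raw: Dict[str, Any]) -> Commit:
    # GitHub nests message and author under "commit"; dates are ISO 8601
    # with a trailing "Z", which older fromisoformat() versions reject
    details = raw["commit"]
    return Commit(
        sha=raw["sha"],
        message=details["message"],
        author=details["author"]["name"],
        authored_at=datetime.fromisoformat(details["author"]["date"].replace("Z", "+00:00")),
    )
```

With this in place, `get_commits` could return `[normalize_github_commit(c) for c in response.json()]`, so callers never depend on GitHub's response shape.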
Facade Pattern
Used to provide a simplified interface to a complex subsystem.
Use Cases:
- Providing a unified API for multiple microservices
- Simplifying interaction with complex external systems
- Creating a higher-level interface for common operations
Example Implementation:
```python
# llm/facade.py
class LLMFacade:
    def __init__(
        self,
        provider_factory,
        prompt_manager,
        context_manager,
    ):
        self.provider_factory = provider_factory
        self.prompt_manager = prompt_manager
        self.context_manager = context_manager

    async def generate_code(self, requirements: str, language: str) -> str:
        # Complex orchestration simplified into a single method
        context = await self.context_manager.get_context(requirements)
        prompt = self.prompt_manager.create_code_prompt(
            requirements=requirements,
            language=language,
            context=context,
        )
        provider = self.provider_factory.create_provider("openai", {})
        return await provider.generate(prompt)

    async def analyze_requirements(self, text: str) -> dict:
        # Another simplified high-level operation
        pass
```
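The first use case, a unified API over several microservices, can be sketched the same way: one method hides two service clients, queries them concurrently, and merges the results. `TaskClient`, `QAClient`, `ProjectStatusFacade`, and the 0.95 release threshold are all hypothetical, chosen only for illustration.

```python
import asyncio
from typing import Any, Dict, Protocol


class TaskClient(Protocol):
    async def count_open_tasks(self, project_id: str) -> int: ...


class QAClient(Protocol):
    async def latest_test_pass_rate(self, project_id: str) -> float: ...


class ProjectStatusFacade:
    # Callers see one method; which services are involved stays hidden here
    def __init__(self, tasks: TaskClient, qa: QAClient):
        self.tasks = tasks
        self.qa = qa

    async def project_status(self, project_id: str) -> Dict[str, Any]:
        # Query both services concurrently, then merge into a single response
        open_tasks, pass_rate = await asyncio.gather(
            self.tasks.count_open_tasks(project_id),
            self.qa.latest_test_pass_rate(project_id),
        )
        return {
            "project_id": project_id,
            "open_tasks": open_tasks,
            "test_pass_rate": pass_rate,
            # Illustrative release rule; the threshold is an assumption
            "ready_to_release": open_tasks == 0 and pass_rate >= 0.95,
        }
```

If a third service is added later, only the facade changes; its callers keep the same single call.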
Behavioral Patterns
Strategy Pattern
Defines a family of algorithms, encapsulates each one, and makes them interchangeable.
Use Cases:
- Different code generation strategies for various languages
- Multiple approaches for task prioritization
- Alternative methods for context retrieval
Example Implementation:
```python
# code_generation/strategies.py
from abc import ABC, abstractmethod
from typing import Any, Dict


class CodeGenerationStrategy(ABC):
    @abstractmethod
    async def generate(self, requirements: str, context: Dict[str, Any]) -> str:
        pass


class PythonGenerationStrategy(CodeGenerationStrategy):
    async def generate(self, requirements: str, context: Dict[str, Any]) -> str:
        # Python-specific implementation
        pass


class JavaScriptGenerationStrategy(CodeGenerationStrategy):
    async def generate(self, requirements: str, context: Dict[str, Any]) -> str:
        # JavaScript-specific implementation
        pass


class CodeGenerator:
    def __init__(self, strategy: CodeGenerationStrategy):
        self.strategy = strategy

    def set_strategy(self, strategy: CodeGenerationStrategy) -> None:
        # Swap the algorithm at runtime without changing callers
        self.strategy = strategy

    async def generate_code(self, requirements: str, context: Dict[str, Any]) -> str:
        return await self.strategy.generate(requirements, context)
```
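In practice the strategy is usually chosen from input such as the target language. A minimal sketch, assuming a lookup table keyed by language name; the two stub strategies are hypothetical and only emit placeholder functions so the example runs.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Dict


class CodeGenerationStrategy(ABC):
    @abstractmethod
    async def generate(self, requirements: str, context: Dict[str, Any]) -> str:
        ...


class PythonStubStrategy(CodeGenerationStrategy):
    # Hypothetical strategy that emits a placeholder Python function
    async def generate(self, requirements: str, context: Dict[str, Any]) -> str:
        return f"def solution():\n    # TODO: {requirements}\n    raise NotImplementedError\n"


class JavaScriptStubStrategy(CodeGenerationStrategy):
    # Hypothetical strategy that emits a placeholder JavaScript function
    async def generate(self, requirements: str, context: Dict[str, Any]) -> str:
        return f"function solution() {{\n  // TODO: {requirements}\n  throw new Error('not implemented');\n}}\n"


STRATEGIES: Dict[str, CodeGenerationStrategy] = {
    "python": PythonStubStrategy(),
    "javascript": JavaScriptStubStrategy(),
}


def select_strategy(language: str) -> CodeGenerationStrategy:
    # A table lookup replaces an if/elif chain; unknown languages fail loudly
    try:
        return STRATEGIES[language.lower()]
    except KeyError:
        raise ValueError(f"No code generation strategy for {language!r}") from None


if __name__ == "__main__":
    strategy = select_strategy("Python")
    print(asyncio.run(strategy.generate("add two numbers", {})))
```

The selected object can be passed to `CodeGenerator` or `set_strategy`; adding a language means adding one class and one table entry.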
Observer Pattern
Defines a one-to-many dependency between objects so that when one object changes state, all its dependents are notified.
Use Cases:
- Notifying multiple services when a task status changes
- Updating UIs when backend state changes
- Triggering workflows when certain events occur
Example Implementation:
```python
# events/observer.py
from abc import ABC, abstractmethod
from typing import Any, Dict, List


class EventListener(ABC):
    @abstractmethod
    async def on_event(self, event_type: str, data: Dict[str, Any]) -> None:
        pass


class EventPublisher:
    def __init__(self):
        self._listeners: Dict[str, List[EventListener]] = {}

    def subscribe(self, event_type: str, listener: EventListener) -> None:
        self._listeners.setdefault(event_type, []).append(listener)

    def unsubscribe(self, event_type: str, listener: EventListener) -> None:
        listeners = self._listeners.get(event_type, [])
        if listener in listeners:
            listeners.remove(listener)

    async def publish(self, event_type: str, data: Dict[str, Any]) -> None:
        # Iterate over a copy so listeners may unsubscribe while being notified
        for listener in list(self._listeners.get(event_type, [])):
            await listener.on_event(event_type, data)


# Example usage
class TaskStatusListener(EventListener):
    async def on_event(self, event_type: str, data: Dict[str, Any]) -> None:
        if event_type == "task.status_changed":
            # Handle task status change
            pass


async def main() -> None:
    # In application startup
    event_publisher = EventPublisher()
    task_listener = TaskStatusListener()
    event_publisher.subscribe("task.status_changed", task_listener)

    # When task status changes (await is only valid inside a coroutine)
    await event_publisher.publish("task.status_changed", {
        "task_id": "123",
        "old_status": "in_progress",
        "new_status": "completed",
    })

# Run with: asyncio.run(main())
```
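Awaiting listeners one by one means a slow listener delays every later one, and an exception aborts the rest. One alternative, sketched here with hypothetical names, notifies all listeners concurrently with `asyncio.gather(..., return_exceptions=True)` and logs failures instead of propagating them. Listeners are plain async callables in this sketch rather than `EventListener` subclasses.

```python
import asyncio
import logging
from collections import defaultdict
from typing import Any, Awaitable, Callable, DefaultDict, Dict, List

Listener = Callable[[str, Dict[str, Any]], Awaitable[None]]
logger = logging.getLogger(__name__)


class ConcurrentEventPublisher:
    def __init__(self) -> None:
        self._listeners: DefaultDict[str, List[Listener]] = defaultdict(list)

    def subscribe(self, event_type: str, listener: Listener) -> None:
        self._listeners[event_type].append(listener)

    async def publish(self, event_type: str, data: Dict[str, Any]) -> List[BaseException]:
        # Run all listeners concurrently; return_exceptions=True keeps one
        # failure from hiding the results of the others
        listeners = list(self._listeners.get(event_type, []))
        results = await asyncio.gather(
            *(listener(event_type, data) for listener in listeners),
            return_exceptions=True,
        )
        errors = [r for r in results if isinstance(r, BaseException)]
        for error in errors:
            logger.error("Listener failed for %s: %r", event_type, error)
        return errors
```

The cost of this design is ordering: listeners no longer run in subscription order, so it only fits listeners that are independent of each other.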
Command Pattern
Encapsulates a request as an object, so that clients can be parameterized with different requests and requests can be queued, logged, or undone.
Use Cases:
- Implementing undo/redo functionality
- Queuing and scheduling tasks
- Implementing workflow steps
Example Implementation:
```python
# workflows/commands.py
from abc import ABC, abstractmethod
from typing import Any, Dict, List


class Command(ABC):
    @abstractmethod
    async def execute(self) -> Dict[str, Any]:
        pass

    @abstractmethod
    async def undo(self) -> Dict[str, Any]:
        pass


class CreateTaskCommand(Command):
    def __init__(self, task_service, task_data: Dict[str, Any]):
        self.task_service = task_service
        self.task_data = task_data
        self.created_task_id = None

    async def execute(self) -> Dict[str, Any]:
        task = await self.task_service.create_task(self.task_data)
        self.created_task_id = task["id"]
        return task

    async def undo(self) -> Dict[str, Any]:
        if self.created_task_id:
            await self.task_service.delete_task(self.created_task_id)
        return {"status": "undone"}


class CommandInvoker:
    def __init__(self):
        self.history: List[Command] = []

    async def execute_command(self, command: Command) -> Dict[str, Any]:
        result = await command.execute()
        self.history.append(command)
        return result

    async def undo_last_command(self) -> Dict[str, Any]:
        if self.history:
            command = self.history.pop()
            return await command.undo()
        return {"status": "nothing to undo"}
```
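The invoker above supports undo only. Redo needs a second stack that receives undone commands and is cleared whenever a new command runs. This is a minimal sketch; `UndoRedoInvoker` and `AppendCommand` are hypothetical, and `AppendCommand` simply mutates a list so the behavior is easy to observe.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List


class Command(ABC):
    @abstractmethod
    async def execute(self) -> Dict[str, Any]: ...

    @abstractmethod
    async def undo(self) -> Dict[str, Any]: ...


class UndoRedoInvoker:
    def __init__(self) -> None:
        self._undo_stack: List[Command] = []
        self._redo_stack: List[Command] = []

    async def execute_command(self, command: Command) -> Dict[str, Any]:
        result = await command.execute()
        self._undo_stack.append(command)
        self._redo_stack.clear()  # a new action invalidates the redo history
        return result

    async def undo(self) -> Dict[str, Any]:
        if not self._undo_stack:
            return {"status": "nothing to undo"}
        command = self._undo_stack.pop()
        result = await command.undo()
        self._redo_stack.append(command)
        return result

    async def redo(self) -> Dict[str, Any]:
        if not self._redo_stack:
            return {"status": "nothing to redo"}
        command = self._redo_stack.pop()
        result = await command.execute()  # redo re-executes the command
        self._undo_stack.append(command)
        return result


class AppendCommand(Command):
    # Hypothetical command used for illustration: appends an item to a list
    def __init__(self, items: List[str], item: str):
        self.items = items
        self.item = item

    async def execute(self) -> Dict[str, Any]:
        self.items.append(self.item)
        return {"status": "appended", "item": self.item}

    async def undo(self) -> Dict[str, Any]:
        self.items.remove(self.item)
        return {"status": "removed", "item": self.item}
```

Because redo calls `execute()` again, commands must tolerate re-execution; `CreateTaskCommand`, for instance, would create a task with a new ID on redo.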
Concurrency Patterns
Asynchronous Service Pattern
Leverages asynchronous programming for non-blocking I/O operations.
Use Cases:
- API calls to external services
- Database operations
- File I/O operations
Example Implementation:
```python
# services/async_service.py
import asyncio
from typing import Any, Dict, List


class AsyncTaskService:
    def __init__(self, repository, event_publisher):
        self.repository = repository
        self.event_publisher = event_publisher

    async def create_tasks_in_parallel(self, task_data_list: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        # Create multiple tasks concurrently. If the repository wraps a single
        # SQLAlchemy AsyncSession, these calls must not overlap: a session is
        # not safe for concurrent use, so give each task its own session.
        create_tasks = [
            self.create_task(task_data)
            for task_data in task_data_list
        ]
        return await asyncio.gather(*create_tasks)

    async def create_task(self, task_data: Dict[str, Any]) -> Dict[str, Any]:
        # Asynchronous task creation
        task = await self.repository.create(task_data)
        # Publish event asynchronously
        await self.event_publisher.publish("task.created", task)
        return task
```
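`asyncio.gather` starts every coroutine at once, so a large `task_data_list` can exhaust connection pools or trip rate limits on external APIs. A common remedy is to cap concurrency with `asyncio.Semaphore`; the helper name `gather_bounded` and the default limit of 5 are assumptions for this sketch.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def gather_bounded(
    func: Callable[[T], Awaitable[R]],
    items: Iterable[T],
    limit: int = 5,
) -> List[R]:
    # At most `limit` calls run at once; results keep input order, as with gather
    semaphore = asyncio.Semaphore(limit)

    async def run_one(item: T) -> R:
        async with semaphore:
            return await func(item)

    return await asyncio.gather(*(run_one(item) for item in items))
```

Inside `AsyncTaskService`, this would read `await gather_bounded(self.create_task, task_data_list, limit=5)`.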
Circuit Breaker Pattern
Prevents cascading failures by detecting repeated failures in a dependency and short-circuiting further calls to it (optionally serving a fallback) until it has had time to recover.
Use Cases:
- Protecting against external API failures
- Managing database connection issues
- Handling rate limiting gracefully
Example Implementation:
```python
# resilience/circuit_breaker.py
import time
from enum import Enum
from typing import Any, Awaitable, Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class CircuitState(Enum):
    CLOSED = "CLOSED"        # Normal operation
    OPEN = "OPEN"            # Not allowing calls
    HALF_OPEN = "HALF_OPEN"  # Testing if service is back


class CircuitBreaker(Generic[T]):
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_time: float = 30.0,
        fallback: Optional[Callable[..., Awaitable[T]]] = None,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_time = recovery_time
        self.fallback = fallback
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = 0.0

    async def execute(self, func: Callable[..., Awaitable[T]], *args: Any, **kwargs: Any) -> T:
        if self.state == CircuitState.OPEN:
            # Allow one trial call once the recovery time has elapsed
            if time.monotonic() - self.last_failure_time > self.recovery_time:
                self.state = CircuitState.HALF_OPEN
            else:
                # Pass None explicitly so the caller's args are not bound to `exception`
                return await self._fallback_or_raise(None, *args, **kwargs)
        try:
            result = await func(*args, **kwargs)
        except Exception as e:
            return await self._handle_failure(e, *args, **kwargs)
        # Any success closes the circuit and clears the consecutive-failure count
        self.reset()
        return result

    async def _handle_failure(self, exception: Exception, *args: Any, **kwargs: Any) -> T:
        self.failure_count += 1
        self.last_failure_time = time.monotonic()
        if self.state == CircuitState.HALF_OPEN or self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
        return await self._fallback_or_raise(exception, *args, **kwargs)

    async def _fallback_or_raise(self, exception: Optional[Exception], *args: Any, **kwargs: Any) -> T:
        # Serve the fallback if one is configured; otherwise surface the error
        if self.fallback:
            return await self.fallback(*args, **kwargs)
        raise exception or RuntimeError("Circuit is open")

    def reset(self) -> None:
        self.failure_count = 0
        self.state = CircuitState.CLOSED


# Example usage
async def fallback_get_user(user_id: str) -> dict:
    # Return cached or default user when the service is down
    return {"id": user_id, "name": "Unknown", "is_fallback": True}


user_service_cb: CircuitBreaker[dict] = CircuitBreaker(
    failure_threshold=3,
    recovery_time=60.0,
    fallback=fallback_get_user,
)


async def _get_user(user_id: str) -> dict:
    # Actual API call; `api_client` stands in for the project's HTTP client
    response = await api_client.get(f"/users/{user_id}")
    return response.json()


async def get_user(user_id: str) -> dict:
    # This call is protected by the circuit breaker
    return await user_service_cb.execute(_get_user, user_id)
```
Data Access Patterns
Repository Pattern
Mediates between the domain and data mapping layers, acting like an in-memory domain object collection.
Use Cases:
- Abstracting data access logic from business logic
- Providing a consistent interface to data sources
- Simplifying testing with mock repositories
Example Implementation:
```python
# repositories/base.py
from abc import ABC, abstractmethod
from typing import Any, Dict, Generic, List, Optional, TypeVar

from pydantic import BaseModel

T = TypeVar("T", bound=BaseModel)


class Repository(Generic[T], ABC):
    @abstractmethod
    async def find_by_id(self, id: str) -> Optional[T]:
        pass

    @abstractmethod
    async def find_all(self, filter_params: Optional[Dict[str, Any]] = None) -> List[T]:
        pass

    @abstractmethod
    async def create(self, data: Dict[str, Any]) -> T:
        pass

    @abstractmethod
    async def update(self, id: str, data: Dict[str, Any]) -> Optional[T]:
        pass

    @abstractmethod
    async def delete(self, id: str) -> bool:
        pass


# repositories/task_repository.py
from typing import Any, Dict, List, Optional

from sqlalchemy import delete, select, update
from sqlalchemy.ext.asyncio import AsyncSession

from ..models.task import Task, TaskInDB
from .base import Repository


class TaskRepository(Repository[TaskInDB]):
    # Writes are flushed, not committed: the caller (or a Unit of Work) owns the
    # transaction, so several repository calls can succeed or fail together.

    def __init__(self, session: AsyncSession):
        self.session = session

    async def find_by_id(self, id: str) -> Optional[TaskInDB]:
        result = await self.session.execute(
            select(Task).where(Task.id == id)
        )
        task = result.scalar_one_or_none()
        return TaskInDB.from_orm(task) if task else None

    async def find_all(self, filter_params: Optional[Dict[str, Any]] = None) -> List[TaskInDB]:
        query = select(Task)
        for key, value in (filter_params or {}).items():
            # Ignore keys that are not attributes of the model
            if hasattr(Task, key):
                query = query.where(getattr(Task, key) == value)
        result = await self.session.execute(query)
        tasks = result.scalars().all()
        return [TaskInDB.from_orm(task) for task in tasks]

    async def create(self, data: Dict[str, Any]) -> TaskInDB:
        task = Task(**data)
        self.session.add(task)
        await self.session.flush()  # sends the INSERT so DB-generated values exist
        await self.session.refresh(task)
        return TaskInDB.from_orm(task)

    async def update(self, id: str, data: Dict[str, Any]) -> Optional[TaskInDB]:
        # The UPDATE runs immediately inside the current transaction
        await self.session.execute(
            update(Task).where(Task.id == id).values(**data)
        )
        return await self.find_by_id(id)

    async def delete(self, id: str) -> bool:
        result = await self.session.execute(
            delete(Task).where(Task.id == id)
        )
        return result.rowcount > 0
```
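For the "simplifying testing" use case, an in-memory repository with the same async method names can stand in for `TaskRepository`. This is a sketch, assuming tests are satisfied with plain dicts rather than `TaskInDB` models; `InMemoryTaskRepository` is a hypothetical name. Unlike `TaskRepository`, unknown filter keys here make a row not match rather than being ignored.

```python
import uuid
from typing import Any, Dict, List, Optional


class InMemoryTaskRepository:
    # Test double with the same async methods as TaskRepository, storing dicts
    def __init__(self) -> None:
        self._rows: Dict[str, Dict[str, Any]] = {}

    async def find_by_id(self, id: str) -> Optional[Dict[str, Any]]:
        row = self._rows.get(id)
        return dict(row) if row else None  # copy so callers cannot mutate storage

    async def find_all(self, filter_params: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
        filters = filter_params or {}
        return [
            dict(row)
            for row in self._rows.values()
            if all(row.get(key) == value for key, value in filters.items())
        ]

    async def create(self, data: Dict[str, Any]) -> Dict[str, Any]:
        row = {"id": str(uuid.uuid4()), **data}
        self._rows[row["id"]] = row
        return dict(row)

    async def update(self, id: str, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        if id not in self._rows:
            return None
        self._rows[id].update(data)
        return dict(self._rows[id])

    async def delete(self, id: str) -> bool:
        return self._rows.pop(id, None) is not None
```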
Unit of Work Pattern
Maintains a list of objects affected by a business transaction and coordinates the writing out of changes.
Use Cases:
- Ensuring database transaction consistency
- Grouping related operations into atomic units
- Managing complex database operations
Example Implementation:
```python
# repositories/unit_of_work.py
from contextlib import asynccontextmanager
from typing import AsyncIterator

from sqlalchemy.ext.asyncio import AsyncSession

from .task_repository import TaskRepository
from .user_repository import UserRepository


class UnitOfWork:
    def __init__(self, session: AsyncSession):
        # All repositories share one session, and therefore one transaction
        self.session = session
        self.task_repository = TaskRepository(session)
        self.user_repository = UserRepository(session)

    async def commit(self) -> None:
        await self.session.commit()

    async def rollback(self) -> None:
        await self.session.rollback()


@asynccontextmanager
async def get_unit_of_work(session_factory) -> AsyncIterator[UnitOfWork]:
    async with session_factory() as session:
        uow = UnitOfWork(session)
        try:
            yield uow
            await uow.commit()
        except Exception:
            await uow.rollback()
            raise


# Example usage in a service
async def assign_task_to_user(
    task_id: str,
    user_id: str,
    session_factory,
) -> None:
    async with get_unit_of_work(session_factory) as uow:
        # These operations share a single transaction, provided the
        # repositories flush rather than commit on their own
        task = await uow.task_repository.find_by_id(task_id)
        user = await uow.user_repository.find_by_id(user_id)
        if not task or not user:
            raise ValueError("Task or user not found")
        await uow.task_repository.update(
            task_id,
            {"assigned_to": user_id, "status": "assigned"},
        )
        await uow.user_repository.update(
            user_id,
            {"assigned_task_count": user.assigned_task_count + 1},
        )
        # Both operations succeed or fail together
```
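The all-or-nothing guarantee can be seen without a database by staging writes and applying them only on commit. This synchronous sketch is illustrative only: `InMemoryUnitOfWork` and `unit_of_work` are hypothetical, and the store is a plain dict keyed by strings such as "task:1".

```python
from contextlib import contextmanager
from typing import Any, Dict, Iterator


class InMemoryUnitOfWork:
    # Writes are staged in _pending and applied to `store` only on commit
    def __init__(self, store: Dict[str, Dict[str, Any]]):
        self.store = store
        self._pending: Dict[str, Dict[str, Any]] = {}

    def update(self, key: str, changes: Dict[str, Any]) -> None:
        self._pending.setdefault(key, {}).update(changes)

    def commit(self) -> None:
        for key, changes in self._pending.items():
            self.store.setdefault(key, {}).update(changes)
        self._pending.clear()

    def rollback(self) -> None:
        self._pending.clear()  # discard staged writes; the store is untouched


@contextmanager
def unit_of_work(store: Dict[str, Dict[str, Any]]) -> Iterator[InMemoryUnitOfWork]:
    uow = InMemoryUnitOfWork(store)
    try:
        yield uow
        uow.commit()
    except Exception:
        uow.rollback()
        raise
```

If the block raises after the task update is staged, the rollback discards it, so the store never holds a task assigned to a user whose count was not incremented.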
Best Practices
Implementing New Patterns
When implementing a new pattern in the codebase:
- Document the pattern: Add clear documentation explaining the pattern and its use cases
- Create examples: Provide example implementations in the codebase
- Establish naming conventions: Follow consistent naming conventions for pattern components
- Review with team: Discuss pattern implementations with the team to ensure alignment
Choosing the Right Pattern
Consider these factors when selecting a pattern:
- Problem domain fit: Choose patterns that naturally solve the problem at hand
- Simplicity: Prefer simpler patterns over complex ones when they meet requirements
- Consistency: Align with existing patterns in the codebase
- Performance implications: Consider the performance characteristics of the pattern
- Maintainability: Evaluate how the pattern affects code maintainability
Anti-Patterns to Avoid
- God Objects: Avoid creating classes that know or do too much
- Spaghetti Code: Maintain clear separation of concerns
- Premature Optimization: Don't add complexity for performance without evidence
- Reinventing the Wheel: Use established libraries and frameworks when appropriate
- Overengineering: Don't apply patterns where simple code would suffice