LLM-Orchestrated Multi-Agent System with Context-Aware Conversation Management
A production-ready multi-agent system for handling customer service inquiries in an e-commerce platform. The system implements a multi-agent architecture composed of an orchestrator and three specialized agents, each with its own memory and responsibilities.
- OrchestratorAgent: Uses LLM-based intent detection to route messages to appropriate agents
- OrderCancellationAgent: Handles order cancellation with 24-hour policy enforcement
- OrderTrackingAgent: Provides order status and delivery information
- ProductInformationAgent: Answers product questions and serves as fallback handler
It uses Redis for long-term memory storage and caching, and pluggable LLM inference providers. The modular design allows new kinds of models to be added to the inference service with little effort: create a new class in a new module, following the pattern in openai.py.
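As a rough illustration of that extension pattern, a new provider might look like the sketch below. The `LLMProvider` base class, its `complete` method, the `EchoProvider` class, and the `PROVIDERS` registry are hypothetical names chosen for this example; the actual interface is whatever openai.py defines.

```python
from abc import ABC, abstractmethod


class LLMProvider(ABC):
    """Hypothetical provider interface; the real one is defined by openai.py."""

    @abstractmethod
    def complete(self, prompt: str) -> str:
        """Return the model's text completion for a prompt."""


class EchoProvider(LLMProvider):
    """Toy provider that needs no network access, handy for local tests."""

    def complete(self, prompt: str) -> str:
        return f"echo: {prompt}"


# Hypothetical registry mapping a provider name to its class.
PROVIDERS: dict[str, type[LLMProvider]] = {"echo": EchoProvider}


def get_provider(name: str) -> LLMProvider:
    """Instantiate a registered provider, failing loudly on unknown names."""
    try:
        return PROVIDERS[name]()
    except KeyError:
        raise ValueError(f"unknown provider: {name}") from None
```

With this shape, supporting another model would mean adding one subclass and one registry entry, without touching the agents.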
Pydantic schemas are used for input/output validation and structured tool calls. The system includes comprehensive logging with trace IDs for observability.
This is the high-level project structure (modular and testable):
multiagent-ecommerce/
├── src/multi_agent/
│ ├── domain/ # Business logic, memory, schemas and agents
│ ├── infrastructure/ # External services (LLM providers, Mock APIs for track, cancel, product_info)
│ └── application/ # FastAPI web layer with main chat service (POST /chat)
├── tests/ # Test cases for functionality and multi-turn conversations
├── docker-compose.yml # Docker Compose for local dev with Redis
└── Dockerfile # Dockerfile for containerizing the app
And the main system architecture components can be seen below:
graph TD
%% External Layer
User[👤 User] --> API[🌐 FastAPI Application<br/>POST /chat]
%% Application Layer
API --> Orchestrator[🎯 Orchestrator Agent<br/>LLM Intent Detection]
%% Agent Layer
Orchestrator --> CancelAgent[❌ Cancellation Agent<br/>24h Policy Enforcement]
Orchestrator --> TrackAgent[📦 Tracking Agent<br/>Order Status & Delivery]
Orchestrator --> ProductAgent[ℹ️ Product Info Agent<br/>FAQ & Fallback Handler]
%% Infrastructure Layer
CancelAgent --> CancelAPI[🔧 Order Cancellation API]
TrackAgent --> TrackAPI[🔧 Order Tracking API]
ProductAgent --> ProductAPI[🔧 Product Information API]
%% Memory and Caching
Orchestrator --> Memory[💾 Hybrid Memory System]
CancelAgent --> Memory
TrackAgent --> Memory
ProductAgent --> Memory
Memory --> Redis[(🗄️ Redis<br/>Long-term & Cache)]
Memory --> InMem[📋 In-Memory<br/>Short-term Context]
%% External Services
Orchestrator --> OpenAI[🤖 OpenAI API<br/>Intent Detection]
CancelAgent --> OpenAI
TrackAgent --> OpenAI
ProductAgent --> OpenAI
%% Response Flow
CancelAgent --> Response[📤 Agent Response]
TrackAgent --> Response
ProductAgent --> Response
Response --> API
API --> User
%% Styling
classDef userLayer fill:#e1f5fe
classDef appLayer fill:#f3e5f5
classDef agentLayer fill:#e8f5e8
classDef infraLayer fill:#fff3e0
classDef dataLayer fill:#fce4ec
class User userLayer
class API,Orchestrator appLayer
class CancelAgent,TrackAgent,ProductAgent agentLayer
class CancelAPI,TrackAPI,ProductAPI,OpenAI infraLayer
class Memory,Redis,InMem,Response dataLayer
(refer to the diagram image architecture_and_request_flow.png for a view with a better spatial organization)
The system maintains conversational context through a hybrid memory approach implemented through the HybridMemory class, which combines:
- Short-term memory: The 3 most recent conversation turns (user + agent), held in memory for fast access. These are always included in the context.
- Long-term memory: The conversation history, persisted in Redis for retrieval and tracking. For context, only the 6 older turns are recalled, and only when short-term memory holds fewer than 3 turns, which avoids overlapping or duplicate turns.
- Caching: API response caching to improve performance.
This ensures the agent stays focused on the latest dialogue while still recalling important past details.
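The recall rule described above can be sketched as follows. This is a simplified stand-in, not the project's `HybridMemory` class: a plain Python list replaces the Redis list, a single `add_turn` method writes to both stores (in the real system the orchestrator and the agents write separately), and the class and method names are assumptions made for this example.

```python
from collections import defaultdict, deque

Turn = tuple[str, str]  # (user message, agent response)


class HybridMemorySketch:
    """Illustrative only: a list stands in for the Redis-backed history."""

    def __init__(self, short_term_size: int = 3, long_term_recall: int = 6) -> None:
        self.short_term_size = short_term_size
        self.long_term_recall = long_term_recall
        # Short-term: bounded deque per session, oldest turn dropped first.
        self._short: dict[str, deque] = defaultdict(
            lambda: deque(maxlen=short_term_size)
        )
        # Long-term: full history per session (Redis in the real system).
        self._long: dict[str, list] = defaultdict(list)

    def add_turn(self, session_id: str, user: str, agent: str) -> None:
        turn = (user, agent)
        self._short[session_id].append(turn)
        self._long[session_id].append(turn)

    def drop_short_term(self, session_id: str) -> None:
        """Simulate a process restart: in-memory context is lost."""
        self._short.pop(session_id, None)

    def build_context(self, session_id: str) -> list[Turn]:
        recent = list(self._short[session_id])
        if len(recent) >= self.short_term_size:
            return recent  # short-term is full, no long-term recall
        history = self._long[session_id]
        # Exclude the tail already held in short-term to avoid duplicates.
        older = history[: len(history) - len(recent)]
        return older[-self.long_term_recall:] + recent
```

The overlap check assumes the short-term turns are always the tail of the long-term history, which holds here because both stores are appended together.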
Hybrid memory architecture:
graph TD
%% Memory Types
subgraph "HybridMemory"
STM[🧠 Short-term Memory<br/>In-Memory Deque<br/>Last 5 turns per session]
LTM[📚 Long-term Memory<br/>Redis Lists<br/>Full conversation history]
Cache[⚡ API Cache<br/>Redis Hash<br/>TTL-based expiration]
end
%% Data Flow
Session[🔑 Session ID] --> STM
Session --> LTM
Agent[🤖 Agent] --> STM
Agent --> Cache
Orchestrator[🎯 Orchestrator] --> LTM
%% Storage
STM --> Memory[💾 Application Memory]
LTM --> Redis[(🗄️ Redis)]
Cache --> Redis
%% Cache Details
Cache --> ProductCache[Product Info<br/>TTL: 1 hour]
Cache --> TrackCache[Order Tracking<br/>TTL: 5 minutes]
%% Usage Patterns
STM -.-> FastAccess[Fast Context<br/>Recent Turns]
LTM -.-> Persistence[Persistent Storage<br/>Full History]
Cache -.-> Performance[API Optimization<br/>Reduced Latency]
Main features:
- The Orchestrator manages long-term conversation history and routes messages depending on detected intent.
- Each agent receives context and manages its own short-term memory
- Agents store their responses back to short-term memory for future context
- Cache layer optimizes repeated API calls
This allows the system to handle multi-turn conversations effectively, maintaining context across turns.
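A minimal sketch of intent-based routing with a fallback handler is shown below. The keyword-based `detect_intent_stub` is only a stand-in for the LLM call the orchestrator actually makes, and the `product_info` intent label is an assumption (the source only names `track_order` and `cancel_order`).

```python
from typing import Callable


def detect_intent_stub(message: str) -> str:
    """Keyword stand-in for LLM intent detection (assumption, not the real logic)."""
    text = message.lower()
    if "cancel" in text:
        return "cancel_order"
    if "track" in text or "where is" in text:
        return "track_order"
    return "product_info"


# Intent label -> agent name; "product_info" is a hypothetical label.
AGENTS = {
    "cancel_order": "OrderCancellationAgent",
    "track_order": "OrderTrackingAgent",
    "product_info": "ProductInformationAgent",
}
FALLBACK_AGENT = "ProductInformationAgent"


def route(message: str, detect: Callable[[str], str] = detect_intent_stub) -> str:
    """Return the name of the agent that should handle the message."""
    intent = detect(message)
    # Unknown intents fall through to the product information agent.
    return AGENTS.get(intent, FALLBACK_AGENT)
```

Passing the detector as a parameter keeps the routing table testable without an API key.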
Here is an example of a real multi-turn conversation with the system:
graph TD
%% Turn 1
subgraph "Turn 1: Order Tracking"
T1U[User: 'I have an issue with order ORD-9999']
T1O[Orchestrator detects: track_order]
T1A[OrderTrackingAgent responds <br>using context from hybrid memory<br>and calls OrderTrackingAPI]
T1M[User gets tracking information<br>Memory stores: <br>user input + agent response<br/>]
end
%% Turn 2
subgraph "Turn 2: Order Cancellation"
T2U[User: 'Actually, I want to cancel it']
T2O[Orchestrator detects: cancel_order]
T2A[OrderCancellationAgent responds <br>Agent sees ORD-9999 in context<br/>and calls OrderCancellationAPI]
T2R[User gets cancellation confirmation<br>Memory stores: <br>user input + agent response]
end
%% Flow
T1U --> T1O --> T1A --> T1M
T1M --> T2U --> T2O --> T2A --> T2R
%% Memory Context
T1M -.-> Context[Context Available: <br>short-term + long-term<br> without overlap]
Context -.-> T2A
# Clone and setup
git clone <repository>
cd multi-agent
# Set environment variables
echo "OPENAI_API_KEY=your_key_here" > .env
# Start services
docker-compose up --build
# API will be available at http://localhost:8000
# Install dependencies
pip install -r requirements.txt
# Start Redis
docker run -d -p 6379:6379 redis:alpine
# Set environment variables
export OPENAI_API_KEY=your_key_here
export REDIS_URL=redis://localhost:6379
# Run application
uvicorn src.multi_agent.application.chat_service.main:app --reload --host 0.0.0.0 --port 8000
# Install dependencies
pip install -r requirements.txt
# Start Redis (if not already running)
redis-server
# Set environment variables in .env or export directly
export OPENAI_API_KEY=your_key_here
export REDIS_URL=redis://localhost:6379
export LOG_LEVEL=INFO # set to DEBUG for detailed logs
# Run application from project root
uvicorn src.multi_agent.application.chat_service.main:app --reload --host 0.0.0.0 --port 8000
# API will be available at http://localhost:8000
The request flow for the POST /chat endpoint, implemented in chat_service/main.py, is as follows:
sequenceDiagram
participant U as User
participant API as FastAPI
participant O as Orchestrator
participant A as Agent
participant M as Memory
participant R as Redis
participant LLM as OpenAI
participant API_EXT as Mock APIs
U->>API: POST /chat {session_id, message}
API->>O: route(session_id, message)
O->>R: Store user input (long-term)
O->>LLM: Detect intent from message
LLM-->>O: Intent {type, confidence}
O->>A: act_on(message, session_id, trace_id)
A->>M: get_short_term(session_id)
A->>M: get_long_term(session_id)
M-->>A: conversation context
A->>LLM: Generate response with context
LLM-->>A: Structured response
A->>M: Check cache for API call
alt Cache Miss
A->>API_EXT: Call external API
API_EXT-->>A: API result
A->>M: Cache API result
else Cache Hit
M-->>A: Cached result
end
A->>M: add_short_term(session_id, context)
A-->>O: AgentResponse
O->>R: Store agent response (long-term)
O-->>API: AgentResponse
API-->>U: JSON response
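The cache hit/miss branch in the flow above follows a cache-aside pattern, which can be sketched as below. A dictionary stands in for the Redis hash, the `TTLCache` class and its `get_or_call` method are hypothetical names, and the TTL values are the ones shown in the hybrid memory diagram (5 minutes for order tracking, 1 hour for product info).

```python
import time
from typing import Any, Callable

TRACKING_TTL_SECONDS = 5 * 60   # order tracking results
PRODUCT_TTL_SECONDS = 60 * 60   # product information results


class TTLCache:
    """In-process stand-in for the Redis-backed API cache."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._entries: dict[str, tuple[float, Any]] = {}  # key -> (expiry, value)
        self.hits = 0
        self.misses = 0

    def get_or_call(self, key: str, ttl_seconds: float, fetch: Callable[[], Any]) -> Any:
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None and entry[0] > now:
            self.hits += 1          # cache hit: skip the API call
            return entry[1]
        self.misses += 1            # cache miss or expired entry
        value = fetch()
        self._entries[key] = (now + ttl_seconds, value)
        return value
```

The injectable clock is there only so expiry can be exercised in tests without sleeping.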
POST /chat - Send a message through the multi-agent system:
curl -X POST http://localhost:8000/chat \
-H "Content-Type: application/json" \
-d '{
"session_id": "user123",
"message": "I want to cancel my order ORD-1234"
}'
Response:
{
"response": "Sure, I can help with that. Let me check if your order is eligible for cancellation.",
"agent": "OrderCancellationAgent",
"tool_calls": [
{
"tool": "OrderCancellationAPI",
"input": {"orderId": "ORD-1234"},
"result": {"status": "cancelled", "refunded": true}
}
],
"handover": "OrchestratorAgent → OrderCancellationAgent",
"trace_id": "uuid-here"
}
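For client code, the response shape above could be modelled roughly as follows. The project itself validates with Pydantic schemas; this sketch uses standard-library dataclasses to stay dependency-free, and the class names `ToolCall` and `ChatResponse` are assumptions. Only the field names are taken from the sample response.

```python
import json
from dataclasses import dataclass, field


@dataclass
class ToolCall:
    tool: str
    input: dict
    result: dict


@dataclass
class ChatResponse:
    response: str
    agent: str
    handover: str
    trace_id: str
    tool_calls: list[ToolCall] = field(default_factory=list)


def parse_chat_response(raw: str) -> ChatResponse:
    """Parse the JSON body returned by POST /chat into typed objects."""
    data = json.loads(raw)
    calls = [ToolCall(**call) for call in data.get("tool_calls", [])]
    return ChatResponse(
        response=data["response"],
        agent=data["agent"],
        handover=data["handover"],
        trace_id=data["trace_id"],
        tool_calls=calls,
    )
```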
Additional Endpoints
- GET /health - Health check endpoint
- DELETE /admin/cache - Clear the response cache
- DELETE /admin/memory/{session_id} - Clear memory for a specific session
- DELETE /admin/memory - Clear all session memories
- GET /metrics - Metrics endpoint
Swagger UI screenshot:
NOTE: The system uses mock APIs for order tracking, cancellation, and product information located in infrastructure/apis/mock_apis.py. In a real-world scenario, these would be replaced with actual backend service integrations.
The system includes an in-memory MetricsCollector to monitor agent and system performance in real-time. Key metrics tracked:
- Uptime (uptime_seconds) – Total running time of the service.
- Request metrics
  - total_requests – Number of requests processed.
  - avg_response_time_ms – Average request processing time.
  - max_response_time_ms / min_response_time_ms – Longest and shortest request times.
- Agent usage (agent_usage) – Number of requests handled by each agent.
- Intent accuracy (avg_intent_confidence) – Average confidence score for intent detection.
- Cache performance
  - cache_hit_rate – Percentage of requests served from cache.
  - cache_stats – Raw counts of cache hits and misses.
- Errors (error_count) – Number of errors encountered during processing.
Metrics are thread-safe, stored in memory with a configurable history size (default: 1000 entries).
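A collector with these properties could be sketched as below. This is not the project's `MetricsCollector`: the class name `MetricsSketch` and its method names are assumptions, and only a subset of the listed metrics is covered. The metric keys and the default history size of 1000 come from the description above.

```python
import threading
from collections import Counter, deque


class MetricsSketch:
    """Thread-safe, in-memory metrics with a bounded timing history."""

    def __init__(self, history_size: int = 1000) -> None:
        self._lock = threading.Lock()
        self._times_ms: deque = deque(maxlen=history_size)  # oldest dropped first
        self._agent_usage: Counter = Counter()
        self._total = 0
        self._hits = 0
        self._misses = 0

    def record_request(self, agent: str, duration_ms: float, cache_hit: bool) -> None:
        with self._lock:
            self._total += 1
            self._times_ms.append(duration_ms)
            self._agent_usage[agent] += 1
            if cache_hit:
                self._hits += 1
            else:
                self._misses += 1

    def snapshot(self) -> dict:
        with self._lock:
            times = list(self._times_ms)
            lookups = self._hits + self._misses
            return {
                "total_requests": self._total,
                "avg_response_time_ms": sum(times) / len(times) if times else 0.0,
                "max_response_time_ms": max(times, default=0.0),
                "min_response_time_ms": min(times, default=0.0),
                "agent_usage": dict(self._agent_usage),
                # Expressed as a percentage of recorded lookups.
                "cache_hit_rate": 100.0 * self._hits / lookups if lookups else 0.0,
                "cache_stats": {"hits": self._hits, "misses": self._misses},
            }
```

Note that timing statistics cover only the retained history, while `total_requests` counts every request since startup.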
# Run basic functionality tests of the system
python tests/test_basic_functionality.py
# Test multi-turn conversations (requires OpenAI API key)
python tests/test_multi_turn.py
# Manual API testing
curl http://localhost:8000/health
- Test the API manually via Swagger UI at http://localhost:8000/docs, or using Postman or curl.
- Test the system functionality and multi-turn conversations with the provided test scripts in tests/.
Environment variables:
- OPENAI_API_KEY - Required for LLM functionality
- REDIS_URL - Redis connection (default: redis://localhost:6379)
- LOG_LEVEL - Logging level (default: INFO)
The system includes comprehensive logging that allows correlation of logs across services using trace IDs. It includes:
- Trace IDs: Every request gets a unique trace ID for request tracking
- Agent-level logging: Each agent logs its decisions, input and context, tool calls and outputs
- Cache performance: Cache hits/misses are logged
- Intent detection: LLM routing decisions are logged
- Detailed error messages with stack traces.
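One common way to attach a trace ID to every log line is a `contextvars` variable combined with a `logging.Filter`, sketched below. How the project actually propagates trace IDs is not shown in this README, so the logger name, the log format, and the `handle_request` function are assumptions made for illustration.

```python
import contextvars
import logging
import uuid

# Holds the trace ID of the request currently being processed.
trace_id_var: contextvars.ContextVar = contextvars.ContextVar("trace_id", default="-")


class TraceIdFilter(logging.Filter):
    """Copy the current trace ID onto each record so the formatter can use it."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.trace_id = trace_id_var.get()
        return True


def build_logger(stream) -> logging.Logger:
    logger = logging.getLogger("multi_agent_sketch")  # hypothetical logger name
    logger.setLevel(logging.INFO)
    logger.handlers.clear()
    logger.propagate = False
    handler = logging.StreamHandler(stream)
    handler.setFormatter(
        logging.Formatter("%(levelname)s trace_id=%(trace_id)s %(message)s")
    )
    handler.addFilter(TraceIdFilter())
    logger.addHandler(handler)
    return logger


def handle_request(logger: logging.Logger, message: str) -> str:
    """Assign a fresh trace ID, log under it, then restore the previous value."""
    trace_id = str(uuid.uuid4())
    token = trace_id_var.set(trace_id)
    try:
        logger.info("received message: %s", message)
        logger.info("intent detected: %s", "track_order")
    finally:
        trace_id_var.reset(token)
    return trace_id
```

Because the ID lives in a context variable, nested calls log with the same trace ID without passing it through every function signature.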
- Multi-Agent Architecture: The system is composed of an orchestrator plus specialized agents, which allows focused handling of specific tasks and improves maintainability and scalability. It is built from scratch, without existing agent frameworks, to keep full control over the architecture and implementation.
- Hybrid Memory System: Combines short-term in-memory context with long-term Redis storage for efficient conversation management.
- LLM Integration: OpenAI's GPT models are used for intent detection and response generation, with modular design to support other providers.
- Caching Layer: Redis caching reduces latency for repeated API calls, improving user experience.
- Logging and Observability: Detailed logging with trace IDs enables effective monitoring and debugging.
- Configuration Management: Environment variables allow easy configuration for different environments (development, staging, production).
- API Design: RESTful API design with clear endpoints for chat interactions and administrative tasks.
- Mock APIs: Simulated backend services for order tracking, cancellation, and product information to facilitate development and testing.
Properties:
- In-process mock APIs: Python functions that simulate the business logic locally
- No external HTTP calls: APIs are imported as modules and called directly
- Self-contained system: Everything runs within the application, no external dependencies
Mock Orders:
from datetime import datetime, timedelta

# Creation times are relative to load time, so each order's age stays fixed per run
MOCK_ORDERS = {
    "ORD-1234": {"created": datetime.now() - timedelta(hours=2), "status": "processing"},
    "ORD-5678": {"created": datetime.now() - timedelta(hours=30), "status": "shipped"},  # Too old to cancel
    "ORD-9999": {"created": datetime.now() - timedelta(minutes=30), "status": "processing"},
    "ORD-0001": {"created": datetime.now() - timedelta(hours=48), "status": "delivered"},
}
Why not use external mock servers such as Beeceptor?
With the current approach, the system has:
- Faster execution --> No network latency
- More reliable --> No external service dependencies
- Better for testing --> Deterministic responses, no internet required
- Easier deployment --> Self-contained Docker containers
- Custom Implementation to simulate real business logic --> 24-hour cancellation policy, order validation...
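The 24-hour cancellation policy can be sketched as a small in-process function like the one below. The function names, the strict `<` boundary, and the `not_found` and `rejected` result shapes are assumptions; only the `{"status": "cancelled", "refunded": true}` result appears in the sample response, and the real logic lives in mock_apis.py.

```python
from datetime import datetime, timedelta
from typing import Optional

CANCELLATION_WINDOW = timedelta(hours=24)


def can_cancel(created: datetime, now: Optional[datetime] = None) -> bool:
    """An order is eligible while it is younger than the cancellation window."""
    now = now or datetime.now()
    return now - created < CANCELLATION_WINDOW


def cancel_order(order_id: str, orders: dict, now: Optional[datetime] = None) -> dict:
    """Simulate the cancellation API against an in-memory order table."""
    order = orders.get(order_id)
    if order is None:
        return {"status": "not_found", "refunded": False}
    if not can_cancel(order["created"], now):
        return {"status": "rejected", "reason": "older than 24 hours", "refunded": False}
    return {"status": "cancelled", "refunded": True}
```

Accepting `now` as a parameter keeps the policy deterministic in tests, which is one of the benefits listed above.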
Note: The ProductInformationAPI (refer to mock_apis.py) uses a simple keyword-based approach to simulate product queries. In a real system, this would be replaced with proper product information and FAQ vector databases for RAG.
These are the components that describe the approach:
- Knowledge Base: a dictionary with common e-commerce FAQs and product details.
knowledge_base = {
"return policy": "You can return items within 30 days...",
"shipping": "We offer free shipping on orders over $50...",
"warranty": "All electronics come with 1-year warranty...",
"bluetooth headphones": "Our Bluetooth headphones feature 20-hour battery..."
}
- Search Logic: The function looks for keywords in the user's question.
- Response Selection: Returns the best matching answer:
  - If keyword found: return the corresponding answer with "high" confidence
  - If no match: return generic "contact customer service" with "low" confidence
- Scoring: Simple scoring system:
  - Longer keyword matches get higher priority
  - "bluetooth headphones" beats "headphones" if both match
In short, it creates a basic FAQ system that can answer e-commerce FAQs (policies) and product information questions. It simulates what a real product information system would do, but uses simple string matching instead of advanced search algorithms (based on embeddings and vector stores).
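The search, selection, and scoring steps above can be sketched as follows. The function name `answer_question`, the returned dictionary shape, and the plain `headphones` entry (added only to show the longest-match rule) are assumptions; the answer texts are kept truncated as in the knowledge base excerpt.

```python
KNOWLEDGE_BASE = {
    "return policy": "You can return items within 30 days...",
    "shipping": "We offer free shipping on orders over $50...",
    "bluetooth headphones": "Our Bluetooth headphones feature 20-hour battery...",
    # Hypothetical entry, present only to demonstrate the scoring rule.
    "headphones": "(placeholder answer for generic headphones)",
}


def answer_question(question: str, kb: dict = KNOWLEDGE_BASE) -> dict:
    """Return the answer for the longest keyword found in the question."""
    text = question.lower()
    matches = [keyword for keyword in kb if keyword in text]
    if not matches:
        return {"answer": "Please contact customer service.", "confidence": "low"}
    best = max(matches, key=len)  # longer keywords take priority
    return {"answer": kb[best], "confidence": "high", "matched": best}
```

Substring matching like this is case-insensitive but otherwise literal, so a question phrased as "refund rules" would not match "return policy"; that gap is what an embedding-based search would address.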
- Modularity: Clear separation of concerns across domain, infrastructure, and application layers facilitates testing and future enhancements. All specialized agents inherit from a common base class, BaseAgent, which provides the _build_conversation_context() method.
- Performance: Caching and efficient memory management ensure low-latency responses.
- Robustness: Error handling and retries for external API calls ensure reliability.
- Containerization: Dockerfile and Docker Compose setup for easy deployment and local development.
- Scalability: The architecture supports horizontal scaling of agents and memory components to handle increased load.
- Extensibility: The system is designed to easily add new agents and functionalities as needed.
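To make the extensibility point concrete, here is a rough sketch of how a new agent might plug into a shared base class. Only the names `BaseAgent`, `_build_conversation_context`, and `act_on` come from this README; the simplified signatures, the context format, and the `ReturnsAgent` body are assumptions and will differ from the actual code.

```python
from abc import ABC, abstractmethod


class BaseAgent(ABC):
    """Simplified stand-in for the project's base class."""

    name = "BaseAgent"

    def _build_conversation_context(self, turns: list) -> str:
        """Flatten (user, agent) turns into a prompt-ready transcript."""
        lines = []
        for user, agent in turns:
            lines.append(f"User: {user}")
            lines.append(f"Agent: {agent}")
        return "\n".join(lines)

    @abstractmethod
    def act_on(self, message: str, turns: list) -> dict:
        """Handle one user message given prior turns (signature is simplified)."""


class ReturnsAgent(BaseAgent):
    """Hypothetical new agent; it builds a prompt instead of calling an LLM."""

    name = "ReturnsAgent"

    def act_on(self, message: str, turns: list) -> dict:
        context = self._build_conversation_context(turns)
        prompt = f"{context}\nUser: {message}" if context else f"User: {message}"
        return {"agent": self.name, "prompt": prompt}
```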
- Use vector databases for semantic search in long-term memory and product information retrieval.
- Integrate with real e-commerce backend systems that replace the mock_apis.
- Add a web-based chat UI.
- Add a final answer verification step (self-reflection and correction).
- Extend input/output validation and introduce security checks.
- Add more specialized agents (a real system would have ReturnsAgent, PaymentAgent...).
- Enhance intent detection with custom models and refine all prompts.
This project is licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International (CC BY-NC-SA 4.0) License. See the LICENSE file for details.
