Production Deployment
Deploying an Agent system to production means addressing reliability, scalability, and cost control together.
Sound engineering practices keep the system stable in operation and easy to maintain.
* * *
## Production Deployment Architecture
Production environments place stricter demands on a system than development environments do.
Reliability, scalability, monitoring, and alerting all need to be designed in from the start.
### Deployment Modes
#### Single Machine Deployment
Suitable for development and testing environments.
It is simple to deploy but cannot handle production-level traffic.
#### Distributed Deployment
Suitable for production environments. The main concerns are:

- Load balancing: distribute requests across multiple instances.
- Service discovery: dynamically manage the list of available instances.
- State management: handle consistency of state shared between instances.
- Fault tolerance: the failure of a single instance must not take down the overall service.
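
The load balancing and fault tolerance points above can be sketched in a few lines. This is a minimal illustration only, assuming a static list of instance names; the class `RoundRobinBalancer` and its methods are hypothetical and not part of any particular framework. In practice an ingress controller or cloud load balancer usually fills this role.

```python
import itertools


class RoundRobinBalancer:
    """Rotate requests across instances, skipping ones marked unhealthy."""

    def __init__(self, instances):
        self.instances = list(instances)
        self.healthy = set(self.instances)
        self._cursor = itertools.cycle(self.instances)

    def mark_down(self, instance):
        """Record a failed health check for an instance."""
        self.healthy.discard(instance)

    def mark_up(self, instance):
        """Return a recovered instance to rotation."""
        if instance in self.instances:
            self.healthy.add(instance)

    def next_instance(self):
        """Pick the next healthy instance in round-robin order."""
        # Look at each instance at most once per call.
        for _ in range(len(self.instances)):
            candidate = next(self._cursor)
            if candidate in self.healthy:
                return candidate
        raise RuntimeError("no healthy instances available")
```

Because unhealthy instances are skipped rather than removed, a single failed instance reduces capacity but does not interrupt the service.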
### Containerized Deployment
```dockerfile
# Dockerfile example
# Based on Python 3.11 image
FROM python:3.11-slim

# Set working directory
WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Create non-root user (security consideration)
RUN useradd -m appuser && chown -R appuser:appuser /app
USER appuser

# Expose port
EXPOSE 8000

# Startup command
CMD ["python", "agent.py"]
```
### Kubernetes Deployment
```yaml
# Kubernetes Deployment configuration
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ai-agent
  labels:
    app: ai-agent
spec:
  # Replica count
  replicas: 3
  selector:
    matchLabels:
      app: ai-agent
  template:
    metadata:
      labels:
        app: ai-agent
    spec:
      containers:
        - name: agent
          image: your-registry/ai-agent:latest
          ports:
            - containerPort: 8000
          # Resource limits
          resources:
            limits:
              memory: "2Gi"
              cpu: "1"
            requests:
              memory: "1Gi"
              cpu: "0.5"
          # Health checks
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 10
            periodSeconds: 30
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
  name: ai-agent-service
spec:
  selector:
    app: ai-agent
  ports:
    - port: 80
      targetPort: 8000
  type: LoadBalancer
```
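
The liveness and readiness probes in this configuration expect the container to answer `GET /health` and `GET /ready` on port 8000. The original does not show how `agent.py` serves them, so the following is only a sketch using the Python standard library; `ProbeHandler`, `make_server`, and the `ready` flag are hypothetical names introduced for illustration.

```python
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer

# Set this once startup work (model loading, cache warm-up) has finished.
ready = threading.Event()


class ProbeHandler(BaseHTTPRequestHandler):
    """Answer the probe paths used in the Deployment manifest."""

    def do_GET(self):
        if self.path == "/health":
            # Liveness: the process is up and able to answer requests.
            self._reply(200, {"status": "alive"})
        elif self.path == "/ready":
            # Readiness: only accept traffic after startup has completed.
            if ready.is_set():
                self._reply(200, {"status": "ready"})
            else:
                self._reply(503, {"status": "starting"})
        else:
            self._reply(404, {"error": "not found"})

    def _reply(self, code, payload):
        body = json.dumps(payload).encode()
        self.send_response(code)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        pass  # keep probe traffic out of the logs


def make_server(host="0.0.0.0", port=8000):
    """Build the probe server; call serve_forever() on the result to run it."""
    return HTTPServer((host, port), ProbeHandler)
```

A process would typically call `ready.set()` after initialization and run `make_server().serve_forever()` in a background thread. Returning 503 from `/ready` keeps the Service from routing traffic to a pod that is still starting.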
* * *
## Cost Control
The cost of Agent systems mainly comes from LLM API calls.
Optimizing costs is an important topic for production deployment.
### Token Optimization Strategies
- Prompt compression: streamline prompts to remove unnecessary text.
- Context truncation: keep only the key context content.
- Response caching: cache responses for identical or similar queries.
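
Context truncation can be sketched as keeping the system message plus the newest messages that fit a token budget. The example below is an assumption-laden illustration: `estimate_tokens` uses a rough four-characters-per-token heuristic rather than a real tokenizer, and the message format (dicts with `role` and `content`) is assumed, not taken from the original.

```python
def estimate_tokens(text):
    """Rough heuristic: about four characters per token for English text."""
    return max(1, len(text) // 4)


def truncate_context(messages, max_tokens):
    """Keep system messages plus the newest messages that fit the budget."""
    system = [m for m in messages if m["role"] == "system"]
    others = [m for m in messages if m["role"] != "system"]

    # System messages are always kept, so they come out of the budget first.
    budget = max_tokens - sum(estimate_tokens(m["content"]) for m in system)

    kept = []
    for message in reversed(others):  # walk from newest to oldest
        cost = estimate_tokens(message["content"])
        if cost > budget:
            break
        kept.append(message)
        budget -= cost

    # Restore chronological order before returning.
    return system + list(reversed(kept))
```

For accurate budgeting, the heuristic would be replaced with the tokenizer that matches the target model.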
### Code Implementation
```python
# Cost-optimized Agent implementation
import hashlib
import json
import math
import time


class CostOptimizedAgent:
    """
    Cost-optimized Agent.
    Reduces API call costs through exact and similarity-based caching.
    """

    def __init__(self, base_agent, cache, embedder=None):
        # Base Agent that performs the actual LLM calls
        self.base_agent = base_agent
        # Cache storage
        self.cache = cache
        # Embedder (for similarity-based cache matching)
        self.embedder = embedder

    def process(self, request):
        """Process a request, serving it from cache when possible."""
        # Generate request fingerprint
        request_fingerprint = self.generate_fingerprint(request)

        # Check exact cache
        cached_entry = self.cache.get(request_fingerprint)
        if cached_entry is not None:
            return {**cached_entry["result"], "from_cache": True}

        # If an embedder exists, check the semantic similarity cache
        request_vector = None
        if self.embedder:
            # Assumes embed() takes a list of texts and returns one vector per text
            request_vector = self.embedder.embed([str(request)])[0]
            similar = self.find_similar_cached(request_vector)
            if similar:
                return {
                    **similar["result"],
                    "from_cache": True,
                    "similar_to": similar["request_id"],
                }

        # Execute request (the result is expected to be a dict)
        result = self.base_agent.process(request)

        # Cache the result with its vector so later similarity lookups can use it
        self.cache.set(
            request_fingerprint,
            {
                "request_id": request_fingerprint,
                "vector": request_vector,
                "result": result,
            },
            ttl=3600,  # 1 hour expiration
        )
        return {**result, "from_cache": False}

    def generate_fingerprint(self, request):
        """Generate a request fingerprint, used for exact cache matching."""
        content = json.dumps(request, sort_keys=True)
        return hashlib.sha256(content.encode()).hexdigest()

    def find_similar_cached(self, request_vector, threshold=0.95):
        """
        Find the most similar cached entry for an already-embedded request.
        Uses cosine similarity of embedding vectors; returns None if no
        entry reaches the threshold.
        """
        best_match = None
        best_similarity = 0.0
        for cached in self.cache.get_all():
            cached_vector = cached.get("vector")
            if cached_vector is None:
                continue  # entry was stored without an embedding
            similarity = cosine_similarity(request_vector, cached_vector)
            if similarity > best_similarity and similarity >= threshold:
                best_similarity = similarity
                best_match = cached
        return best_match


class ResponseCache:
    """
    Response cache.
    Simple in-memory cache implementation.
    Use Redis in production environments.
    """

    def __init__(self):
        self.cache = {}
        self.timestamps = {}  # key -> expiry time (epoch seconds)

    def get(self, key):
        """Get a cache entry, or None if it is missing or expired."""
        if key in self.cache:
            # Check if expired
            if self.is_expired(key):
                self.delete(key)
                return None
            return self.cache[key]
        return None

    def set(self, key, value, ttl=3600):
        """Set a cache entry with a time-to-live in seconds."""
        self.cache[key] = value
        self.timestamps[key] = time.time() + ttl

    def delete(self, key):
        """Delete a cache entry."""
        self.cache.pop(key, None)
        self.timestamps.pop(key, None)

    def is_expired(self, key):
        """Check if an entry has expired."""
        if key not in self.timestamps:
            return True
        return time.time() > self.timestamps[key]

    def get_all(self):
        """Get all unexpired cache items."""
        return [v for k, v in list(self.cache.items()) if not self.is_expired(k)]


def cosine_similarity(vec1, vec2):
    """Calculate cosine similarity (0.0 if either vector has zero length)."""
    dot_product = sum(a * b for a, b in zip(vec1, vec2))
    norm1 = math.sqrt(sum(a * a for a in vec1))
    norm2 = math.sqrt(sum(b * b for b in vec2))
    if norm1 == 0 or norm2 == 0:
        return 0.0
    return dot_product / (norm1 * norm2)
```
* * *
## Streaming and Asynchronous
### Streaming Response
Streaming responses reduce perceived waiting time and improve the user experience.
Instead of waiting for the complete response, the client displays generated content progressively as it arrives.
### Code Implementation
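```python
# Streaming response Agent
import asyncio


class StreamingAgent:
    """
    Agent with streaming support.
    Generates and returns output progressively to reduce waiting time.
    """

    def __init__(self, llm):
        # The LLM client is assumed to expose an async generator
        # method stream_generate(prompt)
        self.llm = llm

    def build_prompt(self, request):
        """Placeholder prompt builder; replace with real prompt templating."""
        return str(request)

    async def stream_generate(self, prompt):
        """
        Stream the generated response.
        Uses the async generator pattern.
        :yield: Generated text chunks
        """
        async for chunk in self.llm.stream_generate(prompt):
            yield chunk

    async def process_stream(self, request):
        """
        Process a streaming request.
        This is itself an async generator; consume it with `async for`.
        """
        prompt = self.build_prompt(request)
        async for chunk in self.stream_generate(prompt):
            yield chunk


class EchoLLM:
    """Stand-in LLM client for the usage example; not a real library class."""

    async def stream_generate(self, prompt):
        for word in f"You asked: {prompt}".split():
            await asyncio.sleep(0.05)  # simulate generation latency
            yield word + " "


# Usage example
async def main():
    agent = StreamingAgent(EchoLLM())
    # Start streaming generation
    async for token in agent.stream_generate("Explain quantum computing"):
        # Print as generated
        print(token, end="", flush=True)
    print()  # Newline


# Run
if __name__ == "__main__":
    asyncio.run(main())
```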
### Asynchronous Task Queue
For long-running tasks, use an asynchronous task queue.
The user receives a task ID immediately after submitting a task, and the task itself runs in the background.
```python
# Asynchronous task queue implementation
import asyncio
import time
import uuid
from queue import Empty, Queue
from threading import Thread


class AsyncAgent:
    """
    Async Agent.
    Uses a task queue for long-running tasks.
    """

    def __init__(self, agent, task_queue):
        # Base Agent
        self.agent = agent
        # Task queue
        self.task_queue = task_queue
        # Result storage
        self.results = {}
        # Start background worker thread
        self.worker_thread = Thread(target=self.process_queue, daemon=True)
        self.worker_thread.start()

    async def submit(self, task):
        """Submit a task and return its task ID immediately."""
        # Generate unique task ID
        task_id = str(uuid.uuid4())
        # Add to queue
        await self.task_queue.enqueue({
            "id": task_id,
            "task": task,
            "status": "pending",
            "created_at": time.time(),
        })
        return task_id

    async def get_result(self, task_id):
        """Get a task result without blocking."""
        if task_id in self.results:
            return self.results[task_id]
        # Check task status
        status = await self.task_queue.get_status(task_id)
        if status is None:
            return None  # Task does not exist
        if status == "pending" or status == "processing":
            return {"status": status, "result": None}
        return None

    def process_queue(self):
        """Background worker thread: take tasks from the queue and execute them."""
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        while True:
            task = loop.run_until_complete(self.task_queue.dequeue())
            if task is None:
                time.sleep(0.1)  # queue is empty: back off instead of busy-waiting
                continue
            task_id = task["id"]
            # Update status to processing
            loop.run_until_complete(
                self.task_queue.update_status(task_id, "processing")
            )
            try:
                # Execute task
                result = self.agent.process(task["task"])
                # Save result
                self.results[task_id] = {
                    "status": "completed",
                    "result": result,
                    "completed_at": time.time(),
                }
                loop.run_until_complete(
                    self.task_queue.update_status(task_id, "completed")
                )
            except Exception as e:
                self.results[task_id] = {
                    "status": "failed",
                    "error": str(e),
                }
                loop.run_until_complete(
                    self.task_queue.update_status(task_id, "failed")
                )


class TaskQueue:
    """Simple task queue implementation"""

    def __init__(self):
        self.queue = Queue()
        self.statuses = {}

    async def enqueue(self, task):
        """Add a task to the queue."""
        # Record the status first so the worker cannot have its
        # "processing" update overwritten by a late "pending"
        self.statuses[task["id"]] = "pending"
        self.queue.put(task)

    async def dequeue(self):
        """Take out a task, or return None if the queue is empty."""
        try:
            return self.queue.get_nowait()
        except Empty:
            return None

    async def get_status(self, task_id):
        """Get task status"""
        return self.statuses.get(task_id)

    async def update_status(self, task_id, status):
        """Update task status"""
        self.statuses[task_id] = status
```
* * *
## Microservices Design
Splitting the Agent system into multiple independent services improves maintainability and scalability.
### Service Division
| Service | Responsibility |
| --- | --- |
| API Gateway | Unified entry, authentication and rate limiting, routing |
| Agent Service | Core logic, reasoning and planning |
| Tool Service | External API integration, tool invocation |
| Storage Service | Knowledge base, state management, caching |
| Monitoring Service | Log collection, metrics monitoring, alerting |
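
Rate limiting at the API Gateway is commonly done with a token bucket. The original does not specify an algorithm, so the sketch below is one possible approach; the `TokenBucket` class and its parameters are hypothetical, and a real gateway would usually keep one bucket per client in shared storage.

```python
import time


class TokenBucket:
    """Allow bursts up to `capacity`, refilling at `rate` tokens per second."""

    def __init__(self, rate, capacity, clock=time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)  # start with a full bucket
        self.clock = clock             # injectable clock, useful for testing
        self.updated = clock()

    def allow(self, cost=1):
        """Return True and consume tokens if the request may proceed."""
        now = self.clock()
        elapsed = now - self.updated
        self.updated = now
        # Refill for the time that has passed, capped at capacity.
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False
```

A gateway handler would call `allow()` before forwarding a request and respond with HTTP 429 when it returns `False`.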
### Inter-service Communication
- Synchronous communication: HTTP/gRPC, for real-time request-response.
- Asynchronous communication: message queue, for time-consuming operations and event notifications.
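
The difference between the two styles can be illustrated in-process. In this sketch an awaited coroutine stands in for an HTTP/gRPC call and an `asyncio.Queue` stands in for a message broker; `call_tool_service`, `event_consumer`, and the event name `task.completed` are hypothetical names chosen for the example.

```python
import asyncio


async def call_tool_service(query):
    """Stand-in for a synchronous HTTP/gRPC call: the caller waits for the reply."""
    await asyncio.sleep(0)  # placeholder for network latency
    return {"query": query, "answer": query.upper()}


async def event_consumer(queue, handled):
    """Stand-in for a message-queue subscriber that processes events later."""
    while True:
        event = await queue.get()
        if event is None:  # sentinel: shut down
            queue.task_done()
            break
        handled.append(event["type"])
        queue.task_done()


async def demo():
    queue = asyncio.Queue()
    handled = []
    consumer = asyncio.create_task(event_consumer(queue, handled))

    # Synchronous style: wait for the response before continuing.
    reply = await call_tool_service("ping")

    # Asynchronous style: publish the event and move on;
    # the consumer handles it independently.
    await queue.put({"type": "task.completed", "payload": reply})
    await queue.put(None)

    await queue.join()
    await consumer
    return reply, handled
```

Running `asyncio.run(demo())` returns the direct reply together with the list of event types the consumer processed.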
* * *
## Chapter Summary
This chapter covered key practices for production deployment and engineering:

- Production deployment architecture scales from a single machine to a distributed setup.
- Containerized deployment uses Docker and Kubernetes.
- Cost control reduces spending through caching and token optimization.
- Streaming and asynchronous processing improve user experience and system throughput.
- Microservices design improves system maintainability and scalability.

Production deployment requires comprehensive consideration of reliability, performance, cost, and other aspects.
It is recommended to start small and gradually increase complexity.