YouTip LogoYouTip

Production Deployment

Production deployment involves multiple aspects such as reliability, scalability, and cost control. Engineering practices ensure stable system operation and easy maintenance. * * * ## Production Deployment Architecture Production environments have higher requirements for systems. Multiple aspects need to be considered, including reliability, scalability, monitoring, and alerting. ### Deployment Modes #### Single Machine Deployment Suitable for development and testing environments. Simple and easy to deploy, but cannot handle production-level traffic. #### Distributed Deployment Suitable for production environments, requiring consideration of multiple aspects. Load balancing: Distribute requests to multiple instances. Service discovery: Dynamically manage instance lists. State management: Handle distributed state consistency issues. Fault tolerance: Single point of failure does not affect the overall service. ### Containerized Deployment ## Dockerfile Example # Based on Python 3.11 image FROM python:3.11-slim # Set working directory WORKDIR /app # Install dependencies COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt # Copy application code COPY . . # Create non-root user (security consideration) RUN useradd -m appuser && chown -R appuser:appuser /app USER appuser # Expose port EXPOSE 8000 # Startup command CMD ["python", "agent.py"] ### Kubernetes Deployment ## Kubernetes Deployment Configuration apiVersion: apps/v1 kind: Deployment metadata: name: ai-agent labels: app: ai-agent spec: # Replica count replicas: 3 selector: matchLabels: app: ai-agent template: metadata: labels: app: ai-agent spec: containers: - name: agent image: your-registry/ai-agent:latest ports: - containerPort: 8000 # Resource limits resources: limits: memory: "2Gi" cpu: "1" requests: memory: "1Gi" cpu: "0.5" # Health checks livenessProbe: httpGet: path: /health port: 8000 initialDelaySeconds: 10 periodSeconds: 30 readinessProbe: httpGet: path: /ready port: 8000 initialDelaySeconds: 5 periodSeconds: 10 --- apiVersion: v1 kind: Service metadata: name: ai-agent-service spec: selector: app: ai-agent ports: - port: 80 targetPort: 8000 type: LoadBalancer * * * ## Cost Control The cost of Agent systems mainly comes from LLM API calls. Optimizing costs is an important topic for production deployment. ### Token Optimization Strategies Prompt compression: Streamline prompts to reduce unnecessary text. Context truncation: Only keep key context content. Cache responses: Cache responses for identical or similar queries. ### Code Implementation ## Cost Optimized Agent Implementation import hashlib import json from functools import lru_cache class CostOptimizedAgent: """ Cost-optimized Agent Reduces API call costs through caching and compression """ def __init__ (self, base_agent, cache, embedder=None): # Base Agent self.base_agent= base_agent # Cache storage self.cache= cache # Embedder (for similarity-based cache matching) self.embedder= embedder def process(self, request): """ Process requests with cost optimization logic """ # Generate request fingerprint request_fingerprint =self.generate_fingerprint(request) # Check exact cache cached_result =self.cache.get(request_fingerprint) if cached_result: return{ **cached_result, "from_cache": True } # If embedder exists, check semantic similarity cache if self.embedder: similar =self.find_similar_cached(request) if similar: return{ **similar, "from_cache": True, "similar_to": similar.get("request_id") } # Execute request result =self.base_agent.process(request) # Cache result self.cache.set( request_fingerprint, result, ttl=3600# 1 hour expiration ) return{ **result, "from_cache": False } def generate_fingerprint(self, request): """ Generate request fingerprint Used for exact cache matching """ content = json.dumps(request, sort_keys=True) return hashlib.sha256(content.encode()).hexdigest() def find_similar_cached(self, request, threshold=0.95): """ Find semantically similar cached results Uses cosine similarity of embedding vectors """ if not self.embedder: return None # Vectorize the request request_vector =self.embedder.embed([str(request)]) # Search for similar requests in cache best_match =None best_similarity =0 for cached in self.cache.get_all(): cached_vector = cached similarity = cosine_similarity(request_vector, cached_vector) if similarity > best_similarity and similarity >= threshold: best_similarity = similarity best_match = cached return best_match class ResponseCache: """ Response cache Simple in-memory cache implementation Use Redis in production environments """ def __init__ (self): self.cache={} self.timestamps={} def get(self, key): """Get cache""" if key in self.cache: # Check if expired if self.is_expired(key): self.delete(key) return None return self.cache return None def set(self, key, value, ttl=3600): """Set cache""" self.cache= value self.timestamps=time.time() + ttl def delete(self, key): """Delete cache""" if key in self.cache: del self.cache if key in self.timestamps: del self.timestamps def is_expired(self, key): """Check if expired""" if key not in self.timestamps: return True return time.time()>self.timestamps def get_all(self): """Get all cache items""" return list(self.cache.values()) def cosine_similarity(vec1, vec2): """Calculate cosine similarity""" dot_product =sum(a * b for a, b in zip(vec1, vec2)) norm1 =math.sqrt(sum(a * a for a in vec1)) norm2 =math.sqrt(sum(b * b for b in vec2)) return dot_product / (norm1 * norm2) * * * ## Streaming and Asynchronous ### Streaming Response Streaming response reduces waiting time and improves user experience. No need to wait for complete response, can progressively display generated content. ### Code Implementation ## Streaming Response Agent import asyncio class StreamingAgent: """ Agent with streaming support Generate and return progressively to reduce waiting time """ def __init__ (self, llm): self.llm= llm async def stream_generate(self, prompt): """ Stream generation response Uses async generator pattern :yield: Generated text chunks """ # Start async generation task async for chunk in self.llm.stream_generate(prompt): yield chunk async def process_stream(self, request): """ Process streaming request Returns async generator """ prompt =self.build_prompt(request) # Return generator async def generate(): async for chunk in self.stream_generate(prompt): yield chunk return generate() # Usage example async def main(): agent = StreamingAgent(llm) # Start streaming generation async for token in agent.stream_generate("Explain quantum computing"): # Print as generated print(token, end="", flush=True) print()# Newline # Run asyncio.run(main()) ### Asynchronous Task Queue For long-running tasks, use asynchronous queue processing. User receives immediate return after submitting task, background async execution. ## Asynchronous Task Queue Implementation import asyncio from queue import Queue from threading import Thread import uuid class AsyncAgent: """ Async Agent Uses task queue for long-running tasks """ def __init__ (self, agent, task_queue): # Base Agent self.agent= agent # Task queue self.task_queue= task_queue # Result storage self.results={} # Start background worker thread self.worker_thread= Thread(target=self.process_queue, daemon=True) self.worker_thread.start() async def submit(self, task): """ Submit task Returns task ID immediately """ # Generate unique task ID task_id =str(uuid.uuid4()) # Add to queue await self.task_queue.enqueue({ "id": task_id, "task": task, "status": "pending", "created_at": time.time() }) return task_id async def get_result(self, task_id): """ Get task result Non-blocking """ if task_id in self.results: return self.results # Check task status status = await self.task_queue.get_status(task_id) if status is None: return None# Task does not exist if status =="pending"or status =="processing": return{ "status": status, "result": None } return None def process_queue(self): """ Background worker thread Take tasks from queue and execute """ loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) while True: task = loop.run_until_complete(self.task_queue.dequeue()) if task: # Update status to processing loop.run_until_complete( self.task_queue.update_status(task,"processing") ) try: # Execute task result =self.agent.process(task) # Save result self.results[task]={ "status": "completed", "result": result, "completed_at": time.time() } loop.run_until_complete( self.task_queue.update_status(task,"completed") ) except Exception as e: self.results[task]={ "status": "failed", "error": str(e) } loop.run_until_complete( self.task_queue.update_status(task,"failed") ) class TaskQueue: """Simple task queue implementation""" def __init__ (self): self.queue=Queue() self.statuses={} async def enqueue(self, task): """Add task to queue""" self.queue.put(task) self.statuses[task]="pending" async def dequeue(self): """Take out a task""" if not self.queue.empty(): return self.queue.get() return None async def get_status(self, task_id): """Get task status""" return self.statuses.get(task_id) async def update_status(self, task_id, status): """Update task status""" self.statuses= status * * * ## Microservices Design Split Agent system into multiple independent services to improve maintainability and scalability. ### Service Division | Service | Responsibility | | --- | --- | | API Gateway | Unified entry, authentication and rate limiting, routing | | Agent Service | Core logic, reasoning and planning | | Tool Service | External API integration, tool invocation | | Storage Service | Knowledge base, state management, caching | | Monitoring Service | Log collection, metrics monitoring, alerting | ### Inter-service Communication Synchronous communication: HTTP/gRPC, for real-time request-response. Asynchronous communication: Message queue, for time-consuming operations and event notifications. * * * ## Chapter Summary This chapter introduces key practices for production deployment and engineering. Production deployment architecture scales from single machine to distributed. Containerized deployment uses Docker and Kubernetes. Cost control reduces costs through caching and Token optimization. Streaming and asynchronous improve user experience and system throughput. Microservices design improves system maintainability and scalability. Production deployment requires comprehensive consideration of reliability, performance, cost, and other aspects. It is recommended to start small and gradually increase complexity.
← Css Position StaticMultimodal Agent β†’