Scaling Guide
Overview
This guide provides detailed strategies for scaling CreativeDynamics to handle increased load, larger datasets, and concurrent users. It covers both vertical and horizontal scaling approaches, optimisation techniques, and architectural patterns for high-performance deployments.
Scaling Dimensions
1. Request Volume Scaling
- Current Capacity: ~1,000 requests/minute on a single instance
- Target Capacity: 10,000+ requests/minute with horizontal scaling
- Strategy: Load balancing across multiple API instances
2. Data Volume Scaling
- Current Limit: ~100MB per analysis
- Target Support: 1GB+ datasets
- Strategy: Streaming processing and chunked analysis
3. Concurrent Analysis Scaling
- Current: Sequential processing
- Target: Parallel processing with job queues
- Strategy: Distributed task processing with Celery/RabbitMQ
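Before provisioning anything, it helps to sanity-check how many instances the request-volume target actually implies. A quick back-of-the-envelope sketch, using the capacity figures above; the 0.7 headroom factor (run each instance at no more than 70% of measured capacity) is an assumption for illustration, not a measured figure:

```python
import math

def instances_needed(target_rpm: int, per_instance_rpm: int = 1000,
                     headroom: float = 0.7) -> int:
    """Instances required to serve target_rpm while keeping each
    instance at or below `headroom` of its measured capacity."""
    return math.ceil(target_rpm / (per_instance_rpm * headroom))

print(instances_needed(10_000))  # 15 instances at 70% utilisation
```

Re-run the calculation whenever load tests revise the per-instance figure; the three-instance deployment below is a starting point, not the end state.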
Horizontal Scaling Architecture
Multi-Instance Deployment
```yaml
version: '3.8'

services:
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    depends_on:
      - api1
      - api2
      - api3

  api1:
    build: .
    environment:
      - INSTANCE_ID=1
      - REDIS_URL=redis://redis:6379
      - POSTGRES_URL=postgresql://user:pass@postgres:5432/creativedynamics
    depends_on:
      - redis
      - postgres

  api2:
    build: .
    environment:
      - INSTANCE_ID=2
      - REDIS_URL=redis://redis:6379
      - POSTGRES_URL=postgresql://user:pass@postgres:5432/creativedynamics
    depends_on:
      - redis
      - postgres

  api3:
    build: .
    environment:
      - INSTANCE_ID=3
      - REDIS_URL=redis://redis:6379
      - POSTGRES_URL=postgresql://user:pass@postgres:5432/creativedynamics
    depends_on:
      - redis
      - postgres

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data

  postgres:
    image: postgres:15
    environment:
      - POSTGRES_DB=creativedynamics
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
    volumes:
      - postgres_data:/var/lib/postgresql/data

  worker:
    build: .
    command: celery -A your_service.tasks worker --loglevel=info
    environment:
      - REDIS_URL=redis://redis:6379
      - POSTGRES_URL=postgresql://user:pass@postgres:5432/creativedynamics
    depends_on:
      - redis
      - postgres
    deploy:
      replicas: 3

volumes:
  redis_data:
  postgres_data:
```
Nginx Load Balancer Configuration
```nginx
upstream creativedynamics_api {
    least_conn;  # Use least-connections load balancing
    server api1:5001 max_fails=3 fail_timeout=30s;
    server api2:5001 max_fails=3 fail_timeout=30s;
    server api3:5001 max_fails=3 fail_timeout=30s;

    # Keep idle connections open to upstream instances
    keepalive 32;
}

server {
    listen 80;
    server_name _;

    # API endpoints
    location / {
        proxy_pass http://creativedynamics_api;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # Timeouts
        proxy_connect_timeout 60s;
        proxy_send_timeout 60s;
        proxy_read_timeout 60s;

        # Buffering
        proxy_buffering on;
        proxy_buffer_size 4k;
        proxy_buffers 8 4k;
        proxy_busy_buffers_size 8k;
    }

    # Health check endpoint
    location /health {
        access_log off;
        proxy_pass http://creativedynamics_api/health;
    }
}
```
Distributed Task Processing
Celery Configuration
```python
from celery import Celery
from kombu import Queue
import os

# Create Celery instance
app = Celery('creativedynamics')

# Configuration
app.conf.update(
    broker_url=os.getenv('REDIS_URL', 'redis://localhost:6379/0'),
    result_backend=os.getenv('REDIS_URL', 'redis://localhost:6379/0'),

    # Task execution settings
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,

    # Performance tuning
    worker_prefetch_multiplier=4,
    worker_max_tasks_per_child=1000,
    task_acks_late=True,

    # Task routing
    task_routes={
        'your_service.tasks.analysis.*': {'queue': 'analysis'},
        'your_service.tasks.signature.*': {'queue': 'signature'},
        'your_service.tasks.report.*': {'queue': 'report'},
    },

    # Queue configuration
    task_queues=(
        Queue('analysis', routing_key='analysis.#', priority=10),
        Queue('signature', routing_key='signature.#', priority=5),
        Queue('report', routing_key='report.#', priority=1),
    ),

    # Result expiration
    result_expires=3600,  # 1 hour

    # Rate limiting
    task_annotations={
        'your_service.tasks.analysis.heavy_analysis': {
            'rate_limit': '10/m',  # 10 per minute
        },
    },
)
```
Async Task Implementation
```python
import json
import logging
from typing import Any, Dict, List

from celery import Task, group

from creativedynamics.core.analyzer import analyze_all_items
from your_service.tasks.celery_app import app

# send_alert, task_completion_counter, and redis_client are assumed to be
# project-level helpers (alerting, metrics, and a shared Redis connection).
logger = logging.getLogger(__name__)


class AnalysisTask(Task):
    """Base task with automatic retries and error handling."""

    autoretry_for = (Exception,)
    retry_kwargs = {'max_retries': 3, 'countdown': 5}
    track_started = True

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Handle task failure."""
        logger.error(f"Task {task_id} failed: {exc}")
        # Send alert notification
        send_alert(f"Analysis task failed: {task_id}")

    def on_success(self, retval, task_id, args, kwargs):
        """Handle task success."""
        logger.info(f"Task {task_id} completed successfully")
        # Update metrics
        task_completion_counter.inc()


@app.task(bind=True, base=AnalysisTask,
          name='your_service.tasks.analysis.analyze')
def analyze_async(
    self,
    data: Dict[str, Any],
    config: Dict[str, Any]
) -> Dict[str, Any]:
    """Asynchronous analysis task."""

    # Update task state
    self.update_state(
        state='PROCESSING',
        meta={'current': 0, 'total': len(data.get('items', []))}
    )

    # Perform analysis
    results = analyze_all_items(data, config)

    # Cache results under this task's id (bind=True gives us self.request)
    cache_key = f"analysis:{self.request.id}"
    redis_client.setex(cache_key, 3600, json.dumps(results))

    return results


@app.task(name='your_service.tasks.analysis.batch_analyze')
def batch_analyze(
    batch_data: List[Dict[str, Any]],
    config: Dict[str, Any]
) -> List[str]:
    """Process multiple analyses in parallel."""

    # Create subtasks
    job = group(
        analyze_async.s(data, config) for data in batch_data
    )

    # Execute in parallel
    result = job.apply_async()

    return [r.id for r in result.results]
```
Database Scaling
Connection Pooling
```python
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
import os


def create_db_engine():
    """Create database engine with connection pooling."""

    database_url = os.getenv('DATABASE_URL')

    engine = create_engine(
        database_url,
        # Connection pool settings
        poolclass=QueuePool,
        pool_size=20,        # Number of persistent connections
        max_overflow=10,     # Maximum overflow connections
        pool_timeout=30,     # Timeout for getting a connection
        pool_recycle=3600,   # Recycle connections after 1 hour
        pool_pre_ping=True,  # Test connections before using

        # Performance settings
        echo=False,  # Disable SQL logging in production
        connect_args={
            "connect_timeout": 10,
            "application_name": "creativedynamics",
            "options": "-c statement_timeout=30000"  # 30 second timeout
        }
    )

    return engine


# Global connection pool
engine = create_db_engine()
```
Read Replica Configuration
```python
import os
from random import choice

from sqlalchemy import create_engine


class DatabaseRouter:
    """Route database queries to appropriate instances."""

    def __init__(self):
        self.master = create_engine(os.getenv('DATABASE_MASTER_URL'))
        self.replicas = [
            create_engine(os.getenv(f'DATABASE_REPLICA_{i}_URL'))
            for i in range(1, 4)
        ]

    def get_read_engine(self):
        """Get a read replica connection."""
        return choice(self.replicas)

    def get_write_engine(self):
        """Get master connection for writes."""
        return self.master

    def execute_read(self, query):
        """Execute read query on a replica."""
        engine = self.get_read_engine()
        with engine.connect() as conn:
            return conn.execute(query)

    def execute_write(self, query):
        """Execute write query on the master."""
        with self.master.connect() as conn:
            return conn.execute(query)


# Global router
db_router = DatabaseRouter()
```
Caching Strategy
Multi-Layer Caching
Section titled “Multi-Layer Caching”import redisfrom functools import wrapsimport hashlibimport jsonimport picklefrom typing import Any, Optionalimport asyncio
class CacheManager: """Multi-layer caching with Redis and local memory."""
def __init__(self): # Redis connection self.redis = redis.Redis( host=os.getenv('REDIS_HOST', 'localhost'), port=int(os.getenv('REDIS_PORT', 6379)), db=0, decode_responses=False, connection_pool_kwargs={ 'max_connections': 50, 'socket_keepalive': True, 'socket_keepalive_options': { 1: 1, # TCP_KEEPIDLE 2: 3, # TCP_KEEPINTVL 3: 5, # TCP_KEEPCNT } } )
# Local memory cache (LRU) self.local_cache = {} self.max_local_size = 1000
def _generate_key(self, prefix: str, params: dict) -> str: """Generate cache key from parameters.""" param_str = json.dumps(params, sort_keys=True) hash_val = hashlib.md5(param_str.encode()).hexdigest() return f"{prefix}:{hash_val}"
def get(self, key: str) -> Optional[Any]: """Get value from cache (local first, then Redis)."""
# Check local cache if key in self.local_cache: return self.local_cache[key]
# Check Redis value = self.redis.get(key) if value: deserialized = pickle.loads(value) # Store in local cache self._update_local_cache(key, deserialized) return deserialized
return None
def set(self, key: str, value: Any, ttl: int = 3600): """Set value in both cache layers."""
# Serialize for Redis serialized = pickle.dumps(value)
# Store in Redis with TTL self.redis.setex(key, ttl, serialized)
# Store in local cache self._update_local_cache(key, value)
def _update_local_cache(self, key: str, value: Any): """Update local cache with LRU eviction."""
if len(self.local_cache) >= self.max_local_size: # Remove oldest item (simple FIFO for demonstration) oldest_key = next(iter(self.local_cache)) del self.local_cache[oldest_key]
self.local_cache[key] = value
def cache_result(self, prefix: str, ttl: int = 3600): """Decorator for caching function results."""
def decorator(func): @wraps(func) async def async_wrapper(*args, **kwargs): # Generate cache key cache_key = self._generate_key( prefix, {'args': args, 'kwargs': kwargs} )
# Check cache cached = self.get(cache_key) if cached is not None: return cached
# Execute function if asyncio.iscoroutinefunction(func): result = await func(*args, **kwargs) else: result = func(*args, **kwargs)
# Cache result self.set(cache_key, result, ttl)
return result
@wraps(func) def sync_wrapper(*args, **kwargs): # Generate cache key cache_key = self._generate_key( prefix, {'args': args, 'kwargs': kwargs} )
# Check cache cached = self.get(cache_key) if cached is not None: return cached
# Execute function result = func(*args, **kwargs)
# Cache result self.set(cache_key, result, ttl)
return result
return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
return decorator
# Global cache managercache = CacheManager()
# Usage example@cache.cache_result('signature_calc', ttl=7200)def calculate_expensive_signature(data): """Expensive calculation that benefits from caching.""" # ... complex computation ... return resultPerformance Optimisation
Request Batching
Section titled “Request Batching”from typing import List, Dict, Anyimport asynciofrom collections import defaultdictimport time
class BatchProcessor: """Batch multiple requests for efficient processing."""
def __init__(self, batch_size: int = 10, batch_timeout: float = 0.1): self.batch_size = batch_size self.batch_timeout = batch_timeout self.pending_requests = defaultdict(list) self.locks = defaultdict(asyncio.Lock)
async def add_request( self, batch_key: str, request_data: Dict[str, Any] ) -> Any: """Add request to batch and wait for result."""
request_id = str(uuid.uuid4()) future = asyncio.Future()
async with self.locks[batch_key]: self.pending_requests[batch_key].append({ 'id': request_id, 'data': request_data, 'future': future })
# Check if batch is ready if len(self.pending_requests[batch_key]) >= self.batch_size: await self._process_batch(batch_key) else: # Schedule batch processing after timeout asyncio.create_task( self._process_batch_delayed(batch_key) )
return await future
async def _process_batch_delayed(self, batch_key: str): """Process batch after timeout.""" await asyncio.sleep(self.batch_timeout) async with self.locks[batch_key]: if self.pending_requests[batch_key]: await self._process_batch(batch_key)
async def _process_batch(self, batch_key: str): """Process all pending requests in batch."""
if not self.pending_requests[batch_key]: return
batch = self.pending_requests[batch_key] self.pending_requests[batch_key] = []
try: # Process entire batch batch_data = [req['data'] for req in batch] results = await self._execute_batch(batch_data)
# Distribute results for req, result in zip(batch, results): req['future'].set_result(result)
except Exception as e: # Set exception for all requests for req in batch: req['future'].set_exception(e)
async def _execute_batch(self, batch_data: List[Dict[str, Any]]): """Execute batch processing logic.""" # Implement actual batch processing return await process_batch_analysis(batch_data)Data Streaming
Section titled “Data Streaming”from typing import AsyncIterator, Dict, Anyimport asyncioimport aiofilesimport pandas as pd
class StreamProcessor: """Process large datasets in streaming fashion."""
def __init__(self, chunk_size: int = 10000): self.chunk_size = chunk_size
async def process_file_stream( self, file_path: str ) -> AsyncIterator[Dict[str, Any]]: """Process large file in chunks."""
async with aiofiles.open(file_path, mode='r') as file: # Read header header = await file.readline() columns = header.strip().split(',')
chunk_data = [] async for line in file: chunk_data.append(line.strip().split(','))
if len(chunk_data) >= self.chunk_size: # Process chunk df = pd.DataFrame(chunk_data, columns=columns) result = await self._process_chunk(df) yield result chunk_data = []
# Process remaining data if chunk_data: df = pd.DataFrame(chunk_data, columns=columns) result = await self._process_chunk(df) yield result
async def _process_chunk(self, df: pd.DataFrame) -> Dict[str, Any]: """Process a single chunk of data."""
# Calculate signatures for chunk signatures = calculate_path_signatures(df)
# Detect patterns patterns = detect_fatigue_patterns(signatures)
return { 'chunk_size': len(df), 'signatures': signatures, 'patterns': patterns }
async def merge_results( self, results: AsyncIterator[Dict[str, Any]] ) -> Dict[str, Any]: """Merge results from all chunks."""
merged = { 'total_records': 0, 'all_patterns': [], 'aggregated_metrics': {} }
async for chunk_result in results: merged['total_records'] += chunk_result['chunk_size'] merged['all_patterns'].extend(chunk_result['patterns'])
# Aggregate metrics for key, value in chunk_result.get('metrics', {}).items(): if key not in merged['aggregated_metrics']: merged['aggregated_metrics'][key] = [] merged['aggregated_metrics'][key].append(value)
# Finalize aggregation for key in merged['aggregated_metrics']: merged['aggregated_metrics'][key] = np.mean( merged['aggregated_metrics'][key] )
return mergedAuto-Scaling Configuration
Kubernetes HPA
```yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: creativedynamics-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: creativedynamics
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
    - type: Pods
      pods:
        metric:
          name: request_rate
        target:
          type: AverageValue
          averageValue: "100"
  behavior:
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
        - type: Percent
          value: 50
          periodSeconds: 60
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
        - type: Percent
          value: 100
          periodSeconds: 60
        - type: Pods
          value: 2
          periodSeconds: 60
```
AWS Auto Scaling
```hcl
resource "aws_autoscaling_group" "creativedynamics" {
  name                      = "creativedynamics-asg"
  min_size                  = 2
  max_size                  = 10
  desired_capacity          = 3
  health_check_type         = "ELB"
  health_check_grace_period = 300

  launch_template {
    id      = aws_launch_template.creativedynamics.id
    version = "$Latest"
  }

  target_group_arns = [aws_lb_target_group.creativedynamics.arn]

  tag {
    key                 = "Name"
    value               = "creativedynamics-instance"
    propagate_at_launch = true
  }
}

resource "aws_autoscaling_policy" "scale_up" {
  name                   = "creativedynamics-scale-up"
  scaling_adjustment     = 2
  adjustment_type        = "ChangeInCapacity"
  cooldown               = 300
  autoscaling_group_name = aws_autoscaling_group.creativedynamics.name
}

resource "aws_autoscaling_policy" "scale_down" {
  name                   = "creativedynamics-scale-down"
  scaling_adjustment     = -1
  adjustment_type        = "ChangeInCapacity"
  cooldown               = 300
  autoscaling_group_name = aws_autoscaling_group.creativedynamics.name
}

resource "aws_cloudwatch_metric_alarm" "high_cpu" {
  alarm_name          = "creativedynamics-high-cpu"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = "2"
  metric_name         = "CPUUtilization"
  namespace           = "AWS/EC2"
  period              = "120"
  statistic           = "Average"
  threshold           = "70"
  alarm_description   = "This metric monitors CPU utilization"
  alarm_actions       = [aws_autoscaling_policy.scale_up.arn]
}
```
Load Testing
Locust Configuration
```python
import random

from locust import HttpUser, task, between


class CreativeDynamicsUser(HttpUser):
    wait_time = between(1, 3)

    def on_start(self):
        """Initialize user session."""
        self.client.verify = False
        self.headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer test-token'
        }

    @task(3)
    def health_check(self):
        """Simple health check."""
        self.client.get("/health")

    @task(10)
    def analyze_small(self):
        """Submit small analysis job."""
        data = self.generate_test_data(items=10)
        self.client.post("/api/analyze", json=data, headers=self.headers)

    @task(5)
    def analyze_medium(self):
        """Submit medium analysis job."""
        data = self.generate_test_data(items=100)
        self.client.post("/api/analyze", json=data, headers=self.headers)

    @task(1)
    def analyze_large(self):
        """Submit large analysis job."""
        data = self.generate_test_data(items=1000)
        self.client.post(
            "/api/analyze",
            json=data,
            headers=self.headers,
            timeout=60
        )

    def generate_test_data(self, items: int):
        """Generate test data for analysis."""
        return {
            'dataset_name': f'test_dataset_{random.randint(1, 1000)}',
            'items': [
                {
                    'id': f'item_{i}',
                    'metrics': {
                        'impressions': random.randint(1000, 100000),
                        'clicks': random.randint(10, 1000),
                        'ctr': random.uniform(0.001, 0.1),
                        'spend': random.uniform(10, 1000)
                    },
                    'timestamp': '2024-01-01T00:00:00Z'
                }
                for i in range(items)
            ]
        }


# Run with: locust -f locustfile.py --host=http://localhost:5001
```
Scaling Checklist
Pre-Scaling Requirements
- Implement connection pooling
- Set up caching layer (Redis)
- Configure load balancer
- Implement health checks
- Set up monitoring and metrics
- Configure auto-scaling policies
- Implement rate limiting
- Set up CDN for static assets
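The rate-limiting item in the checklist above deserves a sketch, since it gates everything else once traffic grows. A minimal in-process token bucket, for illustration only; in the multi-instance deployment described earlier, the bucket state would typically live in Redis so all API instances share one limit:

```python
import time

class TokenBucket:
    """Minimal token-bucket rate limiter (single-process sketch)."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate          # tokens added per second
        self.capacity = capacity  # maximum burst size
        self.tokens = float(capacity)
        self.updated = time.monotonic()

    def allow(self) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity
        self.tokens = min(
            self.capacity,
            self.tokens + (now - self.updated) * self.rate
        )
        self.updated = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

bucket = TokenBucket(rate=10, capacity=5)
print([bucket.allow() for _ in range(6)])  # first 5 pass, 6th is throttled
```

The `rate` and `capacity` values are placeholders; tune them against the measured per-instance capacity rather than guessing.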
Scaling Milestones
- Phase 1: 2-3 instances, 100 concurrent users
- Phase 2: 5-10 instances, 500 concurrent users
- Phase 3: 10-20 instances, 1000+ concurrent users
- Phase 4: Multi-region deployment
Performance Targets
- API response time: < 200ms (p95)
- Analysis completion: < 30s for standard datasets
- Throughput: 10,000+ requests/minute
- Availability: 99.9% uptime
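Verifying the p95 latency target is straightforward once you collect raw samples from your load tests. A small nearest-rank percentile sketch (the sample latencies are made-up illustration data, not measurements):

```python
import math

def percentile(samples, pct):
    """Nearest-rank percentile: smallest value with pct% of samples at or below it."""
    ordered = sorted(samples)
    rank = max(0, math.ceil(pct / 100 * len(ordered)) - 1)
    return ordered[rank]

latencies_ms = [120, 95, 180, 195, 140, 130, 150, 160, 110, 190]
p95 = percentile(latencies_ms, 95)
print(p95, p95 < 200)  # 195 True — within the <200ms p95 target
```

In practice the samples would come from the Locust run above or from your metrics backend; averages hide tail latency, which is why the target is stated as a p95.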
Next Steps
- Review Security for secure scaling practices
- Configure Monitoring for scaled deployments
- Set up Deployment with scaling infrastructure