Skip to content

Scaling Guide

This guide provides detailed strategies for scaling CreativeDynamics to handle increased load, larger datasets, and concurrent users. It covers both vertical and horizontal scaling approaches, optimisation techniques, and architectural patterns for high-performance deployments.

API Throughput
  • Current Capacity: ~1000 requests/minute single instance
  • Target Capacity: 10,000+ requests/minute with horizontal scaling
  • Strategy: Load balancing across multiple API instances

Dataset Size
  • Current Limit: ~100MB per analysis
  • Target Support: 1GB+ datasets
  • Strategy: Streaming processing and chunked analysis

Concurrent Processing
  • Current: Sequential processing
  • Target: Parallel processing with job queues
  • Strategy: Distributed task processing with Celery/RabbitMQ
docker-compose-scaled.yml
# Scaled deployment: nginx load balancer in front of three API instances,
# shared Redis + Postgres, and a pool of Celery workers.
# NOTE(review): credentials (user/pass) are hard-coded here and repeated per
# service — move them to an env file or secrets store before production use.
version: '3.8'
services:
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    depends_on:
      - api1
      - api2
      - api3
  api1:
    build: .
    environment:
      - INSTANCE_ID=1
      - REDIS_URL=redis://redis:6379
      - POSTGRES_URL=postgresql://user:pass@postgres:5432/creativedynamics
    depends_on:
      - redis
      - postgres
  api2:
    build: .
    environment:
      - INSTANCE_ID=2
      - REDIS_URL=redis://redis:6379
      - POSTGRES_URL=postgresql://user:pass@postgres:5432/creativedynamics
    depends_on:
      - redis
      - postgres
  api3:
    build: .
    environment:
      - INSTANCE_ID=3
      - REDIS_URL=redis://redis:6379
      - POSTGRES_URL=postgresql://user:pass@postgres:5432/creativedynamics
    depends_on:
      - redis
      - postgres
  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
  postgres:
    image: postgres:15
    environment:
      - POSTGRES_DB=creativedynamics
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
    volumes:
      - postgres_data:/var/lib/postgresql/data
  worker:
    build: .
    command: celery -A your_service.tasks worker --loglevel=info
    environment:
      - REDIS_URL=redis://redis:6379
      - POSTGRES_URL=postgresql://user:pass@postgres:5432/creativedynamics
    depends_on:
      - redis
      - postgres
    # NOTE(review): `deploy.replicas` is honored by Docker Swarm (or
    # `docker compose --compatibility`); plain `docker-compose up` ignores it.
    deploy:
      replicas: 3
volumes:
  redis_data:
  postgres_data:
nginx.conf
# Upstream pool: the three API containers from docker-compose.
upstream creativedynamics_api {
    least_conn; # Use least-connections load balancing (good for uneven request cost)
    server api1:5001 max_fails=3 fail_timeout=30s;
    server api2:5001 max_fails=3 fail_timeout=30s;
    server api3:5001 max_fails=3 fail_timeout=30s;
    # Keep up to 32 idle upstream connections alive per worker.
    keepalive 32;
}

server {
    listen 80;
    server_name _;

    # API endpoints: proxy everything to the upstream pool.
    location / {
        proxy_pass http://creativedynamics_api;
        # HTTP/1.1 + Upgrade headers allow WebSocket pass-through.
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        # Forward client identity to the backends.
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        # Connection timeouts
        proxy_connect_timeout 60s;
        proxy_send_timeout 60s;
        proxy_read_timeout 60s;
        # Response buffering
        proxy_buffering on;
        proxy_buffer_size 4k;
        proxy_buffers 8 4k;
        proxy_busy_buffers_size 8k;
    }

    # Health check endpoint (excluded from access logs to reduce noise).
    location /health {
        access_log off;
        proxy_pass http://creativedynamics_api/health;
    }
}
your_service/tasks/celery_app.py
from celery import Celery
from kombu import Queue
import os

# Create the Celery application instance shared by workers and producers.
app = Celery('creativedynamics')

# Configuration
app.conf.update(
    # Broker and result backend both live on Redis by default.
    broker_url=os.getenv('REDIS_URL', 'redis://localhost:6379/0'),
    result_backend=os.getenv('REDIS_URL', 'redis://localhost:6379/0'),
    # Task execution settings
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
    # Performance tuning
    worker_prefetch_multiplier=4,     # each worker process prefetches 4 tasks
    worker_max_tasks_per_child=1000,  # recycle workers to bound memory growth
    task_acks_late=True,              # ack after completion so crashed tasks re-deliver
    # Task routing: fan tasks out to dedicated queues by module path.
    task_routes={
        'your_service.tasks.analysis.*': {'queue': 'analysis'},
        'your_service.tasks.signature.*': {'queue': 'signature'},
        'your_service.tasks.report.*': {'queue': 'report'},
    },
    # Queue configuration
    # NOTE(review): '#'-wildcard routing keys and queue priority are AMQP
    # (RabbitMQ) semantics — confirm they behave as intended on the Redis
    # broker, which emulates only a subset of AMQP routing.
    task_queues=(
        Queue('analysis', routing_key='analysis.#', priority=10),
        Queue('signature', routing_key='signature.#', priority=5),
        Queue('report', routing_key='report.#', priority=1),
    ),
    # Result expiration
    result_expires=3600,  # 1 hour
    # Rate limiting for the heaviest task type.
    task_annotations={
        'your_service.tasks.analysis.heavy_analysis': {
            'rate_limit': '10/m',  # 10 per minute
        },
    },
)
your_service/tasks/analysis.py
from celery import Task
from typing import Dict, Any
import numpy as np
from creativedynamics.core.analyzer import analyze_all_items
from your_service.tasks.celery_app import app
class AnalysisTask(Task):
    """Base task with automatic retries and error handling."""

    # Retry on any exception, up to 3 times, waiting 5 s between attempts.
    autoretry_for = (Exception,)
    retry_kwargs = {'max_retries': 3, 'countdown': 5}
    track_started = True  # report STARTED state so clients can poll progress

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Handle task failure (after retries are exhausted)."""
        # NOTE(review): `logger` and `send_alert` are not defined in this
        # module as shown — confirm they are imported/provided elsewhere.
        logger.error(f"Task {task_id} failed: {exc}")
        # Send alert notification
        send_alert(f"Analysis task failed: {task_id}")

    def on_success(self, retval, task_id, args, kwargs):
        """Handle task success."""
        logger.info(f"Task {task_id} completed successfully")
        # Update metrics
        # NOTE(review): `task_completion_counter` (a Prometheus-style counter,
        # presumably) is also undefined in this module — confirm import.
        task_completion_counter.inc()
@app.task(base=AnalysisTask, bind=True, name='your_service.tasks.analysis.analyze')
def analyze_async(
    self,
    data: Dict[str, Any],
    config: Dict[str, Any]
) -> Dict[str, Any]:
    """Asynchronous analysis task.

    Args:
        data: Payload containing an 'items' list to analyze.
        config: Analyzer configuration passed through to the core analyzer.

    Returns:
        The analyzer's result dict (also cached in Redis for 1 hour).
    """
    import json  # local import keeps this task self-contained

    # Report progress so API clients polling the task see PROCESSING.
    self.update_state(
        state='PROCESSING',
        meta={'current': 0, 'total': len(data.get('items', []))}
    )

    # Perform analysis
    results = analyze_all_items(data, config)

    # Cache results keyed by the Celery task id.
    # BUG FIX: the original referenced an undefined name `task_id`; the
    # running task's id is `self.request.id`, hence `bind=True` above
    # (backward-compatible: callers still pass only data and config).
    # NOTE(review): `redis_client` must be defined/imported in this module.
    cache_key = f"analysis:{self.request.id}"
    redis_client.setex(cache_key, 3600, json.dumps(results))

    return results
@app.task(name='your_service.tasks.analysis.batch_analyze')
def batch_analyze(
    batch_data: "List[Dict[str, Any]]",
    config: Dict[str, Any]
) -> "List[str]":
    """Process multiple analyses in parallel.

    Fans each payload in ``batch_data`` out as its own ``analyze_async``
    subtask and returns the child task ids so callers can poll for results.
    """
    # BUG FIX: `group` was used but never imported; import locally so this
    # function works regardless of the top-of-file import list. (Annotations
    # above are strings for the same reason: `List` is not imported.)
    from celery import group

    # One signature per payload, dispatched as a single parallel group.
    job = group(
        analyze_async.s(data, config)
        for data in batch_data
    )
    result = job.apply_async()

    # Results are fetched asynchronously by id.
    return [r.id for r in result.results]
your_service/db/connection.py
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
import os
def create_db_engine():
    """Create database engine with connection pooling.

    Reads the connection string from the ``DATABASE_URL`` environment
    variable and returns a pooled SQLAlchemy engine.

    Raises:
        RuntimeError: if ``DATABASE_URL`` is unset, so misconfiguration
            fails loudly instead of surfacing as an opaque SQLAlchemy error.
    """
    database_url = os.getenv('DATABASE_URL')
    # ROBUSTNESS: fail fast with a clear message on missing configuration.
    if not database_url:
        raise RuntimeError("DATABASE_URL environment variable is not set")

    engine = create_engine(
        database_url,
        # Connection pool settings
        poolclass=QueuePool,
        pool_size=20,        # number of persistent connections
        max_overflow=10,     # extra connections allowed under burst load
        pool_timeout=30,     # seconds to wait for a free connection
        pool_recycle=3600,   # recycle hourly (LBs/firewalls drop idle conns)
        pool_pre_ping=True,  # test connections before use (avoids stale handles)
        # Performance settings
        echo=False,  # disable SQL logging in production
        # NOTE(review): these connect_args are psycopg2/PostgreSQL-specific.
        connect_args={
            "connect_timeout": 10,
            "application_name": "creativedynamics",
            "options": "-c statement_timeout=30000"  # 30 s server-side timeout
        }
    )
    return engine


# Global connection pool, created once at import time.
engine = create_db_engine()
your_service/db/replicas.py
from sqlalchemy import create_engine
from random import choice
import os  # BUG FIX: os.getenv is used below but `os` was never imported


class DatabaseRouter:
    """Route database queries to appropriate instances.

    Writes always go to the master; reads are spread across the replicas
    by simple random selection.
    """

    def __init__(self):
        self.master = create_engine(os.getenv('DATABASE_MASTER_URL'))
        # Replicas are numbered 1..3 via DATABASE_REPLICA_{i}_URL env vars.
        self.replicas = [
            create_engine(os.getenv(f'DATABASE_REPLICA_{i}_URL'))
            for i in range(1, 4)
        ]

    def get_read_engine(self):
        """Get a read-replica engine (random choice balances load)."""
        return choice(self.replicas)

    def get_write_engine(self):
        """Get the master engine for writes."""
        return self.master

    def execute_read(self, query):
        """Execute a read query on a replica.

        NOTE(review): the result is returned after the connection context
        exits; rows must be fully buffered or callers may find the cursor
        closed — confirm against the SQLAlchemy execution mode in use.
        """
        engine = self.get_read_engine()
        with engine.connect() as conn:
            return conn.execute(query)

    def execute_write(self, query):
        """Execute a write query on the master."""
        with self.master.connect() as conn:
            return conn.execute(query)


# Global router shared by the application.
db_router = DatabaseRouter()
your_service/cache/manager.py
import redis
from functools import wraps
import hashlib
import json
import pickle
from typing import Any, Optional
import asyncio
class CacheManager:
    """Multi-layer caching with Redis and local memory.

    Layer 1 is a small in-process dict with FIFO eviction; layer 2 is
    Redis, holding pickle-serialized values with per-key TTLs.
    """

    def __init__(self):
        import os  # local import: `os` is not imported at module level

        # Redis connection.
        # BUG FIX: redis.Redis has no `connection_pool_kwargs` parameter;
        # pool options are passed directly and forwarded to the internal
        # ConnectionPool.
        self.redis = redis.Redis(
            host=os.getenv('REDIS_HOST', 'localhost'),
            port=int(os.getenv('REDIS_PORT', 6379)),
            db=0,
            decode_responses=False,  # values are pickled bytes
            max_connections=50,
            socket_keepalive=True,
            # NOTE(review): these raw option keys match Linux TCP_KEEPIDLE /
            # TCP_KEEPINTVL / TCP_KEEPCNT; prefer socket.TCP_* constants for
            # portability — confirm target platform.
            socket_keepalive_options={
                1: 1,  # TCP_KEEPIDLE
                2: 3,  # TCP_KEEPINTVL
                3: 5,  # TCP_KEEPCNT
            }
        )
        # Local in-process cache (FIFO eviction; see _update_local_cache).
        self.local_cache = {}
        self.max_local_size = 1000

    def _generate_key(self, prefix: str, params: dict) -> str:
        """Generate a deterministic cache key from parameters."""
        # sort_keys makes the hash stable regardless of dict insertion order.
        param_str = json.dumps(params, sort_keys=True)
        hash_val = hashlib.md5(param_str.encode()).hexdigest()
        return f"{prefix}:{hash_val}"

    def get(self, key: str) -> Optional[Any]:
        """Get value from cache (local layer first, then Redis)."""
        if key in self.local_cache:
            return self.local_cache[key]

        value = self.redis.get(key)
        if value:
            # SECURITY NOTE: pickle.loads must only ever see trusted data
            # written by this service.
            deserialized = pickle.loads(value)
            # Promote the Redis hit into the local layer.
            self._update_local_cache(key, deserialized)
            return deserialized
        return None

    def set(self, key: str, value: Any, ttl: int = 3600):
        """Set value in both cache layers (TTL applies to Redis only)."""
        serialized = pickle.dumps(value)
        self.redis.setex(key, ttl, serialized)
        # NOTE(review): the local layer has no TTL, so entries can outlive
        # their Redis counterparts until evicted.
        self._update_local_cache(key, value)

    def _update_local_cache(self, key: str, value: Any):
        """Insert into the local cache, evicting the oldest entry if full."""
        if len(self.local_cache) >= self.max_local_size:
            # Simple FIFO eviction (dicts preserve insertion order).
            oldest_key = next(iter(self.local_cache))
            del self.local_cache[oldest_key]
        self.local_cache[key] = value

    def cache_result(self, prefix: str, ttl: int = 3600):
        """Decorator caching a function's results keyed by its arguments.

        Works for both sync and async functions; the matching wrapper is
        selected once, at decoration time.
        """
        def decorator(func):
            is_async = asyncio.iscoroutinefunction(func)

            @wraps(func)
            async def async_wrapper(*args, **kwargs):
                # NOTE(review): key generation JSON-serializes args/kwargs,
                # so non-JSON-serializable arguments raise TypeError.
                cache_key = self._generate_key(
                    prefix,
                    {'args': args, 'kwargs': kwargs}
                )
                cached = self.get(cache_key)
                if cached is not None:
                    return cached
                result = await func(*args, **kwargs)
                self.set(cache_key, result, ttl)
                return result

            @wraps(func)
            def sync_wrapper(*args, **kwargs):
                cache_key = self._generate_key(
                    prefix,
                    {'args': args, 'kwargs': kwargs}
                )
                cached = self.get(cache_key)
                if cached is not None:
                    return cached
                result = func(*args, **kwargs)
                self.set(cache_key, result, ttl)
                return result

            return async_wrapper if is_async else sync_wrapper
        return decorator
# Global cache manager shared by importers of this module.
cache = CacheManager()

# Usage example
@cache.cache_result('signature_calc', ttl=7200)
def calculate_expensive_signature(data):
    """Expensive calculation that benefits from caching."""
    # ... complex computation ...
    # NOTE(review): `result` is a placeholder — the real computation must
    # assign it before this return.
    return result
your_service/api/batch.py
from typing import List, Dict, Any
import asyncio
from collections import defaultdict
import time
class BatchProcessor:
    """Batch multiple requests for efficient processing.

    Requests are grouped by ``batch_key``; a batch is flushed either when
    it reaches ``batch_size`` entries or ``batch_timeout`` seconds after a
    request arrives, whichever comes first.
    """

    def __init__(self, batch_size: int = 10, batch_timeout: float = 0.1):
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout
        self.pending_requests = defaultdict(list)
        # One lock per batch key so unrelated batches don't serialize.
        self.locks = defaultdict(asyncio.Lock)

    async def add_request(
        self,
        batch_key: str,
        request_data: Dict[str, Any]
    ) -> Any:
        """Add a request to its batch and wait for its individual result."""
        # BUG FIX: `uuid` was used but never imported.
        import uuid

        request_id = str(uuid.uuid4())
        future = asyncio.Future()

        async with self.locks[batch_key]:
            self.pending_requests[batch_key].append({
                'id': request_id,
                'data': request_data,
                'future': future
            })
            if len(self.pending_requests[batch_key]) >= self.batch_size:
                # Batch full: flush immediately (still holding the lock).
                await self._process_batch(batch_key)
            else:
                # Otherwise flush after the timeout.
                # NOTE(review): each sub-threshold request schedules its own
                # timer; late timers see an empty list and no-op, but they
                # could be coalesced into one timer per batch.
                asyncio.create_task(
                    self._process_batch_delayed(batch_key)
                )

        # Await outside the lock so other requests can join the next batch.
        return await future

    async def _process_batch_delayed(self, batch_key: str):
        """Flush any pending requests once the batch timeout expires."""
        await asyncio.sleep(self.batch_timeout)
        async with self.locks[batch_key]:
            if self.pending_requests[batch_key]:
                await self._process_batch(batch_key)

    async def _process_batch(self, batch_key: str):
        """Process all pending requests for a key (caller holds the lock)."""
        if not self.pending_requests[batch_key]:
            return

        # Swap out the pending list so new arrivals start a fresh batch.
        batch = self.pending_requests[batch_key]
        self.pending_requests[batch_key] = []

        try:
            batch_data = [req['data'] for req in batch]
            results = await self._execute_batch(batch_data)
            # Distribute results back to the individual waiters in order.
            for req, result in zip(batch, results):
                req['future'].set_result(result)
        except Exception as e:
            # Propagate the failure to every waiter in the batch.
            for req in batch:
                req['future'].set_exception(e)

    async def _execute_batch(self, batch_data: List[Dict[str, Any]]):
        """Execute batch processing logic for one flushed batch."""
        # NOTE(review): `process_batch_analysis` is not defined in this
        # file — confirm it is imported where this class is used.
        return await process_batch_analysis(batch_data)
creativedynamics/streaming/processor.py
from typing import AsyncIterator, Dict, Any
import asyncio
import aiofiles
import pandas as pd
class StreamProcessor:
    """Process large datasets in streaming fashion.

    Reads CSV-like files in fixed-size chunks so arbitrarily large inputs
    can be analyzed without loading everything into memory.
    """

    def __init__(self, chunk_size: int = 10000):
        self.chunk_size = chunk_size  # rows per processed chunk

    async def process_file_stream(
        self,
        file_path: str
    ) -> AsyncIterator[Dict[str, Any]]:
        """Process a large file in chunks, yielding one result per chunk.

        NOTE(review): the naive split(',') parser does not handle quoted
        fields or embedded commas — switch to the csv module if inputs may
        contain them.
        """
        async with aiofiles.open(file_path, mode='r') as file:
            # First line is the header row.
            header = await file.readline()
            columns = header.strip().split(',')

            chunk_data = []
            async for line in file:
                chunk_data.append(line.strip().split(','))
                if len(chunk_data) >= self.chunk_size:
                    df = pd.DataFrame(chunk_data, columns=columns)
                    yield await self._process_chunk(df)
                    chunk_data = []

            # Flush the final partial chunk, if any.
            if chunk_data:
                df = pd.DataFrame(chunk_data, columns=columns)
                yield await self._process_chunk(df)

    async def _process_chunk(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Process a single chunk of data.

        NOTE(review): `calculate_path_signatures` and
        `detect_fatigue_patterns` are not defined in this file — confirm
        their imports where this class is used.
        """
        signatures = calculate_path_signatures(df)
        patterns = detect_fatigue_patterns(signatures)
        return {
            'chunk_size': len(df),
            'signatures': signatures,
            'patterns': patterns
        }

    async def merge_results(
        self,
        results: AsyncIterator[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Merge per-chunk results into one summary dict."""
        # BUG FIX: `np.mean` was used below but numpy was never imported in
        # this module; import it locally so the method is self-contained.
        import numpy as np

        merged = {
            'total_records': 0,
            'all_patterns': [],
            'aggregated_metrics': {}
        }

        async for chunk_result in results:
            merged['total_records'] += chunk_result['chunk_size']
            merged['all_patterns'].extend(chunk_result['patterns'])
            # NOTE(review): _process_chunk emits 'signatures', not 'metrics';
            # this aggregation only fires for producers supplying 'metrics'.
            for key, value in chunk_result.get('metrics', {}).items():
                merged['aggregated_metrics'].setdefault(key, []).append(value)

        # Collapse each metric's per-chunk values into a mean.
        for key in merged['aggregated_metrics']:
            merged['aggregated_metrics'][key] = np.mean(
                merged['aggregated_metrics'][key]
            )

        return merged
hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: creativedynamics-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: creativedynamics
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
    # Custom per-pod metric: requires a metrics adapter
    # (e.g. prometheus-adapter) exposing `request_rate`.
    - type: Pods
      pods:
        metric:
          name: request_rate
        target:
          type: AverageValue
          averageValue: "100"
  # BUG FIX: the autoscaling/v2 field is spelled `behavior` (US spelling);
  # `behaviour` is not a valid field and would be rejected by the API server.
  behavior:
    scaleDown:
      stabilizationWindowSeconds: 300  # wait 5 min before scaling in
      policies:
        - type: Percent
          value: 50
          periodSeconds: 60
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
        - type: Percent
          value: 100
          periodSeconds: 60
        - type: Pods
          value: 2
          periodSeconds: 60
terraform/autoscaling.tf
resource "aws_autoscaling_group" "creativedynamics" {
  name                      = "creativedynamics-asg"
  min_size                  = 2
  max_size                  = 10
  desired_capacity          = 3
  health_check_type         = "ELB"
  health_check_grace_period = 300

  launch_template {
    id      = aws_launch_template.creativedynamics.id
    version = "$Latest"
  }

  target_group_arns = [aws_lb_target_group.creativedynamics.arn]

  tag {
    key                 = "Name"
    value               = "creativedynamics-instance"
    propagate_at_launch = true
  }
}

# Add 2 instances when triggered (wired to the high-CPU alarm below).
resource "aws_autoscaling_policy" "scale_up" {
  name                   = "creativedynamics-scale-up"
  scaling_adjustment     = 2
  adjustment_type        = "ChangeInCapacity"
  cooldown               = 300
  autoscaling_group_name = aws_autoscaling_group.creativedynamics.name
}

# NOTE(review): no alarm currently invokes this policy — add a low-CPU
# CloudWatch alarm (or scheduled action), otherwise the group never scales in.
resource "aws_autoscaling_policy" "scale_down" {
  name                   = "creativedynamics-scale-down"
  scaling_adjustment     = -1
  adjustment_type        = "ChangeInCapacity"
  cooldown               = 300
  autoscaling_group_name = aws_autoscaling_group.creativedynamics.name
}

resource "aws_cloudwatch_metric_alarm" "high_cpu" {
  alarm_name          = "creativedynamics-high-cpu"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = "2"
  metric_name         = "CPUUtilization"
  namespace           = "AWS/EC2"
  period              = "120"
  statistic           = "Average"
  threshold           = "70"

  # BUG FIX: without this dimension the alarm averages CPUUtilization over
  # ALL EC2 instances in the account/region instead of just this ASG.
  dimensions = {
    AutoScalingGroupName = aws_autoscaling_group.creativedynamics.name
  }

  alarm_description = "This metric monitors CPU utilization"
  alarm_actions     = [aws_autoscaling_policy.scale_up.arn]
}
load_test/locustfile.py
from locust import HttpUser, task, between
import random
import json
class CreativeDynamicsUser(HttpUser):
    """Simulated API user mixing health checks with analysis submissions."""

    # Each simulated user pauses 1-3 seconds between tasks.
    wait_time = between(1, 3)

    def on_start(self):
        """Set up the session: skip TLS verification (self-signed certs in
        test environments) and prepare shared request headers."""
        self.client.verify = False
        self.headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer test-token'
        }

    @task(3)
    def health_check(self):
        """Lightweight liveness probe."""
        self.client.get("/health")

    @task(10)
    def analyze_small(self):
        """Submit a small (10-item) analysis job — the most frequent task."""
        payload = self.generate_test_data(items=10)
        self.client.post("/api/analyze", json=payload, headers=self.headers)

    @task(5)
    def analyze_medium(self):
        """Submit a medium (100-item) analysis job."""
        payload = self.generate_test_data(items=100)
        self.client.post("/api/analyze", json=payload, headers=self.headers)

    @task(1)
    def analyze_large(self):
        """Submit a large (1000-item) job with an extended timeout."""
        payload = self.generate_test_data(items=1000)
        self.client.post(
            "/api/analyze",
            json=payload,
            headers=self.headers,
            timeout=60
        )

    def generate_test_data(self, items: int):
        """Build a synthetic analysis payload containing `items` entries."""
        def make_item(i):
            # Randomized ad-style metrics for one item.
            return {
                'id': f'item_{i}',
                'metrics': {
                    'impressions': random.randint(1000, 100000),
                    'clicks': random.randint(10, 1000),
                    'ctr': random.uniform(0.001, 0.1),
                    'spend': random.uniform(10, 1000)
                },
                'timestamp': '2024-01-01T00:00:00Z'
            }

        return {
            'dataset_name': f'test_dataset_{random.randint(1, 1000)}',
            'items': [make_item(i) for i in range(items)]
        }
# Run with: locust -f locustfile.py --host=http://localhost:5001
  • Implement connection pooling
  • Set up caching layer (Redis)
  • Configure load balancer
  • Implement health checks
  • Set up monitoring and metrics
  • Configure auto-scaling policies
  • Implement rate limiting
  • Set up CDN for static assets
  • Phase 1: 2-3 instances, 100 concurrent users
  • Phase 2: 5-10 instances, 500 concurrent users
  • Phase 3: 10-20 instances, 1000+ concurrent users
  • Phase 4: Multi-region deployment
  • API response time: < 200ms (p95)
  • Analysis completion: < 30s for standard datasets
  • Throughput: 10,000+ requests/minute
  • Availability: 99.9% uptime