Monitoring Guide
Overview
Effective monitoring is crucial for maintaining the health, performance, and reliability of CreativeDynamics in production. This guide covers monitoring strategies, tools, and best practices for observability.
Monitoring Architecture
Key Metrics Categories
- Application Metrics
  - Request throughput and latency
  - Error rates and types
  - API endpoint performance
  - Analysis job duration
  - Memory and CPU usage
- Business Metrics
  - Analysis completions per hour
  - Data processing volume
  - Creative fatigue detection accuracy
  - User engagement patterns
- Infrastructure Metrics
  - System resources (CPU, memory, disk, network)
  - Container/pod health
  - Service availability
  - Database performance
Prometheus Integration
Setup Prometheus Metrics
Add to your FastAPI application (for example, a thin wrapper module in your deployment that mounts creativedynamics.api.main:app):
```python
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from prometheus_client import CONTENT_TYPE_LATEST
from fastapi import Response
import time

# Define metrics
request_count = Counter(
    'creativedynamics_requests_total',
    'Total number of requests',
    ['method', 'endpoint', 'status']
)

request_duration = Histogram(
    'creativedynamics_request_duration_seconds',
    'Request duration in seconds',
    ['method', 'endpoint']
)

active_analyses = Gauge(
    'creativedynamics_active_analyses',
    'Number of active analyses'
)

analysis_duration = Histogram(
    'creativedynamics_analysis_duration_seconds',
    'Analysis job duration in seconds',
    ['dataset', 'status']
)

error_count = Counter(
    'creativedynamics_errors_total',
    'Total number of errors',
    ['error_type', 'endpoint']
)

data_processed = Counter(
    'creativedynamics_data_processed_bytes',
    'Total bytes of data processed'
)

# Middleware for automatic metrics collection
async def metrics_middleware(request, call_next):
    start_time = time.time()

    # Process the request
    response = await call_next(request)

    # Record metrics
    duration = time.time() - start_time
    request_count.labels(
        method=request.method,
        endpoint=request.url.path,
        status=response.status_code
    ).inc()
    request_duration.labels(
        method=request.method,
        endpoint=request.url.path
    ).observe(duration)

    return response

# Metrics endpoint
async def metrics_endpoint():
    return Response(
        content=generate_latest(),
        media_type=CONTENT_TYPE_LATEST
    )
```

Prometheus Configuration
Create prometheus.yml:
```yaml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'creativedynamics'
    static_configs:
      - targets: ['localhost:5001']
    metrics_path: '/metrics'

  - job_name: 'node_exporter'
    static_configs:
      - targets: ['localhost:9100']

rule_files:
  - 'alerts.yml'

alerting:
  alertmanagers:
    - static_configs:
        - targets: ['localhost:9093']
```

Alert Rules
Create alerts.yml:
```yaml
groups:
  - name: creativedynamics_alerts
    interval: 30s
    rules:
      # High error rate
      - alert: HighErrorRate
        expr: rate(creativedynamics_errors_total[5m]) > 0.05
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value }} errors per second"

      # High latency
      - alert: HighLatency
        expr: histogram_quantile(0.95, rate(creativedynamics_request_duration_seconds_bucket[5m])) > 5
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High API latency"
          description: "95th percentile latency is {{ $value }} seconds"

      # Service down
      - alert: ServiceDown
        expr: up{job="creativedynamics"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "CreativeDynamics service is down"
          description: "Service has been down for more than 1 minute"

      # High memory usage
      - alert: HighMemoryUsage
        expr: (node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) / node_memory_MemTotal_bytes > 0.9
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High memory usage"
          description: "Memory usage is above 90%"
```

Grafana Dashboards
Main Dashboard JSON
Create grafana-dashboard.json:
```json
{
  "dashboard": {
    "title": "CreativeDynamics Monitoring",
    "panels": [
      {
        "title": "Request Rate",
        "targets": [
          {
            "expr": "rate(creativedynamics_requests_total[5m])",
            "legendFormat": "{{method}} {{endpoint}}"
          }
        ],
        "type": "graph"
      },
      {
        "title": "Response Time (95th percentile)",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, rate(creativedynamics_request_duration_seconds_bucket[5m]))",
            "legendFormat": "95th percentile"
          }
        ],
        "type": "graph"
      },
      {
        "title": "Error Rate",
        "targets": [
          {
            "expr": "rate(creativedynamics_errors_total[5m])",
            "legendFormat": "{{error_type}}"
          }
        ],
        "type": "graph"
      },
      {
        "title": "Active Analyses",
        "targets": [
          { "expr": "creativedynamics_active_analyses" }
        ],
        "type": "stat"
      },
      {
        "title": "Analysis Duration",
        "targets": [
          {
            "expr": "histogram_quantile(0.5, rate(creativedynamics_analysis_duration_seconds_bucket[1h]))",
            "legendFormat": "Median"
          },
          {
            "expr": "histogram_quantile(0.95, rate(creativedynamics_analysis_duration_seconds_bucket[1h]))",
            "legendFormat": "95th percentile"
          }
        ],
        "type": "graph"
      },
      {
        "title": "Data Processed",
        "targets": [
          {
            "expr": "rate(creativedynamics_data_processed_bytes[5m])",
            "legendFormat": "Bytes/sec"
          }
        ],
        "type": "graph"
      }
    ]
  }
}
```

Logging Strategy
Structured Logging
Configure structured logging in your application:
```python
import logging
from datetime import datetime
from pythonjsonlogger import jsonlogger

class CustomJsonFormatter(jsonlogger.JsonFormatter):
    def add_fields(self, log_record, record, message_dict):
        super(CustomJsonFormatter, self).add_fields(log_record, record, message_dict)
        log_record['timestamp'] = datetime.utcnow().isoformat()
        log_record['level'] = record.levelname
        log_record['logger'] = record.name
        log_record['module'] = record.module
        log_record['function'] = record.funcName
        log_record['line'] = record.lineno

        # Add correlation ID if available
        if hasattr(record, 'correlation_id'):
            log_record['correlation_id'] = record.correlation_id

        # Add user context if available
        if hasattr(record, 'user_id'):
            log_record['user_id'] = record.user_id

def setup_logging(level='INFO', log_file=None):
    """Configure structured JSON logging."""
    formatter = CustomJsonFormatter()

    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)

    # Optional file handler
    handlers = [console_handler]
    if log_file:
        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(formatter)
        handlers.append(file_handler)

    # Configure the root logger
    logging.basicConfig(
        level=getattr(logging, level),
        handlers=handlers
    )

    return logging.getLogger(__name__)
```

Log Aggregation with ELK Stack
Logstash Configuration
Create logstash.conf:
```
input {
  file {
    path => "/var/log/creativedynamics/*.log"
    start_position => "beginning"
    codec => "json"
  }
}

filter {
  if [level] == "ERROR" {
    mutate {
      add_tag => [ "error" ]
    }
  }

  if [duration] {
    ruby {
      code => "
        event.set('duration_ms', event.get('duration') * 1000)
      "
    }
  }

  date {
    match => [ "timestamp", "ISO8601" ]
    target => "@timestamp"
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "creativedynamics-%{+YYYY.MM.dd}"
  }

  if "error" in [tags] {
    email {
      to => "alerts@example.com"
      subject => "CreativeDynamics Error Alert"
      body => "Error in %{module}.%{function}: %{message}"
    }
  }
}
```

Kibana Queries
Common queries for analysis:
Find slow analyses:

```json
{
  "query": {
    "bool": {
      "must": [
        { "term": { "function": "analyze_all_items" } },
        { "range": { "duration": { "gte": 60 } } }
      ]
    }
  }
}
```

Error distribution:

```json
{
  "aggs": {
    "errors_by_type": {
      "terms": {
        "field": "error_type.keyword",
        "size": 10
      }
    }
  }
}
```

User activity patterns:

```json
{
  "aggs": {
    "requests_over_time": {
      "date_histogram": {
        "field": "@timestamp",
        "calendar_interval": "1h"
      }
    }
  }
}
```

Distributed Tracing
OpenTelemetry Integration
Section titled “OpenTelemetry Integration”from opentelemetry import tracefrom opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporterfrom opentelemetry.sdk.trace import TracerProviderfrom opentelemetry.sdk.trace.export import BatchSpanProcessorfrom opentelemetry.instrumentation.fastapi import FastAPIInstrumentorfrom opentelemetry.instrumentation.requests import RequestsInstrumentor
# Configure tracingdef setup_tracing(): # Set up the tracer provider trace.set_tracer_provider(TracerProvider()) tracer = trace.get_tracer(__name__)
# Configure OTLP exporter otlp_exporter = OTLPSpanExporter( endpoint="localhost:4317", insecure=True )
# Add span processor span_processor = BatchSpanProcessor(otlp_exporter) trace.get_tracer_provider().add_span_processor(span_processor)
# Instrument FastAPI FastAPIInstrumentor.instrument()
# Instrument requests library RequestsInstrumentor().instrument()
return tracer
# Use in applicationtracer = setup_tracing()
@app.post("/analyze")async def analyze_endpoint(data: dict): with tracer.start_as_current_span("analyze_request") as span: span.set_attribute("data.size", len(str(data))) span.set_attribute("data.items", len(data.get("items", [])))
# Process analysis with tracer.start_as_current_span("data_validation"): validate_data(data)
with tracer.start_as_current_span("signature_calculation"): signatures = calculate_signatures(data)
with tracer.start_as_current_span("fatigue_detection"): results = detect_fatigue(signatures)
return resultsHealth Checks and Probes
Detailed Health Check
Section titled “Detailed Health Check”from typing import Dict, Anyimport psutilimport asynciofrom datetime import datetime
class HealthChecker: def __init__(self): self.checks = []
def register_check(self, name: str, check_func): self.checks.append((name, check_func))
async def run_checks(self) -> Dict[str, Any]: results = { "timestamp": datetime.utcnow().isoformat(), "status": "healthy", "checks": {} }
for name, check_func in self.checks: try: if asyncio.iscoroutinefunction(check_func): result = await check_func() else: result = check_func() results["checks"][name] = { "status": "healthy", "details": result } except Exception as e: results["status"] = "unhealthy" results["checks"][name] = { "status": "unhealthy", "error": str(e) }
return results
# Health check functionsdef check_memory(): memory = psutil.virtual_memory() return { "used_percent": memory.percent, "available_gb": memory.available / (1024**3) }
def check_disk(): disk = psutil.disk_usage('/') return { "used_percent": disk.percent, "free_gb": disk.free / (1024**3) }
async def check_api_latency(): import httpx start = time.time() async with httpx.AsyncClient() as client: await client.get("http://localhost:5001/health") return {"latency_ms": (time.time() - start) * 1000}
# Register checkshealth_checker = HealthChecker()health_checker.register_check("memory", check_memory)health_checker.register_check("disk", check_disk)health_checker.register_check("api_latency", check_api_latency)
# Health endpoint@app.get("/health/detailed")async def detailed_health(): return await health_checker.run_checks()Performance Monitoring
APM with DataDog
Section titled “APM with DataDog”from ddtrace import tracer, patch_allfrom ddtrace.contrib.asgi import TraceMiddleware
# Patch all supported librariespatch_all()
# Configure DataDogtracer.configure( hostname='localhost', port=8126, analytics_enabled=True, env='production', service='creativedynamics', version='0.9.8.1')
# Add middleware to FastAPIapp.add_middleware( TraceMiddleware, tracer=tracer, service="creativedynamics-api", distributed_tracing=True)
# Custom spans@tracer.wrap(name='custom.analysis')def perform_analysis(data): span = tracer.current_span() span.set_tag('data.size', len(data)) span.set_metric('items.count', len(data.get('items', [])))
# Analysis logic return resultsAlerting Configuration
AlertManager Setup
Create alertmanager.yml:
```yaml
global:
  smtp_smarthost: 'smtp.gmail.com:587'
  smtp_from: 'creativedynamics@example.com'
  smtp_auth_username: 'creativedynamics@example.com'
  smtp_auth_password: 'password'

route:
  group_by: ['alertname', 'cluster', 'service']
  group_wait: 10s
  group_interval: 10s
  repeat_interval: 1h
  receiver: 'team-notifications'

  routes:
    - match:
        severity: critical
      receiver: 'pagerduty-critical'
      continue: true

    - match:
        severity: warning
      receiver: 'slack-warnings'

receivers:
  - name: 'team-notifications'
    email_configs:
      - to: 'team@example.com'
        headers:
          Subject: 'CreativeDynamics Alert: {{ .GroupLabels.alertname }}'

  - name: 'pagerduty-critical'
    pagerduty_configs:
      - service_key: 'YOUR-PAGERDUTY-KEY'
        description: '{{ .CommonAnnotations.summary }}'

  - name: 'slack-warnings'
    slack_configs:
      - api_url: 'YOUR-SLACK-WEBHOOK-URL'
        channel: '#creativedynamics-alerts'
        title: 'CreativeDynamics Warning'
        text: '{{ .CommonAnnotations.description }}'
```

Monitoring Best Practices
Section titled “Monitoring Best Practices”1. Key Performance Indicators (KPIs)
Monitor these essential KPIs:
- Golden Signals:
  - Latency: Response time distribution
  - Traffic: Requests per second
  - Errors: Error rate and types
  - Saturation: Resource utilization
- Business Metrics:
  - Analysis completion rate
  - Average processing time
  - Data volume processed
  - API usage patterns
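For reference, the four golden signals map onto the metrics defined in the Prometheus section roughly as follows. This is a sketch in the style of the dashboard dictionaries later in this guide; using `creativedynamics_active_analyses` as the saturation signal is an assumption, not something the service prescribes:

```python
# PromQL sketches for the four golden signals
golden_signal_queries = {
    "latency": "histogram_quantile(0.95, rate(creativedynamics_request_duration_seconds_bucket[5m]))",
    "traffic": "rate(creativedynamics_requests_total[5m])",
    "errors": "rate(creativedynamics_errors_total[5m])",
    "saturation": "creativedynamics_active_analyses",
}

for signal, query in golden_signal_queries.items():
    print(f"{signal}: {query}")
```

Each query can be dropped directly into a Grafana panel target or an alert rule expression.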
2. Monitoring Checklist
- Application metrics exposed via /metrics
- Infrastructure monitoring configured
- Log aggregation pipeline set up
- Distributed tracing enabled
- Alert rules defined and tested
- Dashboards created for key metrics
- Runbooks documented for alerts
- Regular monitoring review scheduled
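The first checklist item can be smoke-tested without running Prometheus: fetch /metrics and confirm the expected metric families appear. A stdlib-only sketch that parses the Prometheus text exposition format (the sample payload and the `metric_families` helper are illustrative; in practice the text would come from a GET request to /metrics):

```python
def metric_families(exposition_text: str) -> set:
    """Extract metric family names from Prometheus text exposition output."""
    families = set()
    for line in exposition_text.splitlines():
        if line.startswith("# TYPE "):
            # Line format: "# TYPE <name> <type>"
            families.add(line.split()[2])
    return families

# Sample of what GET /metrics might return
sample = """\
# HELP creativedynamics_requests_total Total number of requests
# TYPE creativedynamics_requests_total counter
creativedynamics_requests_total{method="GET",endpoint="/health",status="200"} 42.0
# HELP creativedynamics_active_analyses Number of active analyses
# TYPE creativedynamics_active_analyses gauge
creativedynamics_active_analyses 3.0
"""

expected = {"creativedynamics_requests_total", "creativedynamics_active_analyses"}
missing = expected - metric_families(sample)
print("missing:", missing)  # missing: set()
```

An empty `missing` set means every expected family is exported; wiring this into a CI job catches accidentally dropped metrics before a dashboard goes blank.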
3. Incident Response
- Detection: Automated alerts trigger
- Triage: Assess severity and impact
- Diagnosis: Use monitoring tools to identify root cause
- Resolution: Apply fix and verify
- Post-mortem: Document learnings and improvements
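The triage step can be partially automated by routing on alert severity, mirroring the AlertManager routes above. A hypothetical sketch (the receiver names follow alertmanager.yml; the helper itself is not part of CreativeDynamics):

```python
# Map alert severity to a receiver, mirroring alertmanager.yml
SEVERITY_ROUTES = {
    "critical": "pagerduty-critical",
    "warning": "slack-warnings",
}

def route_alert(severity: str, default: str = "team-notifications") -> str:
    """Pick the receiver for an alert, falling back to the team channel."""
    return SEVERITY_ROUTES.get(severity, default)

print(route_alert("critical"))  # pagerduty-critical
print(route_alert("info"))      # team-notifications
```

Keeping this mapping in one place makes it easy to audit who gets paged for what during an incident review.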
Dashboard Examples
Service Overview Dashboard
Section titled “Service Overview Dashboard”# Metrics to displayservice_metrics = { "uptime": "process_uptime_seconds", "request_rate": "rate(creativedynamics_requests_total[5m])", "error_rate": "rate(creativedynamics_errors_total[5m])", "p95_latency": "histogram_quantile(0.95, creativedynamics_request_duration_seconds_bucket)", "active_connections": "creativedynamics_active_connections", "memory_usage": "process_resident_memory_bytes", "cpu_usage": "rate(process_cpu_seconds_total[5m])"}Analysis Performance Dashboard
Section titled “Analysis Performance Dashboard”# Analysis-specific metricsanalysis_metrics = { "analyses_per_hour": "rate(creativedynamics_analyses_total[1h]) * 3600", "avg_analysis_time": "rate(creativedynamics_analysis_duration_seconds_sum[1h]) / rate(creativedynamics_analysis_duration_seconds_count[1h])", "signature_calc_time": "histogram_quantile(0.5, creativedynamics_signature_calculation_seconds_bucket)", "wastage_detection_accuracy": "creativedynamics_wastage_detection_accuracy", "data_processing_rate": "rate(creativedynamics_data_processed_bytes[5m])"}Next Steps
- Configure Scaling for handling increased load
- Review Security for monitoring security events
- Set up Deployment with monitoring integration