
Monitoring Guide

Effective monitoring is crucial for maintaining the health, performance, and reliability of CreativeDynamics in production. This guide covers detailed monitoring strategies, tools, and best practices for observability.

  1. Application Metrics

    • Request throughput and latency
    • Error rates and types
    • API endpoint performance
    • Analysis job duration
    • Memory and CPU usage
  2. Business Metrics

    • Analysis completions per hour
    • Data processing volume
    • Creative fatigue detection accuracy
    • User engagement patterns
  3. Infrastructure Metrics

    • System resources (CPU, memory, disk, network)
    • Container/pod health
    • Service availability
    • Database performance

Add to your FastAPI application (for example, a thin wrapper module in your deployment that mounts creativedynamics.api.main:app):

from prometheus_client import Counter, Histogram, Gauge, generate_latest
from prometheus_client import CONTENT_TYPE_LATEST
from fastapi import Response
import time

# Define metrics
request_count = Counter(
    'creativedynamics_requests_total',
    'Total number of requests',
    ['method', 'endpoint', 'status']
)

request_duration = Histogram(
    'creativedynamics_request_duration_seconds',
    'Request duration in seconds',
    ['method', 'endpoint']
)

active_analyses = Gauge(
    'creativedynamics_active_analyses',
    'Number of active analyses'
)

analysis_duration = Histogram(
    'creativedynamics_analysis_duration_seconds',
    'Analysis job duration in seconds',
    ['dataset', 'status']
)

error_count = Counter(
    'creativedynamics_errors_total',
    'Total number of errors',
    ['error_type', 'endpoint']
)

data_processed = Counter(
    'creativedynamics_data_processed_bytes',
    'Total bytes of data processed'
)

# Middleware for automatic metrics collection
async def metrics_middleware(request, call_next):
    start_time = time.time()

    # Process request
    response = await call_next(request)

    # Record metrics
    duration = time.time() - start_time
    request_count.labels(
        method=request.method,
        endpoint=request.url.path,
        status=response.status_code
    ).inc()
    request_duration.labels(
        method=request.method,
        endpoint=request.url.path
    ).observe(duration)
    return response

# Metrics endpoint
async def metrics_endpoint():
    return Response(
        content=generate_latest(),
        media_type=CONTENT_TYPE_LATEST
    )
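
Long-running analysis jobs can update the `active_analyses` gauge and `analysis_duration` histogram with a small context manager. A minimal sketch: the `track_analysis` helper and the `dataset` label value are illustrative, not part of the CreativeDynamics API.

```python
import time
from contextlib import contextmanager

from prometheus_client import Gauge, Histogram

active_analyses = Gauge(
    "creativedynamics_active_analyses",
    "Number of active analyses",
)
analysis_duration = Histogram(
    "creativedynamics_analysis_duration_seconds",
    "Analysis job duration in seconds",
    ["dataset", "status"],
)

@contextmanager
def track_analysis(dataset: str):
    """Track one analysis job: bump the gauge while it runs, then
    record its duration with a success/error status label."""
    active_analyses.inc()
    start = time.time()
    status = "success"
    try:
        yield
    except Exception:
        status = "error"
        raise
    finally:
        active_analyses.dec()
        analysis_duration.labels(dataset=dataset, status=status).observe(
            time.time() - start
        )
```

Wrap each job body in `with track_analysis("campaign_q3"): ...` so failed jobs are still observed, just under `status="error"`.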

Create prometheus.yml:

global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'creativedynamics'
    static_configs:
      - targets: ['localhost:5001']
    metrics_path: '/metrics'

  - job_name: 'node_exporter'
    static_configs:
      - targets: ['localhost:9100']

rule_files:
  - 'alerts.yml'

alerting:
  alertmanagers:
    - static_configs:
        - targets: ['localhost:9093']

Create alerts.yml:

groups:
  - name: creativedynamics_alerts
    interval: 30s
    rules:
      # High error rate
      - alert: HighErrorRate
        expr: rate(creativedynamics_errors_total[5m]) > 0.05
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value }} errors per second"

      # High latency
      - alert: HighLatency
        expr: histogram_quantile(0.95, rate(creativedynamics_request_duration_seconds_bucket[5m])) > 5
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High API latency"
          description: "95th percentile latency is {{ $value }} seconds"

      # Service down
      - alert: ServiceDown
        expr: up{job="creativedynamics"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "CreativeDynamics service is down"
          description: "Service has been down for more than 1 minute"

      # High memory usage
      - alert: HighMemoryUsage
        expr: (node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) / node_memory_MemTotal_bytes > 0.9
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High memory usage"
          description: "Memory usage is above 90%"

Create grafana-dashboard.json:

{
  "dashboard": {
    "title": "CreativeDynamics Monitoring",
    "panels": [
      {
        "title": "Request Rate",
        "targets": [
          {
            "expr": "rate(creativedynamics_requests_total[5m])",
            "legendFormat": "{{method}} {{endpoint}}"
          }
        ],
        "type": "graph"
      },
      {
        "title": "Response Time (95th percentile)",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, rate(creativedynamics_request_duration_seconds_bucket[5m]))",
            "legendFormat": "95th percentile"
          }
        ],
        "type": "graph"
      },
      {
        "title": "Error Rate",
        "targets": [
          {
            "expr": "rate(creativedynamics_errors_total[5m])",
            "legendFormat": "{{error_type}}"
          }
        ],
        "type": "graph"
      },
      {
        "title": "Active Analyses",
        "targets": [
          {
            "expr": "creativedynamics_active_analyses"
          }
        ],
        "type": "stat"
      },
      {
        "title": "Analysis Duration",
        "targets": [
          {
            "expr": "histogram_quantile(0.5, rate(creativedynamics_analysis_duration_seconds_bucket[1h]))",
            "legendFormat": "Median"
          },
          {
            "expr": "histogram_quantile(0.95, rate(creativedynamics_analysis_duration_seconds_bucket[1h]))",
            "legendFormat": "95th percentile"
          }
        ],
        "type": "graph"
      },
      {
        "title": "Data Processed",
        "targets": [
          {
            "expr": "rate(creativedynamics_data_processed_bytes[5m])",
            "legendFormat": "Bytes/sec"
          }
        ],
        "type": "graph"
      }
    ]
  }
}

Configure structured logging in your application:

import logging
from datetime import datetime
from pythonjsonlogger import jsonlogger

class CustomJsonFormatter(jsonlogger.JsonFormatter):
    def add_fields(self, log_record, record, message_dict):
        super().add_fields(log_record, record, message_dict)
        log_record['timestamp'] = datetime.utcnow().isoformat()
        log_record['level'] = record.levelname
        log_record['logger'] = record.name
        log_record['module'] = record.module
        log_record['function'] = record.funcName
        log_record['line'] = record.lineno

        # Add correlation ID if available
        if hasattr(record, 'correlation_id'):
            log_record['correlation_id'] = record.correlation_id

        # Add user context if available
        if hasattr(record, 'user_id'):
            log_record['user_id'] = record.user_id

def setup_logging(level='INFO', log_file=None):
    """Configure structured JSON logging."""
    formatter = CustomJsonFormatter()

    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)

    # Optional file handler
    handlers = [console_handler]
    if log_file:
        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(formatter)
        handlers.append(file_handler)

    # Configure root logger
    logging.basicConfig(
        level=getattr(logging, level),
        handlers=handlers
    )
    return logging.getLogger(__name__)
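
With logging configured, correlation and user context can be attached per call via `extra`; the formatter above then emits them as JSON fields. A short sketch (the logger name and IDs below are illustrative):

```python
import logging
import uuid

logger = logging.getLogger("creativedynamics.analysis")

# A per-request correlation ID ties together every log line
# emitted while handling one analysis request
correlation_id = str(uuid.uuid4())

logger.info(
    "analysis started",
    extra={"correlation_id": correlation_id, "user_id": "user-42"},
)
```

Because `extra` keys become attributes on the `LogRecord`, the `hasattr` checks in `CustomJsonFormatter.add_fields` pick them up without any further wiring.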

Create logstash.conf:

input {
  file {
    path => "/var/log/creativedynamics/*.log"
    start_position => "beginning"
    codec => "json"
  }
}

filter {
  if [level] == "ERROR" {
    mutate {
      add_tag => [ "error" ]
    }
  }

  if [duration] {
    ruby {
      code => "event.set('duration_ms', event.get('duration') * 1000)"
    }
  }

  date {
    match => [ "timestamp", "ISO8601" ]
    target => "@timestamp"
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "creativedynamics-%{+YYYY.MM.dd}"
  }

  if "error" in [tags] {
    email {
      to => "alerts@example.com"
      subject => "CreativeDynamics Error Alert"
      body => "Error in %{module}.%{function}: %{message}"
    }
  }
}

Common queries for analysis:

// Find slow analyses
{
  "query": {
    "bool": {
      "must": [
        { "term": { "function": "analyze_all_items" } },
        { "range": { "duration": { "gte": 60 } } }
      ]
    }
  }
}

// Error distribution
{
  "aggs": {
    "errors_by_type": {
      "terms": {
        "field": "error_type.keyword",
        "size": 10
      }
    }
  }
}

// User activity patterns
{
  "aggs": {
    "requests_over_time": {
      "date_histogram": {
        "field": "@timestamp",
        "calendar_interval": "1h"
      }
    }
  }
}
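
These queries can also be issued programmatically against Elasticsearch's `_search` endpoint. A standard-library sketch: the host, index name, and `build_slow_analyses_query` helper are assumptions for illustration.

```python
import json
import urllib.request

def build_slow_analyses_query(min_seconds: float = 60.0) -> dict:
    """Build the 'find slow analyses' query body shown above."""
    return {
        "query": {
            "bool": {
                "must": [
                    {"term": {"function": "analyze_all_items"}},
                    {"range": {"duration": {"gte": min_seconds}}},
                ]
            }
        }
    }

def search(index: str, body: dict, host: str = "http://localhost:9200") -> dict:
    """POST a query body to Elasticsearch and return the parsed response."""
    req = urllib.request.Request(
        f"{host}/{index}/_search",
        data=json.dumps(body).encode(),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req, timeout=5) as resp:
        return json.load(resp)

# Example: hits = search("creativedynamics-*", build_slow_analyses_query(30))
```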
Set up distributed tracing with OpenTelemetry, exporting spans over OTLP:

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor

# Configure tracing
def setup_tracing(app):
    # Set up the tracer provider
    trace.set_tracer_provider(TracerProvider())
    tracer = trace.get_tracer(__name__)

    # Configure OTLP exporter
    otlp_exporter = OTLPSpanExporter(
        endpoint="localhost:4317",
        insecure=True
    )

    # Add span processor
    span_processor = BatchSpanProcessor(otlp_exporter)
    trace.get_tracer_provider().add_span_processor(span_processor)

    # Instrument the FastAPI app
    FastAPIInstrumentor.instrument_app(app)

    # Instrument the requests library
    RequestsInstrumentor().instrument()
    return tracer

# Use in application
tracer = setup_tracing(app)

@app.post("/analyze")
async def analyze_endpoint(data: dict):
    with tracer.start_as_current_span("analyze_request") as span:
        span.set_attribute("data.size", len(str(data)))
        span.set_attribute("data.items", len(data.get("items", [])))

        # Process each analysis stage in its own child span
        with tracer.start_as_current_span("data_validation"):
            validate_data(data)
        with tracer.start_as_current_span("signature_calculation"):
            signatures = calculate_signatures(data)
        with tracer.start_as_current_span("fatigue_detection"):
            results = detect_fatigue(signatures)
        return results
Implement detailed health checks for liveness and readiness probes:

from typing import Dict, Any
import asyncio
import time
from datetime import datetime

import psutil

class HealthChecker:
    def __init__(self):
        self.checks = []

    def register_check(self, name: str, check_func):
        self.checks.append((name, check_func))

    async def run_checks(self) -> Dict[str, Any]:
        results = {
            "timestamp": datetime.utcnow().isoformat(),
            "status": "healthy",
            "checks": {}
        }
        for name, check_func in self.checks:
            try:
                if asyncio.iscoroutinefunction(check_func):
                    result = await check_func()
                else:
                    result = check_func()
                results["checks"][name] = {
                    "status": "healthy",
                    "details": result
                }
            except Exception as e:
                results["status"] = "unhealthy"
                results["checks"][name] = {
                    "status": "unhealthy",
                    "error": str(e)
                }
        return results

# Health check functions
def check_memory():
    memory = psutil.virtual_memory()
    return {
        "used_percent": memory.percent,
        "available_gb": memory.available / (1024**3)
    }

def check_disk():
    disk = psutil.disk_usage('/')
    return {
        "used_percent": disk.percent,
        "free_gb": disk.free / (1024**3)
    }

async def check_api_latency():
    import httpx
    start = time.time()
    async with httpx.AsyncClient() as client:
        await client.get("http://localhost:5001/health")
    return {"latency_ms": (time.time() - start) * 1000}

# Register checks
health_checker = HealthChecker()
health_checker.register_check("memory", check_memory)
health_checker.register_check("disk", check_disk)
health_checker.register_check("api_latency", check_api_latency)

# Health endpoint
@app.get("/health/detailed")
async def detailed_health():
    return await health_checker.run_checks()
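
The sync/async dispatch inside `run_checks` generalizes to any mixed set of checks. A minimal standalone sketch of the same pattern (the check functions here are placeholders):

```python
import asyncio

async def run_check(check_func):
    """Await a check only when it is a coroutine function; call it
    directly otherwise (the same dispatch HealthChecker.run_checks uses)."""
    if asyncio.iscoroutinefunction(check_func):
        return await check_func()
    return check_func()

def sync_check():
    return {"ok": True}

async def async_check():
    return {"ok": True}

async def main():
    # Both kinds of checks run through the same code path
    return [await run_check(sync_check), await run_check(async_check)]

results = asyncio.run(main())
```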
Alternatively, integrate Datadog APM via ddtrace:

from ddtrace import tracer, patch_all
from ddtrace.contrib.asgi import TraceMiddleware

# Patch all supported libraries
patch_all()

# Configure the Datadog agent connection
tracer.configure(
    hostname='localhost',
    port=8126,
    analytics_enabled=True,
    env='production',
    service='creativedynamics',
    version='0.9.8.1'
)

# Add middleware to FastAPI
app.add_middleware(
    TraceMiddleware,
    tracer=tracer,
    service="creativedynamics-api",
    distributed_tracing=True
)

# Custom spans
@tracer.wrap(name='custom.analysis')
def perform_analysis(data):
    span = tracer.current_span()
    span.set_tag('data.size', len(data))
    span.set_metric('items.count', len(data.get('items', [])))

    # Analysis logic produces the results to return
    results = detect_fatigue(data)
    return results

Create alertmanager.yml:

global:
  smtp_smarthost: 'smtp.gmail.com:587'
  smtp_from: 'creativedynamics@example.com'
  smtp_auth_username: 'creativedynamics@example.com'
  smtp_auth_password: 'password'

route:
  group_by: ['alertname', 'cluster', 'service']
  group_wait: 10s
  group_interval: 10s
  repeat_interval: 1h
  receiver: 'team-notifications'
  routes:
    - match:
        severity: critical
      receiver: 'pagerduty-critical'
      continue: true
    - match:
        severity: warning
      receiver: 'slack-warnings'

receivers:
  - name: 'team-notifications'
    email_configs:
      - to: 'team@example.com'
        headers:
          Subject: 'CreativeDynamics Alert: {{ .GroupLabels.alertname }}'
  - name: 'pagerduty-critical'
    pagerduty_configs:
      - service_key: 'YOUR-PAGERDUTY-KEY'
        description: '{{ .CommonAnnotations.summary }}'
  - name: 'slack-warnings'
    slack_configs:
      - api_url: 'YOUR-SLACK-WEBHOOK-URL'
        channel: '#creativedynamics-alerts'
        title: 'CreativeDynamics Warning'
        text: '{{ .CommonAnnotations.description }}'

Monitor these essential KPIs:

  • Golden Signals:

    • Latency: Response time distribution
    • Traffic: Requests per second
    • Errors: Error rate and types
    • Saturation: Resource utilization
  • Business Metrics:

    • Analysis completion rate
    • Average processing time
    • Data volume processed
    • API usage patterns
Monitoring setup checklist:

  • Application metrics exposed via /metrics
  • Infrastructure monitoring configured
  • Log aggregation pipeline set up
  • Distributed tracing enabled
  • Alert rules defined and tested
  • Dashboards created for key metrics
  • Runbooks documented for alerts
  • Regular monitoring review scheduled
Incident response process:

  1. Detection: Automated alerts trigger
  2. Triage: Assess severity and impact
  3. Diagnosis: Use monitoring tools to identify root cause
  4. Resolution: Apply fix and verify
  5. Post-mortem: Document learnings and improvements
Useful PromQL expressions for dashboards and status pages:

# Service metrics to display
service_metrics = {
    "uptime": "process_uptime_seconds",
    "request_rate": "rate(creativedynamics_requests_total[5m])",
    "error_rate": "rate(creativedynamics_errors_total[5m])",
    "p95_latency": "histogram_quantile(0.95, creativedynamics_request_duration_seconds_bucket)",
    "active_connections": "creativedynamics_active_connections",
    "memory_usage": "process_resident_memory_bytes",
    "cpu_usage": "rate(process_cpu_seconds_total[5m])"
}

# Analysis-specific metrics
analysis_metrics = {
    "analyses_per_hour": "rate(creativedynamics_analyses_total[1h]) * 3600",
    "avg_analysis_time": "rate(creativedynamics_analysis_duration_seconds_sum[1h]) / rate(creativedynamics_analysis_duration_seconds_count[1h])",
    "signature_calc_time": "histogram_quantile(0.5, creativedynamics_signature_calculation_seconds_bucket)",
    "wastage_detection_accuracy": "creativedynamics_wastage_detection_accuracy",
    "data_processing_rate": "rate(creativedynamics_data_processed_bytes[5m])"
}
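
These expressions can also be evaluated programmatically against Prometheus's HTTP query API, for example to drive a custom status page. A standard-library sketch (the Prometheus address is an assumption):

```python
import json
import urllib.parse
import urllib.request

PROMETHEUS_URL = "http://localhost:9090"  # assumed Prometheus address

def build_query_url(base: str, expr: str) -> str:
    """Build a /api/v1/query URL for a PromQL expression."""
    return f"{base}/api/v1/query?" + urllib.parse.urlencode({"query": expr})

def query_prometheus(expr: str) -> list:
    """Evaluate a PromQL expression and return the result vector."""
    url = build_query_url(PROMETHEUS_URL, expr)
    with urllib.request.urlopen(url, timeout=5) as resp:
        payload = json.load(resp)
    if payload.get("status") != "success":
        raise RuntimeError(f"Prometheus query failed: {payload}")
    return payload["data"]["result"]

# Example: query_prometheus(service_metrics["request_rate"])
```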
Next steps:

  • Configure Scaling for handling increased load
  • Review Security for monitoring security events
  • Set up Deployment with monitoring integration