Skip to content

Security Best Practices

This guide provides detailed security guidelines for deploying and maintaining CreativeDynamics in production environments. It covers authentication, authorisation, data protection, vulnerability management, and compliance considerations.

CreativeDynamics provides the core analysis and reporting pipeline. Authentication, authorisation, rate limiting, persistence, and security monitoring are typically implemented in a thin deployment application that wraps creativedynamics.api.main:app.

┌─────────────────────────────────────┐
│ External Firewall │
├─────────────────────────────────────┤
│ WAF / DDoS Protection │
├─────────────────────────────────────┤
│ Load Balancer / Nginx │
├─────────────────────────────────────┤
│ API Gateway (Rate Limiting) │
├─────────────────────────────────────┤
│ Application Layer (FastAPI) │
├─────────────────────────────────────┤
│ Data Layer (Encrypted) │
└─────────────────────────────────────┘
your_service/auth/jwt_handler.py
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
import os
# Security configuration
SECRET_KEY = os.getenv("SECRET_KEY", "") # Must be set in production
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7
# Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
class TokenData(BaseModel):
username: Optional[str] = None
scopes: list = []
def create_access_token(
data: Dict[str, Any],
expires_delta: Optional[timedelta] = None
) -> str:
"""Create JWT access token."""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({
"exp": expire,
"iat": datetime.utcnow(),
"type": "access"
})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_token(token: str) -> TokenData:
"""Verify and decode JWT token."""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
scopes: list = payload.get("scopes", [])
if username is None:
raise JWTError("Invalid token")
return TokenData(username=username, scopes=scopes)
except JWTError:
raise ValueError("Could not validate credentials")
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify password against hash."""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""Hash password using bcrypt."""
return pwd_context.hash(password)
your_service/auth/oauth2.py
from fastapi import Depends, HTTPException, status, Security
from fastapi.security import OAuth2PasswordBearer, SecurityScopes
from typing import Optional
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl="token",
scopes={
"read": "Read access",
"write": "Write access",
"admin": "Admin access"
}
)
async def get_current_user(
security_scopes: SecurityScopes,
token: str = Depends(oauth2_scheme)
):
"""Get current authenticated user."""
if security_scopes.scopes:
authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
else:
authenticate_value = "Bearer"
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": authenticate_value},
)
try:
token_data = verify_token(token)
except ValueError:
raise credentials_exception
# Check scopes
for scope in security_scopes.scopes:
if scope not in token_data.scopes:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions",
headers={"WWW-Authenticate": authenticate_value},
)
return token_data
# Protected endpoint example
@app.get("/api/admin/users")
async def get_users(
current_user: TokenData = Security(get_current_user, scopes=["admin"])
):
"""Admin-only endpoint."""
return {"users": get_all_users()}
your_service/auth/api_key.py
from fastapi import HTTPException, Security, status
from fastapi.security import APIKeyHeader, APIKeyQuery
import hashlib
import hmac
import time
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
api_key_query = APIKeyQuery(name="api_key", auto_error=False)
class APIKeyValidator:
"""Validate API keys with rate limiting."""
def __init__(self):
self.valid_keys = {} # Load from database
self.rate_limits = {}
async def validate_api_key(
self,
api_key_header: str = Security(api_key_header),
api_key_query: str = Security(api_key_query)
) -> str:
"""Validate API key from header or query."""
api_key = api_key_header or api_key_query
if not api_key:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="API key required"
)
# Validate key format
if not self._is_valid_format(api_key):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid API key format"
)
# Check if key exists and is active
key_data = await self._get_key_data(api_key)
if not key_data or not key_data.get("active"):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or inactive API key"
)
# Check rate limits
if not await self._check_rate_limit(api_key):
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail="Rate limit exceeded"
)
return api_key
def _is_valid_format(self, api_key: str) -> bool:
"""Check if API key has valid format."""
# Format: prefix_randomstring (e.g., cd_1234567890abcdef)
return api_key.startswith("cd_") and len(api_key) == 19
async def _get_key_data(self, api_key: str) -> dict:
"""Retrieve API key data from database."""
# Hash the API key before database lookup
key_hash = hashlib.sha256(api_key.encode()).hexdigest()
# Query database for key_hash
return await db.get_api_key(key_hash)
async def _check_rate_limit(self, api_key: str) -> bool:
"""Check if API key has exceeded rate limit."""
current_time = time.time()
window = 60 # 1 minute window
max_requests = 100 # 100 requests per minute
if api_key not in self.rate_limits:
self.rate_limits[api_key] = []
# Remove old requests outside window
self.rate_limits[api_key] = [
t for t in self.rate_limits[api_key]
if current_time - t < window
]
# Check if limit exceeded
if len(self.rate_limits[api_key]) >= max_requests:
return False
# Add current request
self.rate_limits[api_key].append(current_time)
return True
api_key_validator = APIKeyValidator()
your_service/security/validation.py
from pydantic import BaseModel, validator, Field
from typing import List, Optional, Dict, Any
import re
import bleach
class SecureAnalysisRequest(BaseModel):
"""Validated analysis request with security checks."""
dataset_name: str = Field(..., min_length=1, max_length=100)
items: List[Dict[str, Any]] = Field(..., min_items=1, max_items=10000)
config: Optional[Dict[str, Any]] = Field(default_factory=dict)
@validator('dataset_name')
def validate_dataset_name(cls, v):
"""Validate dataset name for security."""
# Remove any HTML/script tags
v = bleach.clean(v, tags=[], strip=True)
# Check for path traversal attempts
if any(char in v for char in ['..', '/', '\\', '\x00']):
raise ValueError("Invalid characters in dataset name")
# Alphanumeric, spaces, hyphens, underscores only
if not re.match(r'^[a-zA-Z0-9\s\-_]+$', v):
raise ValueError("Dataset name contains invalid characters")
return v
@validator('items')
def validate_items(cls, v):
"""Validate items for security and size."""
for item in v:
# Check item size
item_size = len(str(item))
if item_size > 100000: # 100KB per item
raise ValueError("Item size exceeds maximum allowed")
# Validate item structure
if 'id' not in item:
raise ValueError("Item missing required 'id' field")
# Sanitize string fields
for key, value in item.items():
if isinstance(value, str):
item[key] = bleach.clean(value, tags=[], strip=True)
return v
@validator('config')
def validate_config(cls, v):
"""Validate configuration for security."""
# Whitelist allowed configuration keys
allowed_keys = [
'signature_depth', 'window_size', 'threshold',
'min_samples', 'output_format', 'include_plots'
]
# Remove any non-whitelisted keys
v = {k: v[k] for k in v if k in allowed_keys}
# Validate value types and ranges
if 'signature_depth' in v:
if not isinstance(v['signature_depth'], int) or not 1 <= v['signature_depth'] <= 10:
raise ValueError("Invalid signature_depth value")
if 'window_size' in v:
if not isinstance(v['window_size'], int) or not 1 <= v['window_size'] <= 1000:
raise ValueError("Invalid window_size value")
return v
# File upload validation
class SecureFileUpload:
"""Secure file upload handling."""
ALLOWED_EXTENSIONS = {'.csv', '.json', '.xlsx'}
MAX_FILE_SIZE = 100 * 1024 * 1024 # 100MB
@staticmethod
async def validate_file(file):
"""Validate uploaded file for security."""
# Check file extension
filename = file.filename.lower()
extension = os.path.splitext(filename)[1]
if extension not in SecureFileUpload.ALLOWED_EXTENSIONS:
raise ValueError(f"File type {extension} not allowed")
# Check file size
contents = await file.read()
if len(contents) > SecureFileUpload.MAX_FILE_SIZE:
raise ValueError("File size exceeds maximum allowed")
# Reset file pointer
await file.seek(0)
# Validate file content matches extension
if extension == '.csv':
try:
# Try to parse as CSV
pd.read_csv(io.StringIO(contents.decode('utf-8')), nrows=1)
except:
raise ValueError("Invalid CSV file")
elif extension == '.json':
try:
# Try to parse as JSON
json.loads(contents)
except:
raise ValueError("Invalid JSON file")
return True
your_service/security/database.py
from sqlalchemy import text
from sqlalchemy.orm import Session
import re
class SecureDatabase:
"""Secure database operations."""
@staticmethod
def execute_query(db: Session, query: str, params: dict = None):
"""Execute query with parameterized inputs."""
# Never use string formatting for queries
# Always use parameterized queries
# Bad (vulnerable):
# query = f"SELECT * FROM users WHERE id = {user_id}"
# Good (secure):
safe_query = text(query)
result = db.execute(safe_query, params or {})
return result
@staticmethod
def validate_table_name(table_name: str) -> str:
"""Validate table name to prevent injection."""
# Whitelist allowed table names
allowed_tables = ['analyses', 'results', 'users', 'api_keys']
if table_name not in allowed_tables:
raise ValueError(f"Invalid table name: {table_name}")
# Additional validation
if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', table_name):
raise ValueError("Invalid table name format")
return table_name
your_service/security/encryption.py
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2
import base64
import os
class DataEncryption:
"""Encrypt sensitive data at rest."""
def __init__(self):
self.key = self._derive_key()
self.cipher = Fernet(self.key)
def _derive_key(self) -> bytes:
"""Derive encryption key from master key."""
master_key = os.getenv("MASTER_ENCRYPTION_KEY", "").encode()
salt = os.getenv("ENCRYPTION_SALT", "").encode()
if not master_key or not salt:
raise ValueError("Encryption keys not configured")
kdf = PBKDF2(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=100000,
)
key = base64.urlsafe_b64encode(kdf.derive(master_key))
return key
def encrypt(self, data: str) -> str:
"""Encrypt string data."""
return self.cipher.encrypt(data.encode()).decode()
def decrypt(self, encrypted_data: str) -> str:
"""Decrypt string data."""
return self.cipher.decrypt(encrypted_data.encode()).decode()
def encrypt_file(self, file_path: str, output_path: str):
"""Encrypt file contents."""
with open(file_path, 'rb') as f:
data = f.read()
encrypted = self.cipher.encrypt(data)
with open(output_path, 'wb') as f:
f.write(encrypted)
def decrypt_file(self, encrypted_path: str, output_path: str):
"""Decrypt file contents."""
with open(encrypted_path, 'rb') as f:
encrypted = f.read()
decrypted = self.cipher.decrypt(encrypted)
with open(output_path, 'wb') as f:
f.write(decrypted)
# Usage
encryption = DataEncryption()
# Encrypt sensitive configuration
encrypted_config = encryption.encrypt(json.dumps(sensitive_config))
# Decrypt when needed
decrypted_config = json.loads(encryption.decrypt(encrypted_config))
your_service/security/tls.py
import ssl
import certifi
from fastapi import FastAPI
import uvicorn
def create_ssl_context() -> ssl.SSLContext:
"""Create secure SSL context."""
# Create SSL context with secure defaults
context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
# Load certificates
context.load_cert_chain(
certfile="/path/to/cert.pem",
keyfile="/path/to/key.pem"
)
# Set minimum TLS version to 1.2
context.minimum_version = ssl.TLSVersion.TLSv1_2
# Disable weak ciphers
context.set_ciphers('ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:!aNULL:!MD5:!DSS')
# Enable hostname checking
context.check_hostname = True
context.verify_mode = ssl.CERT_REQUIRED
return context
# Run API with TLS
if __name__ == "__main__":
ssl_context = create_ssl_context()
uvicorn.run(
"creativedynamics.api.main:app",
host="0.0.0.0",
port=443,
ssl_keyfile="/path/to/key.pem",
ssl_certfile="/path/to/cert.pem",
ssl_version=ssl.PROTOCOL_TLS,
ssl_cert_reqs=ssl.CERT_REQUIRED,
ssl_ciphers="TLSv1.2:!aNULL:!eNULL:!EXPORT:!DES:!MD5:!PSK:!RC4"
)
your_service/security/headers.py
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from starlette.middleware.base import BaseHTTPMiddleware
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
"""Add security headers to all responses."""
async def dispatch(self, request: Request, call_next):
response = await call_next(request)
# Security headers
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
response.headers["X-XSS-Protection"] = "1; mode=block"
response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
response.headers["Content-Security-Policy"] = (
"default-src 'self'; "
"script-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net; "
"style-src 'self' 'unsafe-inline'; "
"img-src 'self' data: https:; "
"font-src 'self' data:; "
"connect-src 'self';"
)
response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
response.headers["Permissions-Policy"] = (
"accelerometer=(), camera=(), geolocation=(), "
"gyroscope=(), magnetometer=(), microphone=(), "
"payment=(), usb=()"
)
return response
def configure_security(app: FastAPI):
"""Configure all security middleware."""
# Security headers
app.add_middleware(SecurityHeadersMiddleware)
# CORS configuration
app.add_middleware(
CORSMiddleware,
allow_origins=os.getenv("CORS_ORIGINS", "").split(","),
allow_credentials=True,
allow_methods=["GET", "POST"],
allow_headers=["*"],
max_age=86400,
)
# Trusted host validation
app.add_middleware(
TrustedHostMiddleware,
allowed_hosts=os.getenv("ALLOWED_HOSTS", "localhost").split(",")
)
return app
your_service/security/rate_limiting.py
from fastapi import HTTPException, Request, status
from typing import Optional
import redis
import time
import hashlib
class RateLimiter:
"""Token bucket rate limiting with Redis."""
def __init__(
self,
redis_client: redis.Redis,
requests_per_minute: int = 60,
requests_per_hour: int = 1000
):
self.redis = redis_client
self.rpm = requests_per_minute
self.rph = requests_per_hour
async def check_rate_limit(
self,
request: Request,
identifier: Optional[str] = None
) -> bool:
"""Check if request exceeds rate limit."""
# Get identifier (API key, user ID, or IP)
if not identifier:
identifier = self._get_client_ip(request)
# Create rate limit keys
minute_key = f"rate_limit:minute:{identifier}:{int(time.time() / 60)}"
hour_key = f"rate_limit:hour:{identifier}:{int(time.time() / 3600)}"
# Check minute limit
minute_count = self.redis.incr(minute_key)
if minute_count == 1:
self.redis.expire(minute_key, 60)
if minute_count > self.rpm:
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail=f"Rate limit exceeded: {self.rpm} requests per minute",
headers={"Retry-After": "60"}
)
# Check hour limit
hour_count = self.redis.incr(hour_key)
if hour_count == 1:
self.redis.expire(hour_key, 3600)
if hour_count > self.rph:
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail=f"Rate limit exceeded: {self.rph} requests per hour",
headers={"Retry-After": "3600"}
)
return True
def _get_client_ip(self, request: Request) -> str:
"""Get client IP address."""
# Check for proxy headers
forwarded = request.headers.get("X-Forwarded-For")
if forwarded:
return forwarded.split(",")[0].strip()
real_ip = request.headers.get("X-Real-IP")
if real_ip:
return real_ip
return request.client.host
# Usage in FastAPI
rate_limiter = RateLimiter(redis_client)
@app.post("/api/v1/analyze/all")
async def analyze(
request: Request,
data: SecureAnalysisRequest,
api_key: str = Depends(api_key_validator.validate_api_key)
):
# Check rate limit
await rate_limiter.check_rate_limit(request, identifier=api_key)
# Process request
return await process_analysis(data)
your_service/security/audit.py
from datetime import datetime
from typing import Dict, Any, Optional
import json
import hashlib
class AuditLogger:
"""Log security-relevant events."""
def __init__(self, log_file: str = "/var/log/creativedynamics/audit.log"):
self.log_file = log_file
async def log_event(
self,
event_type: str,
user_id: Optional[str] = None,
ip_address: Optional[str] = None,
details: Dict[str, Any] = None,
severity: str = "INFO"
):
"""Log security event."""
event = {
"timestamp": datetime.utcnow().isoformat(),
"event_type": event_type,
"severity": severity,
"user_id": user_id,
"ip_address": ip_address,
"details": details or {},
"event_id": self._generate_event_id()
}
# Log to file
with open(self.log_file, 'a') as f:
f.write(json.dumps(event) + '\n')
# Also send to SIEM if configured
await self._send_to_siem(event)
# Alert on critical events
if severity == "CRITICAL":
await self._send_alert(event)
def _generate_event_id(self) -> str:
"""Generate unique event ID."""
timestamp = str(datetime.utcnow().timestamp())
random_data = os.urandom(16).hex()
return hashlib.sha256(f"{timestamp}{random_data}".encode()).hexdigest()[:16]
async def _send_to_siem(self, event: dict):
"""Send event to SIEM system."""
# Implement SIEM integration
pass
async def _send_alert(self, event: dict):
"""Send critical security alerts."""
# Implement alerting (email, Slack, PagerDuty, etc.)
pass
# Audit log events
audit = AuditLogger()
# Log authentication attempts
await audit.log_event(
event_type="AUTH_ATTEMPT",
user_id=username,
ip_address=client_ip,
details={"success": False, "reason": "Invalid password"},
severity="WARNING"
)
# Log data access
await audit.log_event(
event_type="DATA_ACCESS",
user_id=current_user.id,
details={"dataset": dataset_name, "records": record_count},
severity="INFO"
)
# Log security violations
await audit.log_event(
event_type="SECURITY_VIOLATION",
ip_address=client_ip,
details={"violation": "SQL injection attempt", "query": malicious_query},
severity="CRITICAL"
)
.github/workflows/security.yml
name: Security Scan
on:
push:
branches: [main, develop]
pull_request:
branches: [main]
schedule:
- cron: '0 0 * * *' # Daily scan
jobs:
security:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install dependencies
run: |
pip install safety bandit semgrep
- name: Run Safety check
run: |
safety check --json --output safety-report.json
- name: Run Bandit security linter
run: |
bandit -r creativedynamics/ -f json -o bandit-report.json
- name: Run Semgrep
run: |
semgrep --config=auto --json --output=semgrep-report.json creativedynamics/
- name: Upload security reports
uses: actions/upload-artifact@v3
with:
name: security-reports
path: |
safety-report.json
bandit-report.json
semgrep-report.json
- name: Check for vulnerabilities
run: |
python scripts/check_vulnerabilities.py
tests/test_security.py
import pytest
from fastapi.testclient import TestClient
import time
class TestSecurity:
"""Security-focused tests."""
def test_sql_injection(self, client: TestClient):
"""Test SQL injection prevention."""
malicious_inputs = [
"'; DROP TABLE users; --",
"1' OR '1'='1",
"admin'--",
"' UNION SELECT * FROM users--"
]
for payload in malicious_inputs:
response = client.post(
"/api/v1/analyze/all",
json={"dataset_name": payload, "items": []}
)
# Should reject malicious input
assert response.status_code in [400, 422]
assert "DROP TABLE" not in str(response.content)
def test_xss_prevention(self, client: TestClient):
"""Test XSS prevention."""
xss_payloads = [
"<script>alert('XSS')</script>",
"<img src=x onerror=alert('XSS')>",
"javascript:alert('XSS')",
"<iframe src='javascript:alert()'>"
]
for payload in xss_payloads:
response = client.post(
"/api/analyze",
json={"dataset_name": payload, "items": []}
)
# Should sanitize or reject
assert "<script>" not in str(response.content)
assert "javascript:" not in str(response.content)
def test_rate_limiting(self, client: TestClient):
"""Test rate limiting."""
# Make requests up to limit
for i in range(60):
response = client.get("/health")
assert response.status_code == 200
# Next request should be rate limited
response = client.get("/health")
assert response.status_code == 429
assert "Retry-After" in response.headers
def test_authentication_required(self, client: TestClient):
"""Test authentication enforcement."""
protected_endpoints = [
"/api/v1/report/export",
]
for endpoint in protected_endpoints:
response = client.get(endpoint)
assert response.status_code == 401
def test_secure_headers(self, client: TestClient):
"""Test security headers."""
response = client.get("/health")
assert response.headers.get("X-Content-Type-Options") == "nosniff"
assert response.headers.get("X-Frame-Options") == "DENY"
assert response.headers.get("X-XSS-Protection") == "1; mode=block"
assert "Strict-Transport-Security" in response.headers
assert "Content-Security-Policy" in response.headers
your_service/privacy/gdpr.py
from datetime import datetime, timedelta
from typing import Dict, Any
import json
class GDPRCompliance:
"""GDPR compliance utilities."""
async def export_user_data(self, user_id: str) -> Dict[str, Any]:
"""Export all user data (GDPR Article 20)."""
user_data = {
"user_id": user_id,
"export_date": datetime.utcnow().isoformat(),
"personal_data": await self._get_personal_data(user_id),
"usage_data": await self._get_usage_data(user_id),
"analysis_history": await self._get_analysis_history(user_id)
}
# Log data export
await audit.log_event(
event_type="GDPR_DATA_EXPORT",
user_id=user_id,
severity="INFO"
)
return user_data
async def delete_user_data(self, user_id: str) -> bool:
"""Delete all user data (GDPR Article 17)."""
# Delete from all systems
await self._delete_from_database(user_id)
await self._delete_from_cache(user_id)
await self._delete_from_logs(user_id)
# Log deletion
await audit.log_event(
event_type="GDPR_DATA_DELETION",
user_id=user_id,
severity="INFO"
)
return True
async def anonymize_old_data(self, days: int = 365):
"""Anonymize data older than specified days."""
cutoff_date = datetime.utcnow() - timedelta(days=days)
# Anonymize personal data
await db.execute("""
UPDATE analyses
SET user_id = 'ANONYMIZED',
ip_address = 'ANONYMIZED'
WHERE created_at < :cutoff_date
""", {"cutoff_date": cutoff_date})
return True
  • Implement authentication and authorization
  • Add input validation for all endpoints
  • Enable security headers
  • Configure CORS properly
  • Implement rate limiting
  • Add audit logging
  • Set up dependency scanning
  • Write security tests
  • Generate strong secret keys
  • Configure TLS/SSL certificates
  • Set up firewall rules
  • Configure secure database connections
  • Enable encryption at rest
  • Set up intrusion detection
  • Configure backup encryption
  • Implement key rotation
  • Regular security updates
  • Vulnerability scanning
  • Penetration testing
  • Security audit reviews
  • Incident response plan
  • Security training for team
  • Compliance audits
  • Regular backups and recovery testing
  1. Detection: Monitor security events and alerts
  2. Containment: Isolate affected systems
  3. Investigation: Analyze logs and determine scope
  4. Eradication: Remove threat and patch vulnerabilities
  5. Recovery: Restore systems and verify integrity
  6. Lessons Learned: Document and improve processes
your_service/security/incident.py
SECURITY_CONTACTS = {
"security_team": "security@example.com",
"ciso": "ciso@example.com",
"legal": "legal@example.com",
"pr": "pr@example.com",
"on_call": "+1-xxx-xxx-xxxx"
}
# Incident severity levels
SEVERITY_LEVELS = {
"CRITICAL": "Data breach, system compromise",
"HIGH": "Authentication bypass, privilege escalation",
"MEDIUM": "Failed attack attempts, suspicious activity",
"LOW": "Policy violations, minor issues"
}