Observability¶
Observability is built into effectpy from the ground up. Unlike many async libraries where monitoring is an afterthought, effectpy makes it trivial to add comprehensive logging, metrics, and tracing to any effect.
The `instrument()` function automatically captures (a minimal call is sketched just after this list):
- Structured logs with correlation IDs and context
- Metrics for performance monitoring
- Traces for distributed request tracking
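A minimal call wraps an existing effect with a name and optional tags. The effect and tag values below are illustrative placeholders; full, runnable examples follow in the sections below:

```python
from effectpy import *

# Illustrative effect and tags - any effect can be wrapped the same way
fetch_user = succeed({"id": 123, "name": "Ada"})
instrumented = instrument("user.fetch", fetch_user, tags={"user_id": 123})
```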
The Three Pillars¶
1. Logging¶
effectpy provides structured logging with automatic correlation IDs:
```python
import asyncio
from typing import Any

from effectpy import *

async def user_service_example():
    scope = Scope()
    env = await LoggerLayer.build_scoped(Context(), scope)
    logger = env.get(ConsoleLogger)

    # Manual logging
    logger.info("User service starting", tags={"component": "user_service"})

    # Automatic logging via instrumentation
    def fetch_user(user_id: int) -> Effect[Any, str, dict]:
        return succeed({"id": user_id, "name": f"User {user_id}"})

    instrumented = instrument(
        "user.fetch",
        fetch_user(123),
        tags={"user_id": 123, "operation": "fetch"}
    )

    result = await instrumented._run(env)
    print(f"Result: {result}")

    await scope.close()

asyncio.run(user_service_example())
```
Log Formats¶
Choose between plain text and structured JSON:
```python
# Plain text logging (default)
# [2024-01-15 10:30:45] INFO [correlation_id=abc123] user.fetch started {user_id=123}

# JSON logging - better for log aggregation
logger = ConsoleLogger(json_output=True)
# {"timestamp": "2024-01-15T10:30:45Z", "level": "INFO", "message": "user.fetch started", "correlation_id": "abc123", "tags": {"user_id": 123}}
```
Correlation IDs¶
effectpy automatically tracks correlation IDs across async operations:
```python
import asyncio
from effectpy import *

async def correlation_example():
    scope = Scope()
    env = await LoggerLayer.build_scoped(Context(), scope)

    def service_a() -> Effect[ConsoleLogger, str, str]:
        async def impl(ctx: Context):
            logger = ctx.get(ConsoleLogger)
            logger.info("Service A starting")

            # Call another service
            result = await service_b()._run(ctx)

            logger.info("Service A completed", tags={"result": result})
            return f"A -> {result}"
        return Effect(impl)

    def service_b() -> Effect[ConsoleLogger, str, str]:
        async def impl(ctx: Context):
            logger = ctx.get(ConsoleLogger)
            logger.info("Service B processing")  # Same correlation ID
            return "B result"
        return Effect(impl)

    # All logs will share the same correlation ID
    result = await service_a()._run(env)
    print(f"Final result: {result}")

    await scope.close()

asyncio.run(correlation_example())
```
2. Metrics¶
effectpy includes a complete metrics system with counters, gauges, and histograms:
```python
import asyncio
from typing import Any

from effectpy import *

async def metrics_example():
    scope = Scope()
    env = await (LoggerLayer | MetricsLayer).build_scoped(Context(), scope)
    metrics = env.get(MetricsRegistry)

    # Create metrics
    request_counter = metrics.counter("http_requests_total", "Total HTTP requests")
    response_time = metrics.histogram("http_response_time_seconds", "HTTP response time")
    active_connections = metrics.gauge("http_active_connections", "Active HTTP connections")

    # Manual metrics
    request_counter.inc(labels={"method": "GET", "endpoint": "/users"})
    active_connections.set(42, labels={"server": "web-01"})

    with response_time.time(labels={"method": "GET"}):
        await asyncio.sleep(0.1)  # Simulate request processing

    # Automatic metrics via instrumentation
    def api_call() -> Effect[Any, str, dict]:
        return succeed({"status": "success", "data": "API response"})

    instrumented = instrument(
        "api.call",
        api_call(),
        tags={"method": "GET", "endpoint": "/api/data"}
    )

    result = await instrumented._run(env)

    # Print collected metrics
    print("Collected metrics:")
    for metric in metrics.collect():
        print(f"  {metric.name}: {metric.value} {metric.labels}")

    await scope.close()

asyncio.run(metrics_example())
```
3. Tracing¶
Distributed tracing helps you understand request flows across services:
```python
import asyncio
from typing import Any

from effectpy import *

async def tracing_example():
    scope = Scope()
    env = await (LoggerLayer | TracerLayer).build_scoped(Context(), scope)
    tracer = env.get(Tracer)

    # Manual span creation
    with tracer.span("database.query", attributes={"table": "users", "query": "SELECT *"}) as span:
        await asyncio.sleep(0.05)  # Simulate DB query
        span.add_event("query_executed", {"rows_returned": 10})

    # Automatic tracing via instrumentation
    def complex_operation() -> Effect[Any, str, str]:
        async def impl(ctx: Context):
            # Nested operation - creates a child span
            await asyncio.sleep(0.02)

            nested = instrument(
                "complex.nested",
                succeed("nested result"),
                tags={"step": "processing"}
            )
            nested_result = await nested._run(ctx)

            return f"Complex: {nested_result}"
        return Effect(impl)

    instrumented = instrument(
        "complex.operation",
        complex_operation(),
        tags={"request_id": "req-123", "user_id": 456}
    )

    result = await instrumented._run(env)
    print(f"Result: {result}")

    # Export traces
    spans = tracer.get_spans()
    for span in spans:
        print(f"Span: {span.name} ({span.duration:.3f}s)")
        for event in span.events:
            print(f"  Event: {event.name} - {event.attributes}")

    await scope.close()

asyncio.run(tracing_example())
```
The instrument() Function¶
The `instrument()` function is the primary way to add observability to effects:
```python
import time
from typing import Any

from effectpy import *

# Basic instrumentation
instrumented_effect = instrument("operation.name", my_effect)

# With tags for context
instrumented_effect = instrument(
    "user.fetch",
    fetch_user_effect,
    tags={
        "user_id": 123,
        "source": "database",
        "cache": "enabled"
    }
)

# Tags can be dynamic
def fetch_with_dynamic_tags(user_id: int) -> Effect[Any, str, dict]:
    base_effect = fetch_user(user_id)
    return instrument(
        "user.fetch",
        base_effect,
        tags={
            "user_id": user_id,
            "timestamp": int(time.time()),
            "environment": "production"
        }
    )
```
What instrument() Captures¶
For every instrumented effect, effectpy automatically records:
- Start/End Events: When the operation begins and completes
- Duration: How long the operation took
- Success/Failure: Whether the operation succeeded or failed
- Error Details: Full error information if it failed
- Tags: All provided metadata
- Correlation ID: Automatic request tracking
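As a quick way to see what was recorded, the spans and metrics produced by `instrument()` can be read back from the `Tracer` and `MetricsRegistry` built in the examples above. This sketch reuses only the accessors already shown on this page; the "checkout.total" name and tag values are illustrative:

```python
import asyncio
from effectpy import *

async def inspect_captured_data():
    scope = Scope()
    env = await (LoggerLayer | MetricsLayer | TracerLayer).build_scoped(Context(), scope)

    # Run one instrumented effect ("checkout.total" is just an example name)
    await instrument("checkout.total", succeed(42), tags={"currency": "USD"})._run(env)

    # Read back the spans and metrics that instrument() recorded
    for span in env.get(Tracer).get_spans():
        print(f"span={span.name} duration={span.duration:.4f}s tags={span.tags}")

    for metric in env.get(MetricsRegistry).collect():
        print(f"metric={metric.name} value={metric.value} labels={metric.labels}")

    await scope.close()

asyncio.run(inspect_captured_data())
```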
OpenTelemetry Integration¶
effectpy supports OpenTelemetry Protocol (OTLP) for integration with observability platforms:
```python
import asyncio
from typing import Any

from effectpy import *

async def otlp_export_example():
    scope = Scope()
    env = await (LoggerLayer | MetricsLayer | TracerLayer).build_scoped(Context(), scope)

    # Your instrumented application code
    def api_workflow() -> Effect[Any, str, dict]:
        auth_check = instrument("auth.check", succeed({"user": "alice"}))
        data_fetch = instrument(
            "data.fetch",
            succeed({"records": [1, 2, 3]}),
            tags={"table": "users", "limit": 100}
        )

        return auth_check.flat_map(
            lambda auth: data_fetch.map(lambda data: {"auth": auth, "data": data})
        )

    # Run workflow
    result = await instrument("api.workflow", api_workflow())._run(env)

    # Export to OpenTelemetry collectors
    tracer = env.get(Tracer)
    metrics = env.get(MetricsRegistry)

    # Export spans (requires aiohttp)
    await export_spans_otlp_http(
        spans=tracer.get_spans(),
        endpoint="http://jaeger:14268/api/traces"
    )

    # Export metrics (requires aiohttp)
    await export_metrics_otlp_http(
        metrics=metrics.collect(),
        endpoint="http://prometheus:9090/api/v1/otlp/metrics"
    )

    print(f"Workflow completed: {result}")
    await scope.close()

# Only runs if aiohttp is available
try:
    asyncio.run(otlp_export_example())
except ImportError:
    print("aiohttp not available - OTLP export skipped")
```
Real-World Patterns¶
API Request Tracing¶
```python
import asyncio
from effectpy import *

class HTTPClient:
    async def get(self, url: str) -> dict:
        await asyncio.sleep(0.1)  # Simulate network call
        return {"status": 200, "data": f"Response from {url}"}

async def api_request_tracing():
    scope = Scope()
    env = await (LoggerLayer | MetricsLayer | TracerLayer).build_scoped(Context(), scope)

    # Add HTTP client to the environment
    http_client = HTTPClient()
    env = env.with_service(HTTPClient, http_client)

    def make_request(url: str) -> Effect[HTTPClient, str, dict]:
        async def impl(ctx: Context):
            client = ctx.get(HTTPClient)
            response = await client.get(url)

            if response["status"] >= 400:
                raise ValueError(f"HTTP {response['status']}")

            return response
        return Effect(impl)

    # Instrument the request with rich context
    instrumented_request = instrument(
        "http.request",
        make_request("https://api.example.com/users"),
        tags={
            "method": "GET",
            "url": "https://api.example.com/users",
            "service": "user-api",
            "version": "1.2.3"
        }
    )

    result = await instrumented_request._run(env)
    print(f"API Response: {result}")

    await scope.close()

asyncio.run(api_request_tracing())
```
Database Query Monitoring¶
```python
import asyncio
from effectpy import *

async def database_monitoring():
    scope = Scope()
    env = await (LoggerLayer | MetricsLayer | TracerLayer).build_scoped(Context(), scope)

    class Database:
        async def query(self, sql: str) -> list:
            # Simulate query execution time based on complexity
            if "JOIN" in sql.upper():
                await asyncio.sleep(0.2)   # Complex query
            else:
                await asyncio.sleep(0.05)  # Simple query
            return [{"id": 1, "name": "Alice"}]

    db = Database()
    env = env.with_service(Database, db)

    def execute_query(sql: str) -> Effect[Database, str, list]:
        async def impl(ctx: Context):
            database = ctx.get(Database)
            return await database.query(sql)
        return Effect(impl)

    # Different query types with different tags
    queries = [
        ("SELECT * FROM users", "simple"),
        ("SELECT u.*, p.* FROM users u JOIN profiles p ON u.id = p.user_id", "complex"),
        ("SELECT COUNT(*) FROM users", "aggregate")
    ]

    for sql, query_type in queries:
        instrumented_query = instrument(
            "database.query",
            execute_query(sql),
            tags={
                "query_type": query_type,
                "table": "users",
                "operation": sql.split()[0].upper()  # SELECT, INSERT, etc.
            }
        )

        result = await instrumented_query._run(env)
        print(f"{query_type.capitalize()} query returned {len(result)} rows")

    # Print performance metrics
    metrics = env.get(MetricsRegistry)
    print("\nDatabase metrics:")
    for metric in metrics.collect():
        if "database" in metric.name:
            print(f"  {metric.name}: {metric.value}")

    await scope.close()

asyncio.run(database_monitoring())
```
Error Tracking¶
```python
import asyncio
from typing import Any

from effectpy import *

async def error_tracking_example():
    scope = Scope()
    env = await (LoggerLayer | MetricsLayer | TracerLayer).build_scoped(Context(), scope)

    def unreliable_service(failure_rate: float) -> Effect[Any, str, str]:
        import random
        if random.random() < failure_rate:
            return fail(f"Service failed (rate: {failure_rate})")
        return succeed("Service success")

    # Test different failure scenarios
    scenarios = [
        ("reliable", 0.1),    # 10% failure
        ("unreliable", 0.7),  # 70% failure
        ("broken", 1.0)       # 100% failure
    ]

    for name, failure_rate in scenarios:
        for attempt in range(5):
            instrumented = instrument(
                "service.call",
                unreliable_service(failure_rate),
                tags={
                    "service": name,
                    "attempt": attempt + 1,
                    "failure_rate": failure_rate
                }
            )

            try:
                result = await instrumented._run(env)
                print(f"{name} attempt {attempt + 1}: SUCCESS")
            except Failure as f:
                print(f"{name} attempt {attempt + 1}: FAILED - {f.error}")

    # Analyze error patterns
    metrics = env.get(MetricsRegistry)
    tracer = env.get(Tracer)

    print(f"\nCaptured {len(tracer.get_spans())} spans")

    # Group spans by outcome
    successes = [s for s in tracer.get_spans() if s.status == "ok"]
    failures = [s for s in tracer.get_spans() if s.status == "error"]

    print(f"Successes: {len(successes)}")
    print(f"Failures: {len(failures)}")

    if failures:
        print("Failure breakdown:")
        failure_services = {}
        for span in failures:
            service = span.tags.get("service", "unknown")
            failure_services[service] = failure_services.get(service, 0) + 1

        for service, count in failure_services.items():
            print(f"  {service}: {count} failures")

    await scope.close()

asyncio.run(error_tracking_example())
```
Best Practices¶
1. Use Meaningful Names¶
```python
# ✅ Good: Clear, hierarchical names
instrument("user.profile.fetch", fetch_profile_effect)
instrument("payment.stripe.charge", charge_payment_effect)
instrument("cache.redis.get", get_from_cache_effect)

# ❌ Avoid: Vague or inconsistent names
instrument("operation", some_effect)
instrument("getUserProfile", fetch_profile_effect)  # Inconsistent naming style
```
2. Add Rich Context with Tags¶
```python
# ✅ Good: Rich, searchable tags
instrument(
    "database.query",
    query_effect,
    tags={
        "table": "users",
        "operation": "SELECT",
        "user_id": 123,
        "query_type": "user_lookup",
        "cache_enabled": True
    }
)

# ❌ Avoid: Missing context
instrument("db", query_effect)  # No useful metadata
```
3. Instrument at the Right Level¶
```python
# ✅ Good: Instrument meaningful business operations
def process_user_registration(user_data: dict) -> Effect[Any, str, dict]:
    validate = instrument("user.validate", validate_user_data(user_data))
    create = instrument("user.create", create_user_in_db(user_data))
    notify = instrument("user.notify", send_welcome_email(user_data))

    return validate.flat_map(lambda _: create).flat_map(lambda user: notify.map(lambda _: user))

# ❌ Avoid: Over-instrumenting trivial operations
instrument("string.format", succeed(f"Hello {name}"))  # Too granular
```
4. Handle Observability Layer Failures¶
```python
async def robust_observability():
    scope = Scope()

    try:
        # Build the observability environment
        env = await (LoggerLayer | MetricsLayer | TracerLayer).build_scoped(Context(), scope)

        # Your application code
        result = await instrument("app.main", main_workflow())._run(env)
    except Exception as e:
        # Observability setup failed - run without it
        print(f"Observability unavailable: {e}")
        result = await main_workflow()._run(Context())
    finally:
        await scope.close()

    return result
```
Integration Examples¶
Prometheus Metrics¶
```python
# Export effectpy metrics to Prometheus format
async def prometheus_integration():
    scope = Scope()
    env = await MetricsLayer.build_scoped(Context(), scope)

    # Run your instrumented application
    await my_instrumented_app()._run(env)

    # Export metrics in Prometheus format
    metrics = env.get(MetricsRegistry)

    prometheus_output = []
    for metric in metrics.collect():
        labels = ",".join(f'{k}="{v}"' for k, v in metric.labels.items())
        prometheus_output.append(f"{metric.name}{{{labels}}} {metric.value}")

    print("# Prometheus Metrics")
    for line in prometheus_output:
        print(line)

    await scope.close()
```
Jaeger Tracing¶
```python
async def jaeger_integration():
    scope = Scope()
    env = await TracerLayer.build_scoped(Context(), scope)

    # Run traced application
    await my_traced_app()._run(env)

    # Export to Jaeger (if aiohttp is available)
    tracer = env.get(Tracer)
    spans = tracer.get_spans()

    try:
        await export_spans_otlp_http(
            spans,
            endpoint="http://jaeger:14268/api/traces"
        )
        print(f"Exported {len(spans)} spans to Jaeger")
    except ImportError:
        print("aiohttp not available - Jaeger export skipped")
    except Exception as e:
        print(f"Failed to export spans: {e}")

    await scope.close()
```
What's Next?¶
- → Effects - Learn how to instrument your effects
- → Layers & Scope - Set up observability environments
- → Concurrency Guide - Monitoring concurrent operations
- → Observability API Reference - Complete API documentation