Layers & Scope¶
Resource management is one of the hardest problems in async programming. effectpy solves this with two key abstractions:
Layer
: Describes how to build and teardown servicesScope
: Guarantees resources are cleaned up in the correct order (LIFO)
Together, they provide guaranteed resource safety - no more leaked connections, forgotten cleanup, or race conditions during shutdown.
The Problem¶
Traditional async Python resource management is error-prone:
# ❌ Easy to forget cleanup, or cleanup in wrong order
async def risky_approach():
db = Database("postgresql://...")
cache = Cache("redis://...")
logger = Logger()
try:
# Do work...
pass
finally:
# What if this fails? What if we forget one?
# What's the right order?
await db.close()
await cache.close()
await logger.close()
Problems with this approach: - Cleanup order matters - close things in reverse of creation - Exception safety - cleanup must happen even if operations fail - Complex dependencies - some resources depend on others - Easy to forget - manual cleanup is error-prone
The effectpy Solution¶
# ✅ Guaranteed cleanup in correct order
import asyncio
from effectpy import *
async def safe_approach():
scope = Scope()
# Build environment with multiple resources
env = await (DatabaseLayer | CacheLayer | LoggerLayer).build_scoped(Context(), scope)
# Use resources safely
db = env.get(Database)
result = await db.query("SELECT 1")
print(f"Result: {result}")
# Guaranteed cleanup in reverse order, even if exceptions occur
await scope.close() # Cache → Database → Logger (LIFO order)
Context: Service Container¶
Context
is a type-safe service container that holds your application's dependencies:
from effectpy import *
class Logger:
def log(self, msg: str):
print(f"[LOG] {msg}")
class Database:
def __init__(self, url: str):
self.url = url
print(f"Connected to {url}")
# Manual context building
ctx = (Context()
.with_service(Logger, Logger())
.with_service(Database, Database("postgresql://localhost")))
# Type-safe retrieval
logger = ctx.get(Logger) # Returns Logger instance
db = ctx.get(Database) # Returns Database instance
logger.log("Hello from context!")
Context Features¶
# Check if service exists
if ctx.has(Logger):
logger = ctx.get(Logger)
# Get with default
config = ctx.get_or_else(Config, Config.default())
# Get optional (returns None if not found)
maybe_cache = ctx.get_optional(Cache)
# Context is immutable - operations return new instances
new_ctx = ctx.with_service(Metrics, MetricsRegistry())
Scope: Resource Lifecycle Manager¶
Scope
manages the lifecycle of resources with guaranteed cleanup:
import asyncio
from effectpy import *
class Resource:
def __init__(self, name: str):
self.name = name
print(f"🔧 Created resource: {name}")
async def close(self):
print(f"🔧 Closed resource: {self.name}")
async def scope_example():
scope = Scope()
# Add resources to scope
r1 = Resource("Database")
r2 = Resource("Cache")
r3 = Resource("Logger")
scope.add_finalizer(r1.close)
scope.add_finalizer(r2.close)
scope.add_finalizer(r3.close)
print("Doing work...")
# Cleanup happens in LIFO order (reverse of addition)
await scope.close() # Logger → Cache → Database
asyncio.run(scope_example())
Output:
🔧 Created resource: Database
🔧 Created resource: Cache
🔧 Created resource: Logger
Doing work...
🔧 Closed resource: Logger
🔧 Closed resource: Cache
🔧 Closed resource: Database
Scope Features¶
# Different ways to add cleanup
scope.add_finalizer(resource.close) # Async function
scope.add_finalizer(lambda: print("cleanup")) # Sync function
scope.add_sync_finalizer(file.close) # Explicitly sync
# Nested scopes
parent_scope = Scope()
child_scope = parent_scope.child()
await child_scope.close() # Child cleaned up first
await parent_scope.close() # Then parent
# Exception safety
try:
# Even if this fails...
raise ValueError("Something went wrong!")
finally:
await scope.close() # Cleanup still happens
Layer: Service Builders¶
Layer
describes how to construct and teardown services. Layers are composable and guarantee proper resource management:
Creating Layers¶
import asyncio
from effectpy import *
class Database:
def __init__(self, url: str):
self.url = url
print(f"🔌 Connected to database: {url}")
async def close(self):
print(f"🔌 Closed database: {self.url}")
# Simple layer using from_resource
DatabaseLayer = from_resource(
service_type=Database,
build=lambda ctx: Database("postgresql://localhost:5432/myapp"),
teardown=lambda db: db.close()
)
# More complex layer with dependencies
class Cache:
def __init__(self, url: str, logger: Logger):
self.url = url
self.logger = logger
logger.log(f"Cache connecting to {url}")
async def close(self):
self.logger.log(f"Cache closing {self.url}")
def CacheLayer(redis_url: str = "redis://localhost") -> Layer:
async def build(ctx: Context) -> Context:
logger = ctx.get(Logger) # Depends on Logger
cache = Cache(redis_url, logger)
return ctx.with_service(Cache, cache)
async def teardown(ctx: Context) -> None:
cache = ctx.get(Cache)
await cache.close()
return Layer(build=build, teardown=teardown)
Layer Composition¶
Layers can be composed in two ways:
Sequential Composition (+
)¶
Sequential composition builds layers one after another. Later layers can depend on earlier ones:
# Sequential: Logger first, then Database (which can use Logger)
sequential_layer = LoggerLayer + DatabaseLayer
Parallel Composition (|
)¶
Parallel composition builds layers concurrently. They cannot depend on each other:
# Parallel: All three built concurrently
parallel_layer = LoggerLayer | MetricsLayer | TracerLayer
# Mixed: Logger first, then others in parallel
mixed_layer = LoggerLayer + (MetricsLayer | TracerLayer)
Using Layers¶
async def layer_example():
scope = Scope()
base = Context()
# Build environment
env = await DatabaseLayer.build_scoped(base, scope)
# Use the service
db = env.get(Database)
result = await db.query("SELECT 1")
print(f"Query result: {result}")
# Guaranteed cleanup
await scope.close()
# Or use the convenience method
async def convenient_layer_example():
async with DatabaseLayer.scoped(Context()) as env:
db = env.get(Database)
result = await db.query("SELECT 1")
print(f"Query result: {result}")
# Automatic cleanup when exiting context manager
Real-World Example¶
Here's a comprehensive example showing layers, scopes, and effects working together:
import asyncio
from effectpy import *
# Service definitions
class Config:
def __init__(self):
self.db_url = "postgresql://localhost:5432/myapp"
self.redis_url = "redis://localhost:6379"
self.log_level = "INFO"
class Logger:
def __init__(self, level: str):
self.level = level
print(f"📝 Logger initialized (level: {level})")
def info(self, msg: str):
print(f"[INFO] {msg}")
def error(self, msg: str):
print(f"[ERROR] {msg}")
async def close(self):
print("📝 Logger closed")
class Database:
def __init__(self, url: str, logger: Logger):
self.url = url
self.logger = logger
logger.info(f"Database connecting to {url}")
async def query(self, sql: str) -> list:
await asyncio.sleep(0.1) # Simulate query
self.logger.info(f"Executed: {sql}")
return [{"id": 1, "name": "Alice"}]
async def close(self):
self.logger.info("Database connection closed")
class Cache:
def __init__(self, url: str, logger: Logger):
self.url = url
self.logger = logger
logger.info(f"Cache connecting to {url}")
self._data = {}
def get(self, key: str) -> str | None:
value = self._data.get(key)
self.logger.info(f"Cache GET {key}: {'HIT' if value else 'MISS'}")
return value
def set(self, key: str, value: str):
self._data[key] = value
self.logger.info(f"Cache SET {key}")
async def close(self):
self.logger.info("Cache connection closed")
# Layer definitions
ConfigLayer = from_resource(
Config,
build=lambda ctx: Config(),
teardown=lambda config: None # No cleanup needed
)
def LoggerLayer() -> Layer:
async def build(ctx: Context) -> Context:
config = ctx.get(Config)
logger = Logger(config.log_level)
return ctx.with_service(Logger, logger)
async def teardown(ctx: Context) -> None:
logger = ctx.get(Logger)
await logger.close()
return Layer(build=build, teardown=teardown)
def DatabaseLayer() -> Layer:
async def build(ctx: Context) -> Context:
config = ctx.get(Config)
logger = ctx.get(Logger)
database = Database(config.db_url, logger)
return ctx.with_service(Database, database)
async def teardown(ctx: Context) -> None:
database = ctx.get(Database)
await database.close()
return Layer(build=build, teardown=teardown)
def CacheLayer() -> Layer:
async def build(ctx: Context) -> Context:
config = ctx.get(Config)
logger = ctx.get(Logger)
cache = Cache(config.redis_url, logger)
return ctx.with_service(Cache, cache)
async def teardown(ctx: Context) -> None:
cache = ctx.get(Cache)
await cache.close()
return Layer(build=build, teardown=teardown)
# Application layer with dependencies
AppLayer = ConfigLayer + LoggerLayer() + (DatabaseLayer() | CacheLayer())
# Business logic using effects
def fetch_user_with_cache(user_id: int) -> Effect[Database | Cache | Logger, str, dict]:
async def impl(ctx: Context):
db = ctx.get(Database)
cache = ctx.get(Cache)
logger = ctx.get(Logger)
# Check cache first
cache_key = f"user:{user_id}"
cached = cache.get(cache_key)
if cached:
return {"id": user_id, "name": cached, "from_cache": True}
# Fetch from database
try:
users = await db.query(f"SELECT * FROM users WHERE id = {user_id}")
if not users:
return {"error": "User not found"}
user = users[0]
cache.set(cache_key, user["name"])
return {**user, "from_cache": False}
except Exception as e:
logger.error(f"Database error: {e}")
return {"error": "Database unavailable"}
return Effect(impl)
async def main():
scope = Scope()
# Build complete application environment
env = await AppLayer.build_scoped(Context(), scope)
# Create instrumented effect
fetch_effect = instrument(
"user.fetch",
fetch_user_with_cache(42),
tags={"operation": "user_fetch", "user_id": 42}
)
# First call (cache miss)
result1 = await fetch_effect._run(env)
print(f"First call: {result1}")
# Second call (cache hit)
result2 = await fetch_effect._run(env)
print(f"Second call: {result2}")
# Guaranteed cleanup in correct order
print("\nCleaning up...")
await scope.close()
if __name__ == "__main__":
asyncio.run(main())
Output:
📝 Logger initialized (level: INFO)
[INFO] Database connecting to postgresql://localhost:5432/myapp
[INFO] Cache connecting to redis://localhost:6379
[INFO] Executed: SELECT * FROM users WHERE id = 42
[INFO] Cache SET user:42
First call: {'id': 1, 'name': 'Alice', 'from_cache': False}
[INFO] Cache GET user:42: HIT
Second call: {'id': 1, 'name': 'Alice', 'from_cache': True}
Cleaning up...
[INFO] Cache connection closed
[INFO] Database connection closed
📝 Logger closed
Built-in Layers¶
effectpy includes several built-in layers for common services:
Observability Layers¶
# Logging
logger_env = await LoggerLayer.build_scoped(Context(), scope)
# Metrics
metrics_env = await MetricsLayer.build_scoped(Context(), scope)
# Tracing
tracer_env = await TracerLayer.build_scoped(Context(), scope)
# All observability services
observability = LoggerLayer | MetricsLayer | TracerLayer
obs_env = await observability.build_scoped(Context(), scope)
Clock Layers¶
# Real clock
real_clock_env = await ClockLayer.build_scoped(Context(), scope)
# Test clock for deterministic testing
test_clock = TestClock()
test_env = await TestClockLayer(test_clock).build_scoped(Context(), scope)
Best Practices¶
1. Always Use Scopes for Resources¶
# ✅ Good: Always use Scope
async def good_resource_management():
scope = Scope()
env = await DatabaseLayer.build_scoped(Context(), scope)
try:
# Use resources...
pass
finally:
await scope.close() # Guaranteed cleanup
# ❌ Avoid: Manual resource management
async def avoid_manual_management():
db = Database("postgresql://...")
try:
# Use database...
pass
finally:
await db.close() # Easy to forget, no ordering guarantees
2. Compose Layers Thoughtfully¶
# Dependencies: Config → Logger → (Database | Cache)
# This ensures Logger is available for both Database and Cache
app_layer = ConfigLayer + LoggerLayer() + (DatabaseLayer() | CacheLayer())
# ❌ Wrong: Database needs Logger but they're built in parallel
wrong_layer = LoggerLayer() | DatabaseLayer() # Race condition!
3. Use Layer Composition¶
# ✅ Good: Compose small, focused layers
auth_layer = ConfigLayer + LoggerLayer() + DatabaseLayer() + AuthServiceLayer()
metrics_layer = MetricsLayer | TracerLayer
app_layer = auth_layer + metrics_layer
# ❌ Avoid: Monolithic layers
giant_layer = EverythingLayer() # Hard to test, debug, and reuse
4. Handle Layer Failures Gracefully¶
async def robust_layer_usage():
scope = Scope()
try:
env = await AppLayer.build_scoped(Context(), scope)
# Use environment...
except Exception as e:
print(f"Failed to build environment: {e}")
# scope.close() still happens in finally
finally:
await scope.close() # Always cleanup
Testing with Layers¶
Layers make testing easy by allowing dependency injection:
# Production layers
ProductionLayer = ConfigLayer + LoggerLayer() + DatabaseLayer()
# Test layers with mocks
class MockDatabase:
async def query(self, sql: str) -> list:
return [{"id": 1, "name": "Test User"}]
async def close(self): pass
TestDatabaseLayer = from_resource(
Database,
build=lambda ctx: MockDatabase(),
teardown=lambda db: db.close()
)
TestLayer = ConfigLayer + LoggerLayer() + TestDatabaseLayer
# Same business logic, different environment
async def test_user_service():
scope = Scope()
test_env = await TestLayer.build_scoped(Context(), scope)
result = await fetch_user_with_cache(1)._run(test_env)
assert result["name"] == "Test User"
await scope.close()
What's Next?¶
- → Effects - Working with effectpy computations
- → Runtime & Fibers - Concurrent execution
- → Services Guide - Dependency injection patterns
- → Context & Scope API - Complete API reference