Skip to content

Runtime & Fibers

The Runtime system is effectpy's foundation for concurrent execution. While you can run effects directly with ._run(context), the Runtime provides advanced features like fiber management, supervision, and structured concurrency.

Key concepts: - Runtime: Manages effect execution and fiber lifecycle - Fiber: A lightweight async task with structured cancellation - Supervision: Automatic restart and error handling for long-running processes

Understanding Fibers

A Fiber is effectpy's unit of concurrent execution - similar to an asyncio Task but with structured semantics and better error handling.

Basic Fiber Operations

import asyncio
from effectpy import *

async def fiber_basics():
    runtime = Runtime()

    def long_running_task() -> Effect[Any, str, str]:
        async def impl(ctx: Context):
            for i in range(5):
                await asyncio.sleep(0.5)
                print(f"Working... {i+1}/5")
            return "Task completed!"
        return Effect(impl)

    # Fork a fiber (starts immediately)
    fiber = runtime.fork(long_running_task())
    print(f"Fiber started: {fiber}")

    # Do other work while fiber runs
    await asyncio.sleep(1.0)
    print("Doing other work...")

    # Wait for fiber completion
    result = await fiber.await_()
    print(f"Result: {result}")

    # Clean up
    await runtime.shutdown()

asyncio.run(fiber_basics())

Fiber Lifecycle

import asyncio
from effectpy import *

async def fiber_lifecycle():
    runtime = Runtime()

    def monitored_task(task_id: int) -> Effect[Any, str, str]:
        async def impl(ctx: Context):
            try:
                for i in range(10):
                    await asyncio.sleep(0.2)
                    print(f"Task {task_id}: step {i+1}/10")
                return f"Task {task_id} completed"
            except asyncio.CancelledError:
                print(f"Task {task_id} was cancelled")
                raise
        return Effect(impl)

    # Start multiple fibers
    fibers = []
    for i in range(3):
        fiber = runtime.fork(monitored_task(i))
        fibers.append(fiber)
        print(f"Started fiber {i}: {fiber}")

    # Let them run briefly
    await asyncio.sleep(1.0)

    # Interrupt one fiber
    print("Interrupting fiber 1...")
    await fibers[1].interrupt()

    # Wait for others to complete
    for i, fiber in enumerate(fibers):
        if i == 1:  # Skip interrupted fiber
            continue
        try:
            result = await fiber.await_()
            print(f"Fiber {i} result: {result}")
        except Exception as e:
            print(f"Fiber {i} failed: {e}")

    await runtime.shutdown()

asyncio.run(fiber_lifecycle())

Runtime Features

Fiber Management

import asyncio
from effectpy import *

async def fiber_management():
    runtime = Runtime()

    def worker_task(worker_id: int, work_time: float) -> Effect[Any, str, str]:
        async def impl(ctx: Context):
            print(f"Worker {worker_id} starting")
            await asyncio.sleep(work_time)
            print(f"Worker {worker_id} finished")
            return f"Worker {worker_id} result"
        return Effect(impl)

    # Fork multiple workers
    workers = []
    for i in range(5):
        fiber = runtime.fork(worker_task(i, 0.5 + i * 0.2))
        workers.append((i, fiber))

    print(f"Started {len(workers)} workers")

    # Monitor and collect results
    results = {}
    for worker_id, fiber in workers:
        try:
            result = await fiber.await_()
            results[worker_id] = result
            print(f"✅ Worker {worker_id}: {result}")
        except Exception as e:
            results[worker_id] = f"ERROR: {e}"
            print(f"❌ Worker {worker_id}: {e}")

    print(f"Collected {len(results)} results")
    await runtime.shutdown()

asyncio.run(fiber_management())

Error Handling and Supervision

import asyncio
from effectpy import *

class TaskSupervisor:
    def __init__(self, runtime: Runtime):
        self.runtime = runtime
        self.restart_count = 0
        self.max_restarts = 3

    def supervise_task(self, task_effect: Effect[Any, str, str], task_name: str):
        """Supervise a task with automatic restart on failure"""

        async def supervised():
            while self.restart_count < self.max_restarts:
                try:
                    print(f"🚀 Starting {task_name} (attempt {self.restart_count + 1})")
                    fiber = self.runtime.fork(task_effect)
                    result = await fiber.await_()
                    print(f"✅ {task_name} completed: {result}")
                    return result

                except Exception as e:
                    self.restart_count += 1
                    print(f"❌ {task_name} failed: {e}")

                    if self.restart_count < self.max_restarts:
                        wait_time = 2 ** self.restart_count  # Exponential backoff
                        print(f"⏳ Restarting {task_name} in {wait_time}s...")
                        await asyncio.sleep(wait_time)
                    else:
                        print(f"💥 {task_name} exceeded max restarts")
                        raise

        return Effect(lambda ctx: supervised())

async def supervision_example():
    runtime = Runtime()
    supervisor = TaskSupervisor(runtime)

    def unreliable_task() -> Effect[Any, str, str]:
        async def impl(ctx: Context):
            import random
            await asyncio.sleep(0.5)

            if random.random() < 0.7:  # 70% failure rate
                raise ValueError("Random task failure!")

            return "Task succeeded!"

        return Effect(impl)

    # Supervise the unreliable task
    try:
        supervised = supervisor.supervise_task(unreliable_task(), "UnreliableWorker")
        result = await supervised._run(Context())
        print(f"Final result: {result}")
    except Exception as e:
        print(f"Task permanently failed: {e}")

    await runtime.shutdown()

asyncio.run(supervision_example())

Structured Concurrency with Runtime

import asyncio
from effectpy import *

async def structured_concurrency():
    runtime = Runtime()

    def data_processor(data_id: int, processing_time: float) -> Effect[Any, str, dict]:
        async def impl(ctx: Context):
            print(f"📊 Processing data {data_id}")
            await asyncio.sleep(processing_time)

            # Simulate occasional failures
            if data_id == 3:  # This one always fails
                raise ValueError(f"Processing failed for data {data_id}")

            return {"data_id": data_id, "result": f"processed_{data_id}"}

        return Effect(impl)

    # Process multiple data items concurrently
    data_items = [(i, 0.5 + i * 0.1) for i in range(1, 6)]

    # Fork all processors
    processors = []
    for data_id, proc_time in data_items:
        effect = data_processor(data_id, proc_time)
        fiber = runtime.fork(effect)
        processors.append((data_id, fiber))

    print(f"🚀 Started {len(processors)} processors")

    # Collect results, handling failures gracefully
    successful_results = []
    failed_items = []

    for data_id, fiber in processors:
        try:
            result = await fiber.await_()
            successful_results.append(result)
            print(f"✅ Data {data_id}: Success")
        except Exception as e:
            failed_items.append((data_id, str(e)))
            print(f"❌ Data {data_id}: Failed - {e}")

    print(f"\n📈 Processing Summary:")
    print(f"  Successful: {len(successful_results)}")
    print(f"  Failed: {len(failed_items)}")

    if failed_items:
        print("  Failed items:")
        for data_id, error in failed_items:
            print(f"    - Data {data_id}: {error}")

    await runtime.shutdown()

asyncio.run(structured_concurrency())

Runtime vs Direct Execution

When to Use Runtime

import asyncio
from effectpy import *

# ✅ Good: Use runtime for long-running processes
async def long_running_service():
    runtime = Runtime()

    def background_monitor() -> Effect[Any, str, None]:
        async def impl(ctx: Context):
            counter = 0
            while True:  # Infinite loop
                await asyncio.sleep(2.0)
                counter += 1
                print(f"🔍 Monitor check #{counter}")

                # Exit condition for demo
                if counter >= 5:
                    break

        return Effect(impl)

    # Fork background process
    monitor_fiber = runtime.fork(background_monitor())

    # Do main application work
    print("🚀 Main application starting...")
    await asyncio.sleep(3.0)
    print("⚙️  Main application working...")
    await asyncio.sleep(4.0)
    print("✅ Main application completed")

    # Clean shutdown
    await monitor_fiber.interrupt()
    await runtime.shutdown()

# ✅ Good: Use direct execution for simple tasks  
async def simple_computation():
    def calculate() -> Effect[Any, str, int]:
        return succeed(42 * 2)

    result = await calculate()._run(Context())
    print(f"Simple calculation: {result}")

asyncio.run(long_running_service())
asyncio.run(simple_computation())

AnyIO Runtime (Optional)

effectpy supports AnyIO for Trio compatibility:

import asyncio
from effectpy import *

async def anyio_example():
    # Only available if anyio is installed
    try:
        if AnyIORuntime is not None:
            runtime = AnyIORuntime()

            def trio_compatible_task() -> Effect[Any, str, str]:
                async def impl(ctx: Context):
                    await asyncio.sleep(0.1)  # Works with both asyncio and trio
                    return "Task completed with AnyIO!"

                return Effect(impl)

            fiber = runtime.fork(trio_compatible_task())
            result = await fiber.await_()
            print(f"AnyIO result: {result}")

            await runtime.shutdown()
        else:
            print("AnyIO runtime not available (install anyio)")

    except Exception as e:
        print(f"AnyIO example failed: {e}")

asyncio.run(anyio_example())

Testing with Runtime

import asyncio
from effectpy import *

async def runtime_testing():
    """Test fiber behavior with controlled timing"""

    test_clock = TestClock()
    runtime = Runtime()

    def timed_task(duration: float, name: str) -> Effect[Clock, str, str]:
        async def impl(ctx: Context):
            clock = ctx.get(Clock)
            start = await clock.current_time()

            await clock.sleep(duration)

            end = await clock.current_time()
            return f"{name} completed in {end - start:.1f}s"

        return Effect(impl)

    # Set up test environment
    scope = Scope()
    env = await TestClockLayer(test_clock).build_scoped(Context(), scope)

    # Fork tasks with different durations
    fiber1 = runtime.fork(timed_task(1.0, "Task1"))
    fiber2 = runtime.fork(timed_task(2.0, "Task2"))

    # Advance time and collect results
    await asyncio.sleep(0.01)  # Let tasks start

    test_clock.advance(1.0)  # Task1 should complete
    result1 = await fiber1.await_()
    print(f"After 1s: {result1}")

    test_clock.advance(1.0)  # Task2 should complete  
    result2 = await fiber2.await_()
    print(f"After 2s: {result2}")

    await runtime.shutdown()
    await scope.close()

asyncio.run(runtime_testing())

Performance Considerations

Fiber Overhead

import asyncio
import time
from effectpy import *

async def performance_comparison():
    """Compare direct execution vs fiber execution"""

    def simple_task(task_id: int) -> Effect[Any, str, int]:
        async def impl(ctx: Context):
            await asyncio.sleep(0.001)  # Minimal work
            return task_id * 2
        return Effect(impl)

    num_tasks = 100

    # Direct execution
    start_time = time.time()
    direct_results = []
    for i in range(num_tasks):
        result = await simple_task(i)._run(Context())
        direct_results.append(result)
    direct_time = time.time() - start_time

    # Runtime with fibers
    runtime = Runtime()
    start_time = time.time()

    fibers = []
    for i in range(num_tasks):
        fiber = runtime.fork(simple_task(i))
        fibers.append(fiber)

    fiber_results = []
    for fiber in fibers:
        result = await fiber.await_()
        fiber_results.append(result)

    fiber_time = time.time() - start_time
    await runtime.shutdown()

    print(f"📊 Performance Comparison ({num_tasks} tasks):")
    print(f"  Direct execution: {direct_time:.3f}s")
    print(f"  Fiber execution:  {fiber_time:.3f}s")
    print(f"  Overhead: {((fiber_time - direct_time) / direct_time * 100):.1f}%")

asyncio.run(performance_comparison())

Best Practices

1. Use Runtime for Concurrent Systems

# ✅ Good: Runtime for managing multiple concurrent processes
async def good_concurrent_system():
    runtime = Runtime()

    # Multiple long-running services
    services = [
        ("WebServer", web_server_effect()),
        ("DatabaseCleanup", db_cleanup_effect()),
        ("MetricsCollector", metrics_effect())
    ]

    fibers = []
    for name, service in services:
        fiber = runtime.fork(service)
        fibers.append((name, fiber))

    # Handle shutdown gracefully
    # ... application logic ...

    for name, fiber in fibers:
        await fiber.interrupt()

    await runtime.shutdown()

# ❌ Avoid: Runtime for simple sequential operations
async def avoid_runtime_for_simple():
    runtime = Runtime()  # Unnecessary overhead

    result = await runtime.fork(succeed(42))  # Just use ._run(Context())
    await runtime.shutdown()

2. Always Clean Up Runtime

# ✅ Good: Proper cleanup
async def proper_cleanup():
    runtime = Runtime()

    try:
        # Your application logic
        fiber = runtime.fork(my_effect())
        result = await fiber.await_()
        return result
    finally:
        await runtime.shutdown()  # Always cleanup

# ✅ Better: Use context manager if available
async def context_manager_cleanup():
    async with runtime_context() as runtime:
        fiber = runtime.fork(my_effect())
        return await fiber.await_()

3. Handle Fiber Failures Appropriately

# ✅ Good: Handle fiber failures explicitly
async def handle_failures():
    runtime = Runtime()

    critical_fiber = runtime.fork(critical_service())
    optional_fiber = runtime.fork(optional_service())

    try:
        # Critical service must succeed
        await critical_fiber.await_()

        # Optional service can fail
        try:
            await optional_fiber.await_()
        except Exception as e:
            print(f"Optional service failed: {e}")

    finally:
        await runtime.shutdown()

4. Use Supervision for Resilience

# ✅ Good: Supervise important long-running processes
class ServiceSupervisor:
    def __init__(self, runtime: Runtime):
        self.runtime = runtime

    def supervise(self, service_effect: Effect[Any, str, Any], name: str):
        def supervised():
            # Restart logic, health checks, etc.
            pass

        return self.runtime.fork(Effect(supervised))

What's Next?