Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Introduction

Welcome to Typed Effects in Rust.

If you already use async Rust, you know the model: Futures are polled by an executor; work runs when those futures are driven (for example with .await). That foundation is sound, and this book does not ask you to unlearn it.

What teams often hit next is organization at scale: error types that grow without structure, dependencies threaded through long call chains, and background work whose lifetime is hard to reason about. Those problems are not unique to Rust, but they show up in every non-trivial async codebase.

effectful is a library for writing async programs where the shape of the work—success type, error type, and required environment—is carried in one place, and where much of the program is built as composable descriptions (Effect<A, E, R>) that you run only when you choose how and with which dependencies.

You still run on ordinary async runtimes. You still use .await inside bridges to third-party code. What changes is how you structure domain logic, tests, and dependency boundaries.

Who This Book Is For

You should know Rust basics: ownership, borrowing, traits, and how async/await and Future fit together. You do not need prior experience with category theory or functional programming jargon—we introduce terms only when they help.

If you want a typed, compositional style for async Rust—with explicit requirements in the type system and a clear split between “what to run” and “how to run it”—this book is for you.

How to Read This Book

Part I: Foundations explains why effects are useful and teaches the core types. Start here.

Part II: Environment & Dependencies covers the R parameter and compile-time dependency injection patterns.

Part III: Real Programs covers error handling, concurrency, resources, and scheduling for production code.

Part IV: Advanced covers STM, streams, schemas, and testing—read when you need those topics.

Code examples are intended to compile unless marked otherwise.

Let’s begin.

Why Effects?

Before we write effect code in detail, it helps to agree on what problem we are solving and where effectful sits relative to ordinary async Rust.

Rust’s async model is built on Future and executors: futures are lazy until polled, and .await is how async functions compose. That model is not a mistake—it is the standard way to express non-blocking I/O and concurrency.

At application scale, the difficulties are usually engineering ones:

  • Errors — mapping and aggregating failures across layers without losing structure.
  • Dependencies — passing clients, configuration, and context without turning every function signature into a long parameter list (or hiding the same behind globals).
  • Concurrency — knowing who owns a task, how it shuts down, and what happens on cancellation.

This chapter names those patterns, relates them to how Effect<A, E, R> is designed, and sets up the rest of Part I.

By the end of the chapter you should understand:

  • Why teams reach for a declarative layer on top of hand-written async fn chains.
  • What “effect” means in this book: a description of work, separate from running it with a chosen environment.
  • Why the type has three parameters (A, E, R) and why that matters for APIs and tests.

We start with a concrete look at those recurring challenges.

Challenges in Large Async Codebases

Async Rust gives you non-blocking I/O and structured concurrency primitives. In production, the same strengths can become painful when composition and boundaries are not planned: errors, dependencies, and spawned work all tend to accumulate complexity.

This section is not a claim that “async is broken.” It is a concise picture of problems effectful is meant to help with—so the rest of the book has a shared vocabulary.

Challenge 1: Error mapping and noise

A typical async workflow chains several operations. Each step may fail in its own way, so you map errors into a domain type and propagate:

async fn process_order(order: Order) -> Result<Receipt, ProcessError> {
    let config = get_config()
        .await
        .map_err(|e| ProcessError::Config(e))?;

    let user = fetch_user(&config, order.user_id)
        .await
        .map_err(|e| ProcessError::User(e))?;

    let inventory = check_inventory(&config, &order.items)
        .await
        .map_err(|e| ProcessError::Inventory(e))?;

    let payment = charge_payment(&config, &user, order.total)
        .await
        .map_err(|e| ProcessError::Payment(e))?;

    let shipment = create_shipment(&config, &order, &user)
        .await
        .map_err(|e| ProcessError::Shipment(e))?;

    Ok(Receipt::new(order, payment, shipment))
}

The business steps are clear, but the .map_err noise is repetitive. The domain ProcessError enum often grows with every new integration. Policy (retries, fallbacks) may live in callers or ad hoc helpers, which makes behavior harder to see in one place.

What effects add: failure and recovery can be expressed as transformations on a description (for example retry, map_error, structured Exit types), so policies are easier to reuse and test without rewriting the core flow.

Challenge 2: Explicit dependency parameters

Another common shape is the handler that needs many clients and cross-cutting services:

async fn handle_request(
    db: &DatabasePool,
    cache: &RedisClient,
    logger: &Logger,
    config: &AppConfig,
    metrics: &MetricsClient,
    tracer: &Tracer,
    request: Request,
) -> Response {
    // ...
}

Dependencies are explicit, which is good for honesty, but every layer between here and main must repeat or forward them. Tests must build or mock the same bundle repeatedly. Alternatives (globals, implicit context) trade one problem for another.

What effects add: required capabilities can be expressed in R (the environment type) and satisfied in one place at the edge, while inner functions stay focused on logic.

Challenge 3: Background work and lifetimes

Fire-and-forget background tasks are easy to start and harder to reason about:

fn start_background_worker(db: DatabasePool) {
    tokio::spawn(async move {
        loop {
            match process_queue(&db).await {
                Ok(_) => {}
                Err(e) => eprintln!("Worker error: {}", e),
            }
            tokio::time::sleep(Duration::from_secs(5)).await;
        }
    });
}

Questions that matter in production—shutdown, cancellation, panic behavior, and resource cleanup—need explicit design. That is true in any async system; the goal is to make ownership and intent visible in the program structure.

What effects add: structured concurrency patterns (fibers, scopes, handles) integrate with the same Effect abstraction so “what runs” and “how it ends” can be expressed consistently.

How this relates to Future and async

Remember: async fn bodies compile to Futures; nothing runs until a future is polled (for example via .await on an async caller). The difficulties above are usually about how we organize async code—signatures, error types, and where side effects are allowed—not about rejecting the Future model.

In practice, hand-written async often reads like a straight-line script: await one step, then the next. That is appropriate for many functions. It becomes harder when you want the same logical workflow to be inspected, wrapped (retries, timeouts), or tested with a substituted environment without threading mocks through every layer.

Effects push the “script” into a value: Effect<A, E, R> is a description that you run with run_async, run_blocking, or test harnesses—after you have composed and configured it.

That does not replace understanding executors or Future. It adds a layer for domain structure: answer type A, error type E, requirements R, and explicit execution.

Next we define what an Effect is in this library and how that description differs from calling async fn directly—without exaggerating either side.

What Even Is an Effect?

An Effect is a description of a computation, not the computation itself.

The rest of the API—map, flat_map, environment types, runners—is there to work with that description in a type-safe way.

The Recipe Analogy

Think about a recipe for chocolate cake.

A recipe is not a cake. You can hold a recipe in your hands without any flour appearing. You can read a recipe without preheating an oven. You can photocopy a recipe, modify it (less sugar, more cocoa), combine it with a frosting recipe, and share it with a friend — all without a single cake coming into existence.

The cake only appears when someone executes the recipe. Takes out the ingredients, follows the steps, waits for the oven.

An Effect is a recipe for a computation.

When you write succeed(42), you’re not “succeeding” at anything. You’re writing down a recipe that says “when executed, produce the value 42.” The 42 doesn’t exist yet. No computation has happened. You just have a piece of paper with instructions on it.

use effectful::{Effect, succeed};

// This doesn't compute anything — it's a description
let recipe: Effect<i32, String, ()> = succeed(42);

// Still nothing has happened. `recipe` is just a value.
// We can pass it around, store it, inspect its type.

The computation only happens when you explicitly run it:

use effectful::run_blocking;

// NOW something happens
let result: Result<i32, String> = run_blocking(recipe, ());
assert_eq!(result, Ok(42));

Building Up Descriptions

Because an Effect is just data — a description — you can transform it without running it.

let recipe: Effect<i32, String, ()> = succeed(42);

// Transform the description: "when executed, produce 42, then double it"
let doubled: Effect<i32, String, ()> = recipe.map(|x| x * 2);

// Still nothing has happened! `doubled` is just a modified recipe.

// Now run it
let result = run_blocking(doubled, ());
assert_eq!(result, Ok(84));

The .map() call didn’t execute anything. It took one recipe and produced a new recipe that includes an extra step. Like writing “double the result” at the bottom of your cake recipe — the cake doesn’t change until someone bakes it.

A More Realistic Example

Let’s see what this looks like with actual I/O:

use effectful::{Effect, effect, run_blocking};

// This function doesn't fetch anything — it returns a DESCRIPTION
// of how to fetch a user
fn fetch_user(id: u64) -> Effect<User, DbError, ()> {
    effect! {
        let conn = bind* connect_to_db();
        let user = bind* query_user(&conn, id);
        user
    }
}

// Calling the function doesn't open any connections
let description = fetch_user(42);

// `description` is a value we can hold, pass around, combine with others
// No database has been touched

// Only when we run it does the I/O happen
let user = run_blocking(description, ())?;

That effect! block looks imperative — it looks like it’s doing things. But it’s not. It’s building a description of things to do. The bind* operator means “this step depends on the previous step completing” — it’s describing sequencing, not executing it.

The Key Insight: Separation of Concerns

This separation — description vs execution — is how the challenges from the previous section (errors, dependencies, task structure) get a consistent home in the type system.

Error handling becomes part of the description itself. When you write:

let resilient = retry(
    || risky_operation(),
    Schedule::exponential(Duration::from_millis(100)).compose(Schedule::recurs(3)),
);

You’re not adding retry logic to running code. You’re modifying the description to say “when executed, retry up to 3 times with exponential backoff.” The retry logic is baked into the recipe.

Dependencies become part of the type signature. When you write:

fn get_user(id: u64) -> Effect<User, DbError, Database>

That Database in the type says “this recipe requires a Database to execute.” The compiler enforces it. You can’t run the effect without providing a Database. No runtime surprises.

Structured concurrency becomes possible because the runtime knows what each effect intends to do before it does it. Spawning an effect doesn’t fire and forget — it creates a handle to a structured task with clear ownership and cancellation semantics.

What’s in an Effect?

An Effect<A, E, R> carries three pieces of information in its type:

  • A — the Answer: what you get if it succeeds
  • E — the Error: what you get if it fails
  • R — the Requirements: what environment is needed to run it

We’ll explore all three in the next section. For now, just notice that an Effect’s type tells you everything about what it does — success, failure, and dependencies — without you having to read the implementation.

// This type signature tells the whole story:
fn process_payment(
    amount: Money
) -> Effect<Receipt, PaymentError, (PaymentGateway, Logger)>

// - Produces a Receipt on success
// - Can fail with PaymentError
// - Requires a PaymentGateway and Logger to run

No need to read the function body to know what resources it needs or what errors it can produce. The type is the documentation.

Style: imperative async vs effect descriptions

Typical async fn code is written as a sequence of steps: each .await drives the next piece of work. That is clear and idiomatic Rust.

Effect code in this library is often written so that many domain functions return Effect<…>: a value that describes work and only runs when you pass it to a runner with an environment. The style emphasizes composition (map, flat_map, layers, retries) before execution.

Both approaches run on the same Future machinery underneath. Use effects where you want environment and error structure in the type, shared policies, and test substitution at the boundary; use plain async fn where a small linear function is enough.

Let’s look at those three type parameters in detail.

The Three Type Parameters

Every Effect carries three type parameters: Effect<A, E, R>. These aren’t arbitrary — they answer the three fundamental questions every computation must address:

  • A — What do I produce when I succeed?
  • E — What do I produce when I fail?
  • R — What do I need in order to run?

Let’s examine each one.

A: The Answer

The A parameter is the success type — what you get back when everything goes right.

use effectful::{Effect, succeed};

// This effect produces an i32 on success
let answer: Effect<i32, String, ()> = succeed(42);

// This effect produces a User on success
let user_effect: Effect<User, DbError, ()> = succeed(User::new("Alice"));

If you’re familiar with Result<T, E>, think of A as the T. It’s what you’re hoping to get.

When you transform an effect with .map(), you’re changing the A:

let numbers: Effect<i32, String, ()> = succeed(21);
let doubled: Effect<i32, String, ()> = numbers.map(|n| n * 2);
let stringified: Effect<String, String, ()> = doubled.map(|n| n.to_string());

Each .map() transforms the success value while preserving the error type and requirements.

E: The Error

The E parameter is the failure type — what you get back when something goes wrong.

use effectful::{Effect, fail};

// This effect always fails with a String error
let failure: Effect<i32, String, ()> = fail("something went wrong".to_string());

// This effect can fail with a DbError
let user: Effect<User, DbError, ()> = fetch_user_from_db(42);

Again, if you know Result<T, E>, think of E as the E. It’s what you’re worried might happen.

You can transform error types with .map_error():

let db_effect: Effect<User, DbError, ()> = fetch_user(42);

// Convert DbError to a more general AppError
let app_effect: Effect<User, AppError, ()> = db_effect.map_error(|e| AppError::Database(e));

Unlike traditional error handling where you sprinkle .map_err() everywhere, with effects you typically handle error transformation at specific boundaries — when composing larger effects from smaller ones, or when exposing an API.

R: The Requirements

Here’s where effects get interesting. The R parameter represents the environment — the dependencies this effect needs in order to run.

// This effect needs nothing to run — R is ()
let standalone: Effect<i32, String, ()> = succeed(42);

// This effect needs a Database to run
fn get_user(id: u64) -> Effect<User, DbError, Database> {
    // ... implementation that uses the database
}

// This effect needs both a Database AND a Logger
fn get_user_logged(id: u64) -> Effect<User, DbError, (Database, Logger)> {
    // ... implementation that uses both
}

The key insight: you cannot run an effect unless you provide its requirements.

let needs_db: Effect<User, DbError, Database> = get_user(42);

// This won't compile! We haven't satisfied the Database requirement.
// run_blocking(needs_db);  // ERROR: Database not provided

// We pass the required environment to the runner
let user = run_blocking(needs_db, my_database)?;

You can also capture an environment with .provide_env(my_database) to get an Effect<User, DbError, ()>.

Why R Matters

The R parameter is why effectful can offer compile-time dependency injection.

Consider this function signature:

fn process_order(order: Order) -> Effect<Receipt, OrderError, (Database, PaymentGateway, EmailService, Logger)>

Just from the type, you know:

  • This produces a Receipt on success
  • It can fail with OrderError
  • It requires four services to run

You don’t need to read the implementation. You don’t need to trace through function calls. The type tells you exactly what dependencies are involved.

And the compiler enforces it. If you try to run this effect without providing all four services, you get a compile error. No runtime “service not found” exceptions. No forgetting to initialize something.

R Flows Through Composition

When you combine effects, the bound effects must agree on one environment type. Use a shared environment type when multiple services are needed:

fn get_user(id: u64) -> Effect<User, DbError, Database> { ... }
fn send_email(to: &str, body: &str) -> Effect<(), EmailError, EmailService> { ... }

fn notify_user(id: u64) -> Effect<(), AppError, AppEnv> {
    effect! {
        let user = bind* get_user(id)
            .zoom_env(|env: &mut AppEnv| env.database.clone())
            .map_error(AppError::Db);
        bind* send_email(&user.email, "Hello!")
            .zoom_env(|env: &mut AppEnv| env.email.clone())
            .map_error(AppError::Email);
    }
}

The notify_user function now documents that callers must supply AppEnv, and each inner effect explicitly projects the part it needs.

The Unit Environment: ()

When R = (), the effect is self-contained. It doesn’t need anything from the outside world to run:

let standalone: Effect<i32, String, ()> = succeed(42);

// Can run immediately — no dependencies
let result = run_blocking(standalone, ());

Most effects start with requirements and gradually have them satisfied as you move toward the “edge” of your program:

// Deep in your code: many requirements
fn business_logic() -> Effect<Result, Error, (Db, Cache, Logger, Config)>

// At the edge: provide everything
fn main() {
    let db = connect_database();
    let cache = connect_cache();
    let logger = setup_logger();
    let config = load_config();

    let env = AppEnv { db, cache, logger, config };

    run_blocking(business_logic(), env);
}

Reading Effect Signatures

Let’s practice reading some signatures:

// Produces String, never fails, needs nothing
Effect<String, Never, ()>

// Produces i32, can fail with ParseError, needs nothing
Effect<i32, ParseError, ()>

// Produces User, can fail with DbError, needs Database
Effect<User, DbError, Database>

// Produces (), can fail with AppError, needs Database, Cache, and Logger
Effect<(), AppError, (Database, Cache, Logger)>

With practice, you’ll read these as fluently as you read Result<T, E>. The extra R parameter becomes second nature.

What’s Next

We’ve seen that effects are descriptions, not actions. We’ve seen that Effect<A, E, R> encodes success type, error type, and requirements.

But we haven’t answered the obvious question: why does this matter? Why is it better to describe computations than to just do them?

The answer is laziness. And laziness, it turns out, is a superpower.

Laziness as a Superpower

So far we’ve established that Effect<A, E, R> is a description of a computation — a recipe that does nothing until someone executes it. You might be thinking: “OK, but why is that good? I have to run it eventually. What do I gain by waiting?”

Quite a bit, if your program benefits from composing and testing before execution.

Here is what you can do with a computation you have not run yet.

Effect values vs driving an async fn

Rust futures are lazy: calling an async fn returns a Future; the body runs when that future is polled (for example with .await).

The contrast here is about what your API returns—a raw Future you must await immediately in the caller, versus an Effect value you can store, compose, and run later.

// Returns a Future; the HTTP work runs when this future is awaited / polled
async fn fetch_user_async(id: u64) -> Result<User, HttpError> {
    http_get(&format!("https://api.example.com/users/{id}")).await
}

// Returns a description; I/O runs when the effect is executed with an environment
fn fetch_user(id: u64) -> Effect<User, HttpError, HttpClient> {
    effect! {
        let user = bind* http_get(&format!("https://api.example.com/users/{id}"));
        user
    }
}

Calling fetch_user_async(1) only builds the future; the request runs when something polls it (typically at .await). Calling fetch_user(1) returns an Effect—still no I/O until you run that effect with a runner and the needed HttpClient.

The point is not that async fn is “eager.” It is that effects give you a first-class value to combine (retries, timeouts, tests) before you commit to a particular run.

Superpower #1: Compose First, Run Later

Because effects are values, you can build an entire program before running any of it:

fn load_dashboard(user_id: u64) -> Effect<DashboardPage, AppError, (Database, Cache, Logger)> {
    effect! {
        let user    = bind* fetch_user(user_id).map_error(AppError::Db);
        let posts   = bind* fetch_posts(user.id).map_error(AppError::Db);
        let profile = bind* build_profile(&user, &posts).map_error(AppError::Render);
        profile
    }
}

// Nothing has run yet. We have a value.
let page = load_dashboard(42);

// Chain more work onto it — still nothing runs
let logged_page = page.flat_map(|p| log_view(p));

// Only now does any of this execute
run_blocking(logged_page, env);

Every line before run_blocking is pure data manipulation. You’re assembling a pipeline. The pipeline can be inspected, transformed, passed to other functions, stored in a struct. The laws of composition apply cleanly because there are no side-effects sneaking in.

Superpower #2: Retry Without Rewriting

Because an effect is a description, you can wrap it with new behavior without touching the original:

// Add exponential back-off retry — no changes to call_payment_api
let resilient = retry(
    || call_payment_api(order.clone()),
    Schedule::exponential(Duration::from_millis(100)).compose(Schedule::recurs(3)),
);

// Add more transformations around `resilient` as needed — still no changes
let bounded = resilient.map_error(PaymentError::RetryExhausted);

Compare this to the async version: to add retries to an async fn, you’d either modify the function body, wrap it in a helper that calls it in a loop, or reach for an external crate. The retry logic gets tangled with the business logic.

With effects, retry is just another description. retry takes a factory for one-shot effects and a Schedule, then returns a new effect. No surgery on the original operation required.

Superpower #3: Test Without Mocking the Universe

Because nothing runs until you provide the environment, tests can substitute controlled implementations without rewriting a single line of production code:

#[test]
fn user_not_found_returns_error() {
    let test_env = TestEnv::new()
        .with_http(stub_http_404_for("/users/99"));

    let result = run_test(fetch_user(99), test_env);

    assert!(matches!(result, Exit::Failure(Cause::Fail(HttpError::NotFound))));
}

The same fetch_user function used in production runs in the test — just against a different environment. No #[cfg(test)] stubs. No Arc<dyn Trait> that you only swap out in tests. The type system ensures you’ve provided every dependency the effect declared.

Sequential async vs bundled descriptions

Sequential async fn code is natural for linear flows: each .await advances the next step, and control matches the source order.

Effect-oriented APIs often bundle those steps into a single Effect value first, then apply cross-cutting behavior (retry, timeout, tracing) as transformations on that value before calling run_*.

That separation is useful when the same workflow must be reused under different policies or tested with a substituted environment, without copying the body of the async function.

When Does It Actually Run?

There are exactly three places where an Effect executes:

// In a binary or application entry point
run_blocking(program, env);

// In an async context
run_async(program, env).await;

// In tests
run_test(program, test_env);

Everywhere else, you’re building, transforming, or combining descriptions. The runtime boundary is explicit. You know exactly where the side-effects begin.

Until run_* is called, your effect is just data: composable and easy to substitute in tests.


That’s Chapter 1. You now have a picture of why teams adopt effects (errors, dependencies, concurrency structure), what an Effect is (a description executed with an environment), what the type parameters mean (A = success, E = failure, R = requirements), and why keeping work in description form matters for composition and testing.

Chapter 2 gets hands-on: first effects, map, flat_map, and a small end-to-end program.

Your First Effect

Chapter 1 was all philosophy. We established what effects are, why they exist, and why laziness is useful. Now we get our hands dirty.

By the end of this chapter you will have written real effects, transformed them, chained them together, and run a complete small program. You’ll use succeed, fail, map, map_error, and flat_map — the five operations that cover the vast majority of day-to-day effect work.

Let’s start with the simplest question: how do you create an effect in the first place?

Creating Effects — succeed, fail, and pure

Every effect starts as either a success or a failure. The two constructors that express this are succeed and fail.

succeed

succeed wraps a value into an effect that, when run, immediately produces that value:

use effectful::{Effect, succeed};

let answer: Effect<i32, String, ()> = succeed(42);
let greeting: Effect<String, String, ()> = succeed("Hello, world!".to_string());

Nothing happens when you call succeed. You get back a description — a lazy recipe that says “produce this value when someone asks.” The 42 is already there, but no computation has been executed.

The type parameters are important:

  • A = i32 — the value we produce
  • E = String — the error type (unused here, but we still have to pick one)
  • R = () — no environment needed

If you prefer the FP vocabulary, pure is an alias for succeed:

use effectful::pure;

let effect = pure(42_i32);

Both names refer to exactly the same thing. Use whichever feels natural in context.

fail

fail wraps an error into an effect that, when run, immediately fails with that error:

use effectful::{Effect, fail};

let oops: Effect<i32, String, ()> = fail("something went wrong".to_string());

Again, nothing executes. oops is a description of a failure, not the failure itself. You can pass it around, store it, and transform it without triggering any error handling.

The type annotation matters: Effect<i32, String, ()> says this would have produced an i32 on success — we just know it won’t.

From a Closure

For cases where you want to capture some computation in an effect (but still defer it):

use effectful::{Effect, effect};

let computed: Effect<i32, String, ()> = effect!(|_r: &mut ()| {
    let x = expensive_calculation();
    x * 2
});

The body of effect! runs lazily — only when the effect is executed. This is the workhorse macro we’ll cover thoroughly in Chapter 3.

Type Inference

Rust’s type inference often lets you skip the annotations:

// Types inferred from usage
let answer = succeed(42);      // Effect<i32, _, ()>
let greeting = succeed("hi"); // Effect<&str, _, ()>

The error type E is usually inferred from how the effect is used later — when you chain it with other effects that can fail, the error type propagates. You’ll only need to annotate explicitly when the compiler asks.

Quick Reference

succeed(value)    // Effect that produces value
pure(value)       // Alias for succeed
fail(error)       // Effect that fails with error
effect!(|_r| { … }) // Effect from a lazy closure

These three constructors cover every starting point. Everything else is transformation and composition.

Transforming Success — map and its Friends

You have an effect. It produces some value. But you want a different value — or a different error. That’s what map and map_error are for.

map

map transforms the success value without running any new effects:

use effectful::{succeed, Effect};

let number: Effect<i32, String, ()> = succeed(21);
let doubled: Effect<i32, String, ()> = number.map(|n| n * 2);
let text: Effect<String, String, ()> = doubled.map(|n| n.to_string());

None of these .map() calls executes anything. Each one wraps the previous description in a new layer: “and then transform the result with this function.” The chain of transformations only runs when you call run_blocking or similar.

The type of the effect changes with each map. The A parameter shifts:

// Effect<i32, String, ()>
//   .map(|n: i32| n.to_string())
// → Effect<String, String, ()>

The E (error type) and R (requirements) stay the same. .map touches only the success path.

map_error

map_error transforms the failure type, leaving the success path untouched:

use effectful::fail;

#[derive(Debug)]
struct AppError(String);

let db_err: Effect<String, String, ()> = fail("db connection failed".to_string());
let app_err: Effect<String, AppError, ()> = db_err.map_error(|s| AppError(s));

This is typically used at module boundaries when you need to unify error types. A database layer might return DbError, but your application layer needs AppError. map_error does the conversion without touching anything else.

Why These Don’t Execute Anything

It’s worth repeating: neither map nor map_error runs any computation.

let effect = succeed(42)
    .map(|n| { println!("mapping!"); n + 1 })
    .map(|n| n * 2);

// At this point: nothing has printed, nothing has computed.
// We have a description of three steps.

let result = run_blocking(effect, ());
// NOW the effect runs. "mapping!" prints once. Result is 86.

This is the promise of laziness: you can build pipelines of transformations without triggering side effects until the moment you choose.

Combining map and map_error

A common pattern is calling both to normalise an effect into your domain’s types:

fn fetch_user_record(id: u64) -> Effect<User, AppError, ()> {
    raw_db_fetch(id)
        .map(|row| User::from_row(row))
        .map_error(|e| AppError::Database(e))
}

The effect goes in with raw DB types; it comes out with domain types. The transformation chain documents the conversion at a glance.

and_then / and_then_discard

Two more helpers are worth knowing:

// and_then: sequence two effects, keep the second result
let validated: Effect<i32, String, ()> = succeed(42)
    .and_then(succeed(100));

// and_then_discard: sequence two effects, keep the first result
let kept_left: Effect<i32, String, ()> = succeed(42)
    .and_then_discard(succeed(()));

Use flat_map when the second effect depends on the first value.

Summary

MethodChangesDoes not change
.map(f)A (success type)E, R
.map_error(f)E (error type)A, R
.tap(f)nothingA, E, R

None of them execute the effect. They all return new, larger descriptions.

Chaining Effects — flat_map and the Bind

map handles the case where your transformation is a pure function: A → B. But often the next step is itself an effect. You don’t want Effect<Effect<B, E, R>, E, R> — you want Effect<B, E, R>. That’s flat_map.

The Problem with map for Effects

Say you want to fetch a user and then fetch their posts:

fn get_user(id: u64) -> Effect<User, DbError, Database> { ... }
fn get_posts(user_id: u64) -> Effect<Vec<Post>, DbError, Database> { ... }

If you try to use map:

// This gives Effect<Effect<Vec<Post>, DbError, Database>, DbError, Database>
// — a nested effect, not what we want
let wrong = get_user(1).map(|user| get_posts(user.id));

map’s function must return a plain value. If it returns an Effect, you get nesting.

flat_map: Chain Without Nesting

flat_map (also known as and_then on effects) takes a function A → Effect<B, E, R> and “flattens” the result:

let combined: Effect<Vec<Post>, DbError, Database> =
    get_user(1).flat_map(|user| get_posts(user.id));

Now you have one flat effect that, when run, first fetches the user, then uses the result to fetch posts. The nesting is gone.

Chaining Multiple Steps

flat_map chains read left-to-right, but deep chains get noisy:

// Gets unwieldy quickly
let program = get_user(1)
    .flat_map(|user| get_posts(user.id)
        .flat_map(|posts| render_page(user, posts)));

This is where the effect! macro comes in.

The effect! Macro as Syntactic Sugar

The effect! macro turns flat_map chains into readable sequential code using the bind* operator:

use effectful::effect;

let program: Effect<Page, AppError, Database> = effect! {
    let user  = bind* get_user(1).map_error(AppError::Db);
    let posts = bind* get_posts(user.id).map_error(AppError::Db);
    let page  = render_page(user, posts);
    page
};

The bind* operator is the bind: “run this effect and give me its success value.” Each bind* expr desugars to a flat_map. The whole block is one effect.

Note that render_page (a pure function with no bind*) is just a normal Rust expression — it runs inside the macro body during execution.

Error Short-Circuiting

Like ? in Result, if any bind* step fails, the whole effect! exits early with that error:

let program: Effect<Page, AppError, Database> = effect! {
    let user = bind* get_user(999).map_error(AppError::Db);
    // If get_user fails, execution stops here.
    // The rest never runs.
    let posts = bind* get_posts(user.id).map_error(AppError::Db);
    render_page(user, posts)
};

This is sequential, not parallel. Each step waits for the previous.

map vs flat_map — When to Use Each

SituationUse
Transformation returns a plain value.map(f)
Transformation returns an Effect.flat_map(f) or effect! { bind* ... }
More than one sequential stepeffect! { bind* ... } macro

A rule of thumb: if you find yourself writing effect.map(|v| another_effect(v)) and noticing the nested type, switch to flat_map or the macro.

The Full Picture

// All equivalent:

// 1. Explicit flat_map
get_user(1)
    .flat_map(|user| get_posts(user.id))

// 2. Using effect! with bind* effect! {
    let user = bind* get_user(1);
    bind* get_posts(user.id)
}

// 3. Short form for single bind
effect! { bind* get_user(1).flat_map(|u| get_posts(u.id)) }

The effect! macro is the idiomatic choice for anything more than one step. Chapter 3 covers it in full detail.

Your First Real Program

Let’s build something complete: a small program that loads configuration, connects to a database, queries a user, and formats a greeting. It’s simple enough to fit on one page, but real enough to demonstrate the full effect workflow.

The Domain

#[derive(Debug)]
struct Config {
    db_url: String,
    app_name: String,
}

#[derive(Debug)]
struct User {
    id: u64,
    name: String,
    email: String,
}

#[derive(Debug)]
enum AppError {
    Config(String),
    Database(String),
}

The Individual Steps

Each step is a focused effect:

use effectful::{Effect, effect, succeed, fail};

fn load_config() -> Effect<Config, AppError, ()> {
    // In a real app, read from a file or env vars
    succeed(Config {
        db_url: "postgres://localhost/myapp".to_string(),
        app_name: "Greeter".to_string(),
    })
}

fn connect_db(config: &Config) -> Effect<Database, AppError, ()> {
    Database::connect(&config.db_url)
        .map_error(|e| AppError::Database(format!("connect: {e}")))
}

fn fetch_user(db: &Database, id: u64) -> Effect<User, AppError, ()> {
    db.query_user(id)
        .map_error(|e| AppError::Database(format!("query: {e}")))
}

fn format_greeting(config: &Config, user: &User) -> String {
    format!("{}: Hello, {}! ({})", config.app_name, user.name, user.email)
}

Composing the Program

Now we compose these steps into one effect using effect!:

fn greet_user(user_id: u64) -> Effect<String, AppError, ()> {
    effect! {
        let config = bind* load_config();
        let db     = bind* connect_db(&config);
        let user   = bind* fetch_user(&db, user_id);
        format_greeting(&config, &user)
    }
}

Read it like a recipe:

  1. Load config — if it fails, stop with AppError::Config
  2. Connect to DB — if it fails, stop with AppError::Database
  3. Fetch user — if it fails, stop with AppError::Database
  4. Format the greeting — this is pure, always succeeds

Nothing has run yet. greet_user(42) is a value.

Running It

At the edge of the program — in main — we execute:

fn main() {
    match run_blocking(greet_user(42), ()) {
        Ok(greeting) => println!("{greeting}"),
        Err(AppError::Config(msg)) => eprintln!("Config error: {msg}"),
        Err(AppError::Database(msg)) => eprintln!("DB error: {msg}"),
    }
}

Testing It

Because the effect is a description, testing is straightforward — just swap out the underlying steps:

#[test]
fn test_greeting_format() {
    let effect = effect! {
        let config = bind* succeed(Config {
            db_url: "unused".into(),
            app_name: "TestApp".into(),
        });
        let user = bind* succeed(User {
            id: 1,
            name: "Alice".into(),
            email: "alice@example.com".into(),
        });
        format_greeting(&config, &user)
    };

    let result = run_test(effect, ());
    assert!(matches!(result, Exit::Success(greeting) if greeting == "TestApp: Hello, Alice! (alice@example.com)"));
}

No mocking framework. No Arc<dyn Trait> plumbing. Just substitute different succeed values for the steps you want to control.

What You Just Learned

You’ve written a complete effect-based program. Along the way you used:

  • succeed and fail to construct effects from values
  • .map and .map_error to transform success and error types
  • effect! { bind* ... } to sequence effects without callback nesting
  • run_blocking to execute at the program edge
  • run_test to verify behaviour in tests

That’s the core of 90% of what you’ll write day-to-day. The next two chapters go deeper: Chapter 3 explores the effect! macro in detail, and Chapter 4 begins the tour of R — the environment type that makes dependency injection a compile-time guarantee.

You just wrote your first effect-based program. It won’t be your last.

The effect! Macro — Do-Notation for Mortals

Chapter 2 introduced the effect! macro as “syntactic sugar for flat_map.” That’s technically accurate, but undersells it. In practice, effect! is how you write almost every multi-step computation in effectful.

This chapter covers the why, the how, and the limits of the macro. By the end you’ll be fluent in bind*, comfortable handling errors inside the macro, and clear on when not to use it.

Why Do-Notation Exists

Consider three steps that each depend on the previous result:

fn step_a() -> Effect<i32, Err, ()>   { succeed(1) }
fn step_b(n: i32) -> Effect<i32, Err, ()> { succeed(n * 2) }
fn step_c(n: i32) -> Effect<String, Err, ()> { succeed(n.to_string()) }

Written with raw flat_map:

let program = step_a()
    .flat_map(|a| step_b(a)
        .flat_map(|b| step_c(b)));

Two steps: readable. Five steps: a pyramid. Ten steps: indistinguishable from callback hell.

Haskell solved this decades ago with do-notation. Scala’s for-comprehensions do the same thing. Rust doesn’t have built-in do-notation, so effectful provides it via a macro.

Do-Notation as a Concept

Do-notation lets you write sequential effectful code that looks like imperative code:

do
  a ← step_a
  b ← step_b(a)
  c ← step_c(b)
  return c

Each means “run this effect and bind its result to this name.” If any step fails, the whole computation short-circuits.

Rust can’t use the symbol, so effectful uses bind* (prefix bind-star):

effect! {
    let a = bind* step_a();
    let b = bind* step_b(a);
    let c = bind* step_c(b);
    c
}

Same semantics. Rust syntax. Zero nesting.

How the Desugaring Works

The macro transforms each bind* expr into a flat_map:

// Written:
effect! {
    let a = bind* step_a();
    let b = bind* step_b(a);
    b.to_string()
}

// Roughly expands to:
step_a().flat_map(|a| {
    step_b(a).flat_map(|b| {
        succeed(b.to_string())
    })
})

The macro generates exactly the nested flat_map chain you’d write by hand — just without the visual noise.

One Body, One block

One discipline matters: use one effect! block per function. Don’t branch between two macro bodies:

// BAD — two separate effect! blocks for one computation
if flag {
    effect! { let x = bind* a(); x }
} else {
    effect! { let y = bind* b(); y }
}

// GOOD — one block, branching inside
effect! {
    if flag {
        bind* a()
    } else {
        bind* b()
    }
}

A single effect! block is a single description. Splitting it into multiple blocks loses the composition guarantee.

Pure Expressions

Not every line inside effect! has to be an effect. Pure Rust expressions work normally:

effect! {
    let user = bind* fetch_user(id);
    let name = user.name.to_uppercase();  // pure — no bind*     let posts = bind* fetch_posts(user.id);
    (name, posts)
}

Only use bind* when the expression has type Effect<_, _, _>. Pure expressions just run inline.

The bind* Operator Explained

The bind* (bind-star) is the bind operator inside effect!. It means: “execute this effect and give me its success value; if it fails, propagate the failure and stop.”

Basic Usage

effect! {
    let user = bind* fetch_user(42);   // bind the result to `user`
    user.name
}

bind* fetch_user(42) desugars to a flat_map. The rest of the block becomes the body of the closure.

Discarding Results

When you don’t need the value, use bind* without a binding:

effect! {
    bind* log_event("processing started");   // run for side effect, discard result
    let result = bind* do_work();
    bind* log_event("processing done");
    result
}

Both bind* log_event(...) expressions run for their effects and the () return is discarded.

Method Calls on Effects

bind* works on any expression that evaluates to an Effect. That includes method chains:

effect! {
    let user = bind* fetch_user(id).map_error(AppError::Database);
    let posts = bind* retry(
        || fetch_posts(user.id).map_error(AppError::Database),
        Schedule::exponential(Duration::from_millis(100)).compose(Schedule::recurs(3)),
    );
    (user, posts)
}

The bind* applies to the entire expression that follows it. For retry/repeat, use the free functions that return an Effect.

bind* in Conditionals and Loops

You can use bind* inside if expressions and loops:

effect! {
    let value = if condition {
        bind* compute_a()
    } else {
        bind* compute_b()
    };
    process(value)
}

Both branches are effects; the macro handles either path.

effect! {
    for id in user_ids {
        bind* process_user(id);  // sequential: one at a time
    }
    "done"
}

Note: this is sequential iteration. For concurrent processing, use fiber_all (Chapter 9).

What bind* Cannot Do

bind* only works inside an effect! block. Calling it outside is a compile error:

// Does not compile — bind* is not valid here
let x = bind* fetch_user(42);

// Must be inside effect!
let x = effect! { bind* fetch_user(42) };

Also, bind* cannot bind across an async closure boundary. If you’re calling from_async, the body of the async block is separate:

effect! {
    let result = bind* from_async(|_r| async move {
        // Inside here, you're in regular Rust async — no bind*.
        let data = some_future().await?;
        Ok(data)
    });
    result
}

Use bind* outside the async move block; use .await inside it.

The Old Postfix Syntax (Deprecated)

Early versions of effectful used a postfix bind-star: expr ~. This is no longer valid. Always use the prefix form:

// OLD — do not use
step_a() ~;

// GOOD
bind* step_a();
let x = bind* step_b();

If you see postfix bind-star in older code, update it to the prefix form.

Error Handling Inside effect!

The bind* operator short-circuits on failure — if a bound effect fails, the whole effect! block fails with that error. But you can also handle errors within the block.

The Default: Short-Circuit

effect! {
    let a = bind* step_a();    // if this fails → whole block fails
    let b = bind* step_b(a);   // if this fails → whole block fails
    b
}

This matches ? in Result. You get clean sequencing at the cost of aborting early. For most code, that’s exactly what you want.

Catching Errors Mid-block

To handle an error inline and continue, use .catch before the bind*:

effect! {
    let user = bind* fetch_user(id).catch(|_| succeed(User::anonymous()));
    // If fetch_user fails, we get User::anonymous() and continue
    render_user(user)
}

.catch converts a failure into a success (or a different effect). The bind* then sees a successful effect.

Converting Errors with map_error

Often you have multiple effect types with different E parameters and need to unify them:

#[derive(Debug)]
enum AppError {
    Db(DbError),
    Network(HttpError),
}

effect! {
    let user = bind* fetch_user(id).map_error(AppError::Db);
    let data = bind* fetch_external_data(user.id).map_error(AppError::Network);
    process(user, data)
}

Both effects are converted to the same AppError before binding. The block’s E parameter is AppError throughout.

Handling Errors as Values

The current Effect API does not expose a fold method. Use .catch / .catch_all, or run the effect and pattern match on Result at the boundary.

effect! {
    let outcome = bind* risky_operation()
        .map(|val| format!("Success: {val}"))
        .catch_all(|err| format!("Error: {err}"));
    log_outcome(outcome)
}

catch_all turns a typed failure into a fallback success value, so the resulting effect is infallible through E.

Re-raising Errors

Inside a .catch handler, you can inspect the error and decide whether to recover or re-fail:

effect! {
    let result = bind* db_operation().catch(|error| {
        if error.is_transient() {
            // Transient: retry once with a fallback
            fallback_db_operation()
        } else {
            // Permanent: re-raise
            fail(error)
        }
    });
    result
}

fail(error) inside a handler produces a failing effect — the outer bind* then propagates it.

Accumulating Multiple Errors

Short-circuit stops at the first error. The current root API does not include validate_all; collect independent validation errors manually at the boundary:

let mut errors = Vec::new();

if let Err(error) = run_blocking(validate_name(&input.name), ()) {
    errors.push(error);
}
if let Err(error) = run_blocking(validate_email(&input.email), ()) {
    errors.push(error);
}
if let Err(error) = run_blocking(validate_age(input.age), ()) {
    errors.push(error);
}

Chapter 8 covers accumulation patterns in detail.

The Rule of Thumb

WantDo
Stop at first failureplain bind* effect
Provide a fallback`bind* effect.catch(
Unify error typesbind* effect.map_error(Into::into)
Turn failure into value`bind* effect.catch_all(
Collect all failuresManual accumulation outside the macro

When Not to Use the Macro

effect! is the idiomatic choice for most multi-step computations. But it’s a macro — which means it has edges. Knowing when to reach for raw flat_map instead saves debugging time.

Use Raw flat_map for Single-Step Transforms

When there’s exactly one effectful step and you’re transforming its result, flat_map is cleaner:

// Unnecessarily verbose
effect! {
    let id = bind* parse_id(raw);
    id
}

// Clear and direct
parse_id(raw).flat_map(|id| succeed(id))
// or just:
parse_id(raw)

Use effect! when you have two or more sequential steps. For one, flat_map or .map is usually enough.

Use Combinators for Structural Patterns

Some patterns have named combinators that are more expressive than macros:

// Instead of:
effect! {
    let a = bind* step_a();
    let b = bind* step_b();
    (a, b)
}

// Consider (when steps are independent):
step_a().zip(step_b())

zip communicates intent: “I need both, in any order.” The effect! version implies sequential dependency. For independent steps, prefer explicit combinators. (For concurrent independent steps, see fiber_all in Chapter 9.)

Avoid Deep Nesting Within the Block

The macro eliminates nesting between flat_map chains. But you can still create nested effect! blocks, which gets confusing:

// CONFUSING — nested macro bodies
effect! {
    let result = bind* effect! {      // inner macro
        let x = bind* inner_step();
        x * 2
    };
    result + 1
}

// BETTER — flatten it
effect! {
    let x = bind* inner_step();
    let result = x * 2;
    result + 1
}

If you feel the urge to nest effect! inside effect!, flatten the outer block instead.

The Macro and Type Inference

The macro occasionally confuses the type inferencer, especially when the error type isn’t pinned early. If you see cryptic “can’t infer type” errors inside effect!:

  1. Annotate the return type of the enclosing function explicitly
  2. Add a .map_error(Into::into) on the first bind* binding to anchor E
  3. As a last resort, break out the inner logic into a named helper function

When Generic Returns Are Needed

Library code with polymorphic A, E, R sometimes can’t use the macro cleanly:

// This works fine with explicit function + effect!
pub fn load_config<A, E, R>() -> Effect<A, E, R>
where
    A: From<Config> + 'static,
    E: From<ConfigError> + 'static,
    R: 'static,
{
    effect!(|_r: &mut R| {
        let cfg = read_env_config()?;
        A::from(cfg)
    })
}

The closure form of effect! (with |_r: &mut R|) is the right tool for generic graph-builder functions. It’s still the macro, just in its raw form.

Summary

SituationPrefer
2+ sequential stepseffect! { bind* ... }
1 step, simple transform.map / .flat_map
Independent steps.zip / combinators
Generic <A, E, R> graph builder`effect!(
Structural patterns (zip, race, all)explicit combinators, not macro

The macro is a tool, not a religion. Use it when it makes the code read like a story; use combinators when they express intent more directly.

The R Parameter — Your Dependencies, Encoded in Types

Chapter 1 introduced R as “what an effect needs to run.” We kept it vague on purpose — you needed to understand effects before worrying about their environment.

Now it’s time to understand R properly. This chapter answers: what is R mechanically, how does it flow through composition, and how do you satisfy it?

The payoff is significant. Once you internalize R, compile-time dependency injection stops feeling like magic and starts feeling obvious.

R Revisited — More Than Just a Type Parameter

R is the environment type required to run an effect.

fn get_user(id: u64) -> Effect<User, DbError, Database>

This says: to run get_user, supply a Database environment.

R as a Contract

let effect = get_user(1);

// Missing environment: does not match current runner API.
// run_blocking(effect);

let user = run_blocking(effect, my_database)?;

You can also capture the environment first.

let ready = get_user(1).provide_env(my_database);
let user = run_blocking(ready, ())?;

Composition Requires One Environment Type

Inside one effect! block, bound effects must agree on the same R type after any adaptations.

fn get_user(id: u64) -> Effect<User, DbError, Database> { /* ... */ }
fn get_posts(user_id: u64) -> Effect<Vec<Post>, DbError, Database> { /* ... */ }

fn get_user_with_posts(id: u64) -> Effect<(User, Vec<Post>), DbError, Database> {
    effect! {
        let user = bind* get_user(id);
        let posts = bind* get_posts(user.id);
        (user, posts)
    }
}

When effects need different environment types, adapt them with zoom_env, use a common ServiceContext, or build an HList Context that exposes both services.

fn log(msg: &str) -> Effect<(), LogError, Logger> { /* ... */ }
fn get_user(id: u64) -> Effect<User, DbError, Database> { /* ... */ }

fn get_user_logged(id: u64) -> Effect<User, AppError, AppEnv> {
    effect! {
        bind* log(&format!("Fetching user {id}"))
            .zoom_env(|env: &mut AppEnv| env.logger.clone())
            .map_error(AppError::Log);

        bind* get_user(id)
            .zoom_env(|env: &mut AppEnv| env.database.clone())
            .map_error(AppError::Db)
    }
}

R as Documentation

Effect<Receipt, AppError, ServiceContext> says the computation reads services from a runtime service table. Effect<Receipt, AppError, Context<...>> says exactly which tagged HList cells are required. A concrete environment type like AppEnv documents which aggregate application environment must be supplied.

Why R Instead of Parameters?

Traditional Rust threads dependencies as function parameters. R moves that dependency requirement into the returned effect type, which makes it composable and delayable until the program edge.

The key rule: library functions return honest Effect<A, E, R> values; application edges decide which concrete environment to pass.

Providing Dependencies

An effect with a non-() R needs an environment before it can run. The current low-level API is explicit: pass the environment to the runner, or capture it with provide_env.

Run with an Environment

fn get_user(id: u64) -> Effect<User, DbError, Database> { /* ... */ }

let effect = get_user(42);
let user = run_blocking(effect, my_database)?;

run_blocking(effect, env) and run_async(effect, env) consume both the effect and the environment.

Capture an Environment

Use provide_env when you want to turn Effect<A, E, R> into Effect<A, E, ()> before running or composing at the edge.

let ready: Effect<User, DbError, ()> = get_user(42).provide_env(my_database);
let user = run_blocking(ready, ())?;

There is no raw .provide(value) / .provide_some(value) API in the current core effect surface.

ServiceContext and Layers

For derive-based services, effects can require ServiceContext and be provided with a Layer.

#[derive(Clone, Service)]
struct Database { /* ... */ }

fn get_user(id: u64) -> Effect<User, AppError, ServiceContext> {
    Effect::<Database, AppError, ServiceContext>::service::<Database>()
        .flat_map(move |db| db.get_user(id))
}

let layer = Layer::succeed(Database::new());
let user = run_blocking(get_user(42).provide(layer), ())?;

Effect::provide(layer) exists for Effect<_, _, ServiceContext>.

Tagged Contexts

For HList-style typed contexts, construct a Context and pass it as R.

service_key!(pub struct DbKey);

let env = service_env::<DbKey, _>(my_database);
let user = run_blocking(get_user(42), env)?;

Use effect.provide_head(value) when the effect’s environment is Context<Cons<Service<K, V>, Tail>> and you want to provide the head cell.

Program Edge Rule

Provide dependencies at the program edge: main, request adapters, integration tests, and top-level supervisors. Library functions should return effects that honestly describe their required R.

Widening and Narrowing — Environment Transformations

Sometimes your effect needs part of an environment, but the caller has the whole thing. This is where zoom_env comes in.

The Mismatch Problem

Imagine your application has a large environment type:

struct AppEnv {
    db: Database,
    logger: Logger,
    config: Config,
    metrics: MetricsClient,
}

You have a utility function that only needs a Logger:

fn log_event(msg: &str) -> Effect<(), LogError, Logger> { ... }

You can’t call this inside an effect! block that has AppEnv in scope — the types don’t match. You need to narrow the environment down.

zoom_env: Narrow the Environment

zoom_env adapts an effect to work with a larger environment by providing a lens from the larger type to the smaller one:

// Adapt log_event to work with AppEnv.
// The projection returns the smaller environment by value.
let app_log = log_event("hello").zoom_env(|env: &mut AppEnv| env.logger.clone());

Now app_log has type Effect<(), LogError, AppEnv>. The function extracts the Logger from AppEnv and feeds it to the original effect.

Inside effect!, the pattern looks like:

fn process(data: Data) -> Effect<(), AppError, AppEnv> {
    effect! {
        bind* log_event("start").zoom_env(|e: &mut AppEnv| e.logger.clone()).map_error(AppError::Log);
        bind* db_query(data).zoom_env(|e: &mut AppEnv| e.db.clone()).map_error(AppError::Db);
    }
}

Transforming the Environment

The current API uses zoom_env for both narrowing and transformation. It applies a function to convert the caller’s environment into what the effect needs:

// Effect needs a raw string URL
fn connect_raw() -> Effect<Database, DbError, String> { ... }

// You have a Config that contains the URL
let with_config = connect_raw().zoom_env(|cfg: &mut Config| cfg.db_url.clone());
// Now type is Effect<Database, DbError, Config>

There is no separate contramap_env method in the current API.

R as Documentation Revisited

These combinators highlight why R is valuable as documentation. When you see:

fn log_event(msg: &str) -> Effect<(), LogError, Logger>

You know exactly what this function needs. You don’t need to read its body to see if it also touches the database. The zoom_env call at the use site makes the adaptation explicit — it’s not hidden.

Compare to the pre-effect alternative:

// Traditional: you'd need to read the body to know what `env` is used for
fn log_event(env: &AppEnv, msg: &str) -> Result<(), LogError> { ... }

With R, the function declares what it needs. With zoom_env, the caller declares how to satisfy it.

When to Use These

In practice, zoom_env appears most often in library code — when writing reusable utilities that should work with any environment containing the right piece. Application code often uses ServiceContext, Context, and Layers instead.

Think of zoom_env as the manual fallback when the automatic layer-based wiring isn’t the right fit.

R as Documentation — Self-Describing Functions

The R parameter is often described as “the environment type.” That’s true, but it undersells the practical benefit. R is living documentation that the compiler enforces.

The Signature Tells the Story

Consider two versions of the same function:

// Version A: traditional async
async fn process_order(order: Order) -> Result<Receipt, Error> {
    // What does this use? Read the body to find out.
    // Database? PaymentGateway? Email? Metrics?
    // You'll have to trace through 200 lines to know.
}

// Version B: effect-based
fn process_order(order: Order) -> Effect<Receipt, OrderError, (Database, PaymentGateway, EmailService, Logger)> {
    // What does this use? Look at the signature.
    // Database ✓, PaymentGateway ✓, EmailService ✓, Logger ✓
    // Done.
}

Version B’s type is self-describing. You don’t need to read the implementation to understand its dependency surface.

Code Review Benefits

In a pull request, R changes are visible in the diff. If someone adds a call to send_metrics() inside process_order and the MetricsClient wasn’t previously in R, the function signature must change:

- fn process_order(order: Order) -> Effect<Receipt, OrderError, (Database, PaymentGateway, EmailService, Logger)>
+ fn process_order(order: Order) -> Effect<Receipt, OrderError, (Database, PaymentGateway, EmailService, Logger, MetricsClient)>

This diff is in the function signature — impossible to miss. With traditional parameters or singletons, new dependencies can silently appear in implementation bodies.

Refactoring Safety

When you refactor and remove a dependency, the R type shrinks. Callers that construct a concrete environment may need to simplify that environment too.

// After removing Logger from process_order:

// Before: process_order required AppEnv { db, logger }
let result = run_blocking(process_order(order), AppEnv { db, logger })?;

// After: process_order only requires Database
let result = run_blocking(process_order(order), db)?;

The compiler guides the cleanup when the environment type changes. Traditional singleton-style code can leave stale dependencies silently lingering.

Testing Clarity

When writing a test, R tells you exactly what you need to mock:

#[test]
fn test_process_order() {
    // R = (Database, PaymentGateway, EmailService, Logger)
    // So the test needs these four — no more, no less
    let result = run_test(
        process_order(test_order()),
        (mock_db(), mock_payment(), mock_email(), test_logger()),
    );
    assert!(matches!(result, Exit::Success(_)));
}

There’s no “I wonder if this also touches the metrics service” uncertainty. The type says it doesn’t. If you’re missing a mock, the code won’t compile.

R is Not Magic

It’s important to understand that R is just a type parameter. The “compile-time DI” property comes from:

  1. Functions declaring what they need in R
  2. Runners requiring an actual environment value of type R
  3. Composition preserving environment requirements in the resulting effect type

There’s no reflection, no registration, no framework. Just types.

The next chapter shows how Tags and Context make this scale beyond simple tuples — handling large, complex dependency graphs without positional ambiguity.

Tags and Context — Compile-Time Service Lookup

Chapter 4 showed how R encodes dependencies as types. We used simple types like Database and Logger. That works for small programs, but breaks down as the dependency graph grows.

This chapter introduces the solution: Tags. Tags give values compile-time identities, and Context assembles them into a heterogeneous list that the compiler can query by name, not by position.

By the end you’ll understand why effectful uses this structure instead of tuples, and how to extract exactly the service you need from any environment.

The Problem with Positional Types

If you’ve used tuples as R for a while, you’ve probably already hit the wall. Let’s make it explicit.

The Tuple Explosion

Two dependencies: perfectly readable.

Effect<A, E, (Database, Logger)>

Five dependencies: which is which?

Effect<A, E, (Pool, Pool, Logger, Config, HttpClient)>
//            ^^^^ two Pools — which is the main DB and which is the cache?

Tuples are positional. (Pool, Pool, ...) is ambiguous — both fields have the same type. There’s no way to distinguish them except by index, and index-based access is error-prone and breaks silently when you reorder the tuple.

The Fragility Problem

Positional types are fragile under change. Say your function started with:

fn foo() -> Effect<A, E, (Database, Logger)>
//                        0         1

Now a teammate adds Config between them:

fn foo() -> Effect<A, E, (Database, Config, Logger)>
//                        0         1       2

Every caller that was providing a tuple (db, log) must be updated to (db, config, log). The change in position is invisible to the type system — the compiler won’t tell you where the old index references are. It’s a silent bomb.

The Same-Type Collision

The deeper problem: Rust can’t distinguish Pool for the main database from Pool for the cache. They’re the same type. Positional tuples just accept both:

// V1: run with (main_pool, cache_pool)
// V2: accidentally swap them
run_blocking(effect, (cache_pool, main_pool))  // compiles, wrong at runtime

No compile error. Wrong behaviour. Possibly wrong for months before you notice.

What We Actually Need

We need a way to give each dependency a name — a compile-time identifier that’s independent of its type and its position in any list.

What if:

  • Database meant “the tagged Pool known as DatabaseTag”
  • Cache meant “the tagged Pool known as CacheTag”

Then you couldn’t accidentally swap them — they’d be different types even though both are Pool underneath.

That’s exactly what Tags provide.

Tags — Branding Values with Identity

A tag is a compile-time key. Tagged<K, V> stores a value V under key type K, so two values with the same runtime type can remain distinct in a typed context.

Tag and Tagged

use effectful::{Tagged, tagged};

struct DatabaseTag;
struct CacheTag;

let db: Tagged<DatabaseTag, Pool> = tagged::<DatabaseTag, _>(connect_database());
let cache: Tagged<CacheTag, Pool> = tagged::<CacheTag, _>(connect_cache());

let pool_ref: &Pool = &db.value;

Tag<K> itself is a zero-sized phantom identity. Most code works with Tagged<K, V> values rather than constructing Tag<K> directly.

Why Tags Help

Tagged<DatabaseTag, Pool> and Tagged<CacheTag, Pool> are different types even though both contain Pool. That prevents positional swaps.

fn needs_database(db: Tagged<DatabaseTag, Pool>) { /* ... */ }

let cache: Tagged<CacheTag, Pool> = tagged::<CacheTag, _>(pool);
needs_database(cache); // type error

service_key!

The legacy service_key! macro declares a nominal key type.

use effectful::service_key;

service_key!(pub struct DatabaseKey);
service_key!(pub struct CacheKey);

Pair the key with a value using Service<K, V> / Tagged<K, V>.

type DatabaseService = effectful::Service<DatabaseKey, Pool>;
let db = effectful::service::<DatabaseKey, _>(pool);

For new service-style code, prefer #[derive(Service)] on the service struct and ServiceContext.

NeedsX Supertraits

For HList contexts, a NeedsX trait can name a Get bound.

pub trait NeedsDatabase: Get<DatabaseKey, Target = Pool> {}
impl<R: Get<DatabaseKey, Target = Pool>> NeedsDatabase for R {}

pub fn get_user<R: NeedsDatabase>(id: u64) -> Effect<User, DbError, R> { /* ... */ }

This is a readability pattern, not a separate runtime feature.

Summary

ConceptPurpose
Tag<K>Zero-sized key identity
Tagged<K, V>Value V stored under key K
tagged::<K, _>(v)Construct a tagged cell
service_key!(pub struct K);Declare a nominal key type
Service<K, V>Alias for Tagged<K, V>

Context and HLists — The Heterogeneous Stack

Context<L> wraps a heterogeneous list of typed cells. In the tagged API, each cell is usually Tagged<K, V>.

The Structure: Cons / Nil

use effectful::{Cons, Nil, Tagged};

type Empty = Nil;
type WithDb = Cons<Tagged<DatabaseKey, Pool>, Nil>;
type WithDbAndLogger = Cons<Tagged<DatabaseKey, Pool>, Cons<Tagged<LoggerKey, Logger>, Nil>>;

Cons<Head, Tail> prepends one item to a list. Nil is the empty list.

Building Context Values

use effectful::{Cons, Context, Nil, tagged};

let env = Context::new(Cons(
    tagged::<DatabaseKey, _>(my_pool),
    Cons(tagged::<LoggerKey, _>(my_logger), Nil),
));

You can also prepend to an existing context:

let base = Context::new(Cons(tagged::<LoggerKey, _>(my_logger), Nil));
let full = base.prepend(tagged::<DatabaseKey, _>(my_pool));

Access

Context::get::<K>() reads the head cell when it has key K.

let pool: &Pool = full.get::<DatabaseKey>();

For non-head cells, use get_path::<K, P>() with an explicit type-level path such as ThereHere / Skip1.

let logger: &Logger = full.get_path::<LoggerKey, ThereHere>();

This explicit path is why application code often wraps bounds in NeedsX traits or uses ServiceContext instead.

Why HLists and Not HashMap?

An HList preserves each value’s type in the environment type. That gives compile-time lookup and no runtime downcast. The cost is verbose types like Cons<Tagged<A, V>, Cons<Tagged<B, W>, Nil>>.

Use HList Context when you want maximum static structure. Use ServiceContext when you want a simpler runtime service table keyed by service type.

Converting Context to ServiceContext

At the composition root you may have a statically-typed Context but need to hand it to code that expects ServiceContext. Use [IntoServiceContext]:

use effectful::{ctx, Effect, IntoServiceContext, MissingService, Service, ServiceContext,
  run_blocking};

#[derive(Clone, Hash, Service)]
struct Config { port: u16 }

let static_ctx = ctx!(Config => Config { port: 8080 });
let runtime_ctx: ServiceContext = static_ctx.into_service_context();

let program: Effect<u16, MissingService, ServiceContext> =
  Config::use_sync(|config| config.port);

assert_eq!(run_blocking(program, runtime_ctx), Ok(8080));

Only self-keyed service cells (Tagged<S, S> where S implements [Service]) convert. Arbitrary tagged values cannot silently enter runtime service lookup. Duplicate service types in one list make the head cell win, matching compile-time lookup intuition.

When to Use Which

SituationUse
Fixed compile-time HList; path-sensitive internalsContext
Derive-service app / layer code; runtime wiringServiceContext
Bridging the two at the composition rootctx!.into_service_context()

Get and GetMut — Extracting from Context

Get and GetMut are type-level lookup traits for HList contexts.

Get: Read-Only Access

use effectful::{Get, Here};

fn use_database<R>(env: &R) -> &Pool
where
    R: Get<DatabaseKey, Here, Target = Pool>,
{
    env.get()
}

The path parameter tells the compiler where the cell lives. Here means the head of the list.

GetMut: Mutable Access

use effectful::{GetMut, Here};

fn increment_counter<R>(env: &mut R)
where
    R: GetMut<CounterKey, Here, Target = Counter>,
{
    let counter = env.get_mut();
    counter.increment();
}

Use GetMut sparingly. For shared concurrent state, prefer TRef or a service that owns its mutation rules.

Explicit Paths

For values not at the head, use a path such as ThereHere or Skip1.

let logger: &Logger = env.get_path::<LoggerKey, ThereHere>();

Lookup is type-safe, but not magical: the path is part of the bound.

No bind* Tag Shorthand

The current effect! macro only binds expressions whose type implements the bind protocol, usually Effect<_, _, _>.

effect! {
    let user = bind* fetch_user(id);
    user
}

It does not support bind* DatabaseKey service lookup shorthand. Access tagged contexts with Context::get / Get, or use derive-service helpers such as UserRepository::use_.

NeedsX Supertraits

You can hide verbose Get bounds behind a trait.

pub trait NeedsDatabase: Get<DatabaseKey, Here, Target = Pool> {}
impl<R: Get<DatabaseKey, Here, Target = Pool>> NeedsDatabase for R {}

This is only a naming pattern; the compiler still verifies the underlying Get bound.

Compile-Time Guarantees

If a function requires NeedsDatabase, callers must supply an environment type that satisfies that trait. Missing tagged cells are compile-time errors for HList Contexts.

Layers — Building Your Dependency Graph

You’ve seen how R encodes what an effect needs, and how Context holds the values at runtime. But who builds the context?

In small programs you can construct context manually with ctx! and hand values to provide. In real applications, you need something more powerful: a way to declare how to build each piece of the environment, with automatic dependency ordering and lifecycle management.

That’s what Layers are for.

A Layer is a recipe for building part of an environment. It knows what it produces, what it needs to produce it, and (optionally) how to clean up afterward. Wire Layers together, and effectful figures out the right build order automatically.

What Is a Layer?

An Effect describes a computation that needs an environment. A layer describes how to build values or services that can become that environment.

Two Layer Surfaces

effectful currently exposes two related layer APIs.

LayerFn / LayerBuild
  Builds typed values, often `Tagged<K, V>` cells for HList `Context`s.

Layer<ROut, E, RIn>
  Builds services into `ServiceContext` for derive-service applications.

HList-Style Layer

use effectful::{LayerBuild, LayerFn, tagged};

let db_layer = LayerFn(|| {
    let pool = connect_pool_blocking(database_url)?;
    Ok(tagged::<DatabaseKey, _>(pool))
});

let db_cell = db_layer.build()?;

LayerFn is lazy: construction does nothing; build() runs the constructor.

ServiceContext Layer

use effectful::{Layer, Service};

#[derive(Clone, Service)]
struct Database { /* ... */ }

let db_layer = Layer::succeed(Database::new());
let context = run_blocking(db_layer.build(), ())?;

For dependencies, use Layer::effect and read upstream services from ServiceContext.

let db_layer = Layer::effect("Database", || {
    Config::use_(|config| Database::connect(config.database_url))
});

Providing to Effects

Effect::provide(layer) exists for Effect<_, _, ServiceContext>.

let result = run_blocking(my_app().provide(app_layer), ())?;

For typed Context environments, build the context and pass it directly to run_blocking(effect, env).

Lifecycle

Resource lifecycles are handled by Scope, Pool, and explicit finalizers. Layers are constructors; if a layer creates resources that require cleanup, make that cleanup part of the service design or the surrounding scope.

Building Layers — From Simple to Complex

effectful currently has two layer surfaces:

  • LayerFn / LayerBuild for typed HList-style values.
  • Layer<ROut, E, RIn> for ServiceContext services.

LayerFn

LayerFn wraps a zero-argument function returning Result<Output, Error>.

use effectful::{LayerBuild, LayerFn, tagged};

let config_layer = LayerFn(|| Ok(tagged::<ConfigKey, _>(Config::from_env()?)));
let config = config_layer.build()?;

Use LayerEffect::new(effect) when the output comes from an Effect and should be cached after first build.

use effectful::LayerEffect;

let db_layer = LayerEffect::new(connect_pool(config.db_url()).map(|pool| tagged::<DatabaseKey, _>(pool)));
let db = db_layer.build()?;

LayerFnFrom

Use LayerFnFrom when a layer depends on the output of a previous layer.

use effectful::{LayerFn, LayerFnFrom, LayerExt};

let config = LayerFn(|| Ok(Config::from_env()?));
let db = LayerFnFrom(|config: &Config| Ok(Database::connect(config.db_url())?));

let stack = config.and_then(db);
let env = stack.build()?;

ServiceContext Layers

For derive-based services, use Layer::succeed or Layer::effect.

#[derive(Clone, Service)]
struct Config { /* ... */ }

let config_layer = Layer::succeed(Config::from_env());

let db_layer = Layer::effect("Database", || {
    Config::use_(|config| Database::connect(config.database_url))
});

Layer::effect receives dependencies through ServiceContext, so it can use Effect::service::<S>() or S::use_.

Memoization

Layer<ROut, E, RIn> exposes .memoized().

let shared_config = config_layer.memoized();

LayerEffect in the HList surface caches by design after the first build.

Stacking Layers — Composition Patterns

Layer composition is explicit. Use Stack / and_then for HList-style layers, and merge / provide / provide_merge for ServiceContext layers.

Independent HList Layers

use effectful::{LayerBuild, Stack};

let stack = Stack(config_layer, logger_layer);
let env = stack.build()?; // Cons<ConfigOutput, Cons<LoggerOutput, Nil>>

The two layers must have the same error type.

Dependent HList Layers

Use LayerExt::and_then or StackThen when the second layer needs the first layer’s output.

use effectful::{LayerBuild, LayerExt, LayerFn, LayerFnFrom};

let config = LayerFn(|| Ok(Config::from_env()?));
let db = LayerFnFrom(|config: &Config| Ok(Database::connect(config.db_url())?));

let env = config.and_then(db).build()?;

ServiceContext merge

For derive-service layers, merge builds independent layers and combines their ServiceContexts.

let app_layer = config_layer.merge(logger_layer);
let context = run_blocking(app_layer.build(), ())?;

ServiceContext provide

Use provide when one layer needs services from another layer and you want to hide provider output.

let db_with_config = db_layer.provide(config_layer);
let context = run_blocking(db_with_config.build(), ())?;

Use provide_merge when you want both provider and dependent services in the final context.

let app_layer = db_layer.provide_merge(config_layer);

Providing to Effects

Effect::provide(layer) exists for effects whose environment is ServiceContext.

let result = run_blocking(my_application().provide(app_layer), ())?;

For typed Context / HList environments, build the context and pass it to run_blocking(effect, env).

Layer Graphs — Dependency Planning

LayerGraph is a planner for named layer nodes. It does not build layers itself; it computes a topological build order from requires and provides service names.

Declaring a Layer Graph

use effectful::{LayerGraph, LayerNode};

let graph = LayerGraph::new([
    LayerNode::new("config", std::iter::empty::<&str>(), ["config"]),
    LayerNode::new("db", ["config"], ["db"]),
    LayerNode::new("cache", ["config"], ["cache"]),
    LayerNode::new("service", ["db", "cache"], ["service"]),
]);

Each node has:

  • id: stable unique node id
  • requires: service names it needs
  • provides: service names it supplies

Planning

let plan = graph.plan_topological()?;
assert_eq!(plan.build_order, vec!["config", "db", "cache", "service"]);

Sibling order is deterministic but should not be used as a semantic dependency. If one layer must precede another, express that with requires / provides.

Planner Errors

plan_topological can fail with:

ErrorMeaning
DuplicateNodeIdTwo nodes share an id
ConflictingProviderMore than one node provides the same service name
MissingProviderA requirement has no provider
CycleDetectedDependencies contain a cycle
let bad_graph = LayerGraph::new([
    LayerNode::new("a", ["b"], ["a"]),
    LayerNode::new("b", ["a"], ["b"]),
]);

let err = bad_graph.plan_topological();
assert!(matches!(err, Err(LayerPlannerError::CycleDetected { .. })));

Use error.to_diagnostic() for user-facing messages and suggestions.

Planning from STM

LayerGraph::plan_topological_from_tref(&nodes_tref) reads a TRef<Vec<LayerNode>> snapshot transactionally and plans from that snapshot.

When to Use LayerGraph

Use LayerGraph when you need validation, diagnostics, or tool-visible dependency order. For a few layers in application code, direct layer composition is usually clearer.

Services — The Complete DI Pattern

The previous chapters established the building blocks: Tags (identities), Context (the environment), and Layers (constructors). Now we put them together into the complete Service pattern.

A Service in effectful is the combination of:

  1. A trait defining the interface
  2. A tag identifying it in the environment
  3. One or more implementations (production and test)
  4. A layer that wires an implementation into the environment

This is the full dependency injection story. By the end of this chapter you’ll have a working multi-service application wired entirely at compile time.

Service Traits — Defining Interfaces

Services can be modeled as cloneable structs that contain concrete clients, trait objects, or function handles. Derive Service on the struct to make it available through ServiceContext.

Define the Interface

trait UserRepositoryImpl: Send + Sync {
    fn get_user(&self, id: u64) -> Effect<User, DbError, ()>;
    fn save_user(&self, user: User) -> Effect<(), DbError, ()>;
}

#[derive(Clone, Service)]
struct UserRepository {
    inner: Arc<dyn UserRepositoryImpl>,
}

impl UserRepository {
    fn get_user(&self, id: u64) -> Effect<User, DbError, ()> {
        self.inner.get_user(id)
    }
}

The service struct is the lookup key. It must be Clone + 'static.

Accessing a Service

Use Effect::service::<S>(), S::use_, or S::use_sync.

fn get_user_profile(id: u64) -> Effect<UserProfile, AppError, ServiceContext> {
    UserRepository::use_(move |repo| {
        repo.get_user(id)
            .map(UserProfile::from)
            .map_error(AppError::Db)
    })
}

If the service is missing, the lookup fails with MissingService; include From<MissingService> in your application error when using use_ / Effect::service.

Tagged Alternative

The older typed-context style uses service_key!(pub struct Key); plus Tagged<Key, V> / Service<Key, V>.

service_key!(pub struct UserRepositoryKey);
type UserRepositoryService = Service<UserRepositoryKey, Arc<dyn UserRepositoryImpl>>;

Use this when you specifically want HList Context types in function signatures. Prefer derive-service for application service tables.

Keeping Services Focused

Avoid a single god service. Split by capability.

#[derive(Clone, Service)]
struct Users { /* ... */ }

#[derive(Clone, Service)]
struct Mailer { /* ... */ }

#[derive(Clone, Service)]
struct Payments { /* ... */ }

Functions should read exactly the services they need from ServiceContext.

ServiceEnv and service_env

ServiceEnv<K, V> is a one-cell tagged Context. service_env::<K, V>(value) constructs that context; it is not an accessor effect.

Constructing a Tagged Service Environment

use effectful::{ServiceEnv, service_env, service_key};

service_key!(pub struct UserRepositoryKey);

let env: ServiceEnv<UserRepositoryKey, UserRepository> =
    service_env::<UserRepositoryKey, _>(repo);

The alias is:

type ServiceEnv<K, V> = Context<Cons<Service<K, V>, Nil>>;

Accessing Tagged Services

Use Context::get::<K>() or Get<K> bounds.

fn get_user(id: u64) -> Effect<User, DbError, ServiceEnv<UserRepositoryKey, UserRepository>> {
    Effect::new(move |env| {
        let repo = env.get::<UserRepositoryKey>();
        repo.get_user_blocking(id)
    })
}

Inside effect!, there is no bind* UserRepositoryKey shorthand in the current API.

Derive-Service Alternative

For most service code, prefer the #[derive(Service)] / ServiceContext API.

#[derive(Clone, Service)]
struct UserRepository { /* ... */ }

fn get_user(id: u64) -> Effect<User, AppError, ServiceContext> {
    UserRepository::use_(move |repo| repo.get_user(id))
}

let env = UserRepository::new().to_context();
let user = run_blocking(get_user(42), env)?;

This avoids exposing HList paths in most application signatures.

Interop: compile-time ContextServiceContext

When you have a statically-built Context and need to feed it into an effect that requires ServiceContext, convert at the edge:

use effectful::{ctx, Effect, IntoServiceContext, MissingService, Service, ServiceContext,
  run_blocking};

#[derive(Clone, Hash, Service)]
struct Config { port: u16 }

let static_ctx = ctx!(Config => Config { port: 8080 });
let env: ServiceContext = static_ctx.into_service_context();

let program: Effect<u16, MissingService, ServiceContext> =
    Config::use_sync(|config| config.port);

assert_eq!(run_blocking(program, env), Ok(8080));

This is the recommended path: build with ctx!, convert with .into_service_context(), then look up services with Service::use_sync or Effect::service.

Providing Services via Layers

The derive-service API makes a service type both key and value. Provide concrete services with Layer::succeed or construct them with Layer::effect.

Minimal Service Layer

use effectful::{Layer, Service};

#[derive(Clone, Service)]
struct UserRepository {
    pool: Pool,
}

let repo_layer = Layer::succeed(UserRepository { pool });

An effect can read the service from ServiceContext.

fn get_user(id: u64) -> Effect<User, AppError, ServiceContext> {
    UserRepository::use_(move |repo| repo.get_user(id))
}

Run by providing the layer at the edge.

let user = run_blocking(get_user(42).provide(repo_layer), ())?;

Layer with Dependencies

Layer::effect builds a service by running an effect in ServiceContext.

#[derive(Clone, Service)]
struct Config { database_url: String }

#[derive(Clone, Service)]
struct Database { /* ... */ }

let config_layer = Layer::succeed(Config::from_env());

let db_layer = Layer::effect("Database", || {
    Config::use_(|config| Database::connect(config.database_url))
});

let app_layer = db_layer.provide_merge(config_layer);

provide hides provider services in the final output. provide_merge keeps them.

Test Layer with Mock

Tests provide the same service type with a fake implementation.

#[derive(Clone, Service)]
struct UserRepository {
    users: Arc<HashMap<u64, User>>,
}

let test_layer = Layer::succeed(UserRepository::from_users([alice(), bob()]));
let exit = run_test(get_user(1).provide(test_layer), ());

Application code does not change; only the layer at the edge changes.

A Complete DI Example — Putting It Together

This example uses the current derive-service / ServiceContext layer API.

Domain

struct User { id: u64, name: String, email: String }
struct Post { id: u64, author_id: u64, title: String, body: String }
enum AppError { Db(DbError), Notify(NotifyError), Missing(MissingService) }

Services

#[derive(Clone, Service)]
struct UserRepository {
    inner: Arc<dyn UserRepositoryImpl>,
}

#[derive(Clone, Service)]
struct PostRepository {
    inner: Arc<dyn PostRepositoryImpl>,
}

#[derive(Clone, Service)]
struct Notifier {
    inner: Arc<dyn NotificationImpl>,
}

Each service struct is cloneable and acts as its own lookup key.

Business Logic

fn get_author_feed(author_id: u64) -> Effect<(User, Vec<Post>), AppError, ServiceContext> {
    UserRepository::use_(move |users| {
        PostRepository::use_(move |posts| {
            effect! {
                let user = bind* users.get_user(author_id).map_error(AppError::Db);
                let posts = bind* posts.get_posts_by_author(author_id).map_error(AppError::Db);
                (user, posts)
            }
        })
    })
}

The function depends on service types, not concrete infrastructure.

Production Layers

let config_layer = Layer::succeed(Config::from_env());

let db_layer = Layer::effect("Database", || {
    Config::use_(|config| Database::connect(config.database_url))
});

let user_repo_layer = Layer::effect("UserRepository", || {
    Database::use_(|db| succeed(UserRepository::postgres(db)))
});

let post_repo_layer = Layer::effect("PostRepository", || {
    Database::use_(|db| succeed(PostRepository::postgres(db)))
});

let app_layer = user_repo_layer
    .merge(post_repo_layer)
    .provide_merge(db_layer.provide_merge(config_layer));

Run at the edge:

fn main() {
    let result = run_blocking(get_author_feed(1).provide(app_layer), ());
    println!("{result:?}");
}

Test Wiring

fn test_layer() -> Layer<(UserRepository, PostRepository), AppError, ()> {
    Layer::succeed(UserRepository::in_memory([alice(), bob()]))
        .merge(Layer::succeed(PostRepository::in_memory([alice_post()])))
}

#[test]
fn feed_includes_author_posts() {
    let exit = run_test(get_author_feed(1).provide(test_layer()), ());
    assert!(matches!(exit, Exit::Success((_, posts)) if posts.len() == 1));
}

What This Demonstrates

Business logic is decoupled from infrastructure:

  • It reads services from ServiceContext.
  • Production and tests provide different layers.
  • Layer composition happens at the edge.
  • Missing services fail through MissingService if requested at runtime.

Part III shifts to operational concerns: errors, fibers, resources, and scheduling.

Error Handling — Cause, Exit, and Recovery

Part II gave you the full dependency injection story. Part III is about what happens when things go wrong — and in production, things always go wrong.

Rust’s Result<T, E> is excellent for expected errors: outcomes you anticipated and typed. But real programs also encounter unexpected failures: panics, OOM conditions, and cancelled fibers. effectful models all of these with a richer type hierarchy.

This chapter introduces Cause (the full error taxonomy), Exit (the terminal outcome of any effect), and the combinators for recovering from both.

Beyond Result — Why Cause Exists

Result<T, E> handles the errors you expect. But what about the errors that aren’t your E?

Expected vs. Unexpected

// Expected: you planned for this
Effect<User, UserNotFound, Db>

// But what if the database panics?
// What if the fiber is cancelled?
// What if the process runs out of memory?
// None of these are UserNotFound.

Traditional Rust handles unexpected failures through panics, which unwind (or abort) and bypass all your error handling. In async code, panics in tasks can silently swallow errors or leave resources unreleased.

The Cause Type

Cause<E> is effectful’s complete taxonomy of failure:

use effectful::Cause;

enum Cause<E> {
    Fail(E),                // Your typed, expected error
    Die(Box<dyn Any>),      // A panic or defect — something that shouldn't happen
    Interrupt,              // The fiber was cancelled
}

Every failure in the effect runtime is one of these three. Together they cover the full space of “things that can go wrong.”

  • Cause::Fail(e) — an error you declared in E, handled with catch or map_error
  • Cause::Die(payload) — a panic, logic bug, or fatal error; should be logged and treated as a defect
  • Cause::Interrupt — clean cancellation; the fiber was asked to stop and cooperated

Why This Matters

Without Cause, you can only handle Cause::Fail. The other two propagate invisibly up the fiber tree and may silently swallow logs or leave resources unreleased.

With Cause, you can handle all failure modes in a structured way:

my_effect.catch_all(|cause| match cause {
    Cause::Fail(e)    => recover_from_expected(e),
    Cause::Die(panic) => log_defect_and_fail(panic),
    Cause::Interrupt  => succeed(default_value()),
})

Resource finalizers (Chapter 10) use this same model — they run on any Cause, ensuring cleanup regardless of how the fiber ends.

Day-to-Day Usage

In normal application code you rarely inspect Cause directly. You use:

  • .catch(f) for handling Cause::Fail
  • .catch_all(f) when you need to handle panics or interruption too
  • Exit (next section) when you need to inspect the terminal outcome

The Cause type is mostly visible at infrastructure boundaries — resource finalizers, fiber supervisors, and top-level error handlers.

Exit — Terminal Outcomes

Exit<A, E> records whether an effect-like computation succeeded or failed with a structured Cause<E>.

The Exit Type

use effectful::{Cause, Exit};

enum Exit<A, E> {
    Success(A),
    Failure(Cause<E>),
}

The public constructors are Exit::succeed(value), Exit::fail(error), Exit::die(message), and Exit::interrupt(fiber_id).

Getting an Exit

run_blocking(effect, env) and run_async(effect, env) return Result<A, E>. The test harness returns Exit<A, E>.

use effectful::{Exit, run_test};

let exit: Exit<User, DbError> = run_test(get_user(1), env);

match exit {
    Exit::Success(user) => println!("Got user: {}", user.name),
    Exit::Failure(Cause::Fail(DbError::NotFound)) => println!("User not found"),
    Exit::Failure(Cause::Die(message)) => eprintln!("Defect: {message}"),
    Exit::Failure(Cause::Interrupt(id)) => println!("Interrupted fiber {id:?}"),
    Exit::Failure(Cause::Both(_, _) | Cause::Then(_, _)) => println!("Composite failure"),
}

There is no run_to_exit helper in the current API.

Converting Exit to Result

Use into_result() to get Result<A, Cause<E>>.

let result: Result<User, Cause<DbError>> = exit.into_result();

If you only want typed failures, pattern match on the cause.

let result: Result<User, AppError> = match exit.into_result() {
    Ok(user) => Ok(user),
    Err(Cause::Fail(e)) => Err(AppError::Expected(e)),
    Err(Cause::Die(message)) => Err(AppError::Defect(message)),
    Err(Cause::Interrupt(id)) => Err(AppError::Interrupted(id)),
    Err(cause) => Err(AppError::Composite(cause.pretty())),
};

There is no into_result_or_panic helper in the current API.

Exit in Fibers

FiberHandle::await_exit() returns Effect<Exit<A, E>, Never, ()>. FiberHandle::join().await returns Result<A, Cause<E>>.

let exit: Exit<A, E> = run_async(handle.await_exit(), ()).await?;
let result: Result<A, Cause<E>> = handle.join().await;

Use Exit when you need to preserve all failure detail. Use Result when typed failures are enough.

Recovery Combinators — catch and Friends

The current Effect recovery surface works with typed errors E. Cause<E> appears in Exit and fibers, not in ordinary catch handlers.

catch

catch handles a typed failure by returning another effect.

let resilient = risky_db_call().catch(|error: DbError| {
    match error {
        DbError::NotFound => succeed(User::anonymous()),
        other => fail(other),
    }
});

Use catch when recovery itself may still fail.

catch_all

catch_all maps any typed error into a fallback success value. The returned effect is infallible through E.

let user = fetch_user(id).catch_all(|_error| User::anonymous());

Despite the name, this does not handle Cause::Die or Cause::Interrupt; it handles the effect’s typed error channel.

tap_error

tap_error runs an effect when the source fails, then re-emits the original error if the tap succeeds.

let observed = risky_call().tap_error(|message| log_error(message));

The handler receives a debug-formatted string because the original error is re-emitted without requiring Clone.

map_error

Use map_error to translate errors into your application error type.

let effect = db_call().map_error(AppError::Database);

Not Present Yet

The current API does not include fold, or_else, or ignore_error methods on Effect. Use catch, catch_all, map, and map_error, or pattern match on run_blocking(effect, env) at a boundary.

Error Accumulation — Collecting All Failures

effect! and flat_map are fail-fast. That is correct for dependent steps, but independent validation sometimes needs to collect multiple failures.

Fail-Fast Behavior

effect! {
    let name = bind* validate_name(&input.name);     // if this fails, stop
    let email = bind* validate_email(&input.email);  // not run after name failure
    let age = bind* validate_age(input.age);
    User { name, email, age }
}

Use fail-fast composition when each step depends on earlier successful values.

Manual Accumulation

The current root API does not include validate_all or partition helpers. For independent validations, run them as plain Result-returning checks or explicitly run each effect and collect errors.

let mut errors = Vec::new();

if let Err(error) = run_blocking(validate_name(&input.name), ()) {
    errors.push(error);
}
if let Err(error) = run_blocking(validate_email(&input.email), ()) {
    errors.push(error);
}
if let Err(error) = run_blocking(validate_age(input.age), ()) {
    errors.push(error);
}

if errors.is_empty() {
    Ok(())
} else {
    Err(errors)
}

Keep this pattern at validation boundaries. Inside business workflows, prefer typed fail-fast effects.

Combining Error Types

When composing effects with different error types, use map_error or flat_map_union with Or.

use effectful::Or;

type BothErrors = Or<DbError, NetworkError>;

let combined: Effect<Data, BothErrors, ()> = db_fetch()
    .union_error::<NetworkError>()
    .flat_map(|db| network_fetch().map_error(Or::Right).map(move |net| merge(db, net)));

Or<A, B> lets you defer flattening into an application error until a boundary.

ParseErrors

ParseErrors is an aggregate container for schema-style validation issues.

let errors = ParseErrors::new(vec![
    ParseError::new("name", "must not be empty"),
    ParseError::new("email", "invalid email"),
]);

for issue in errors.issues {
    eprintln!("{}: {}", issue.path, issue.message);
}

Current schema combinators usually return the first ParseError; build ParseErrors yourself when your boundary wants to report multiple issues.

When to Accumulate vs. Short-Circuit

SituationUse
Dependent stepseffect! / flat_map
Independent form validationManual accumulation
Batch import with partial successExplicit loop collecting successes/failures
Schema boundary with many field issuesParseErrors::new(collected)

Concurrency & Fibers — Structured Async

Async Rust gives you the ability to do many things concurrently. The challenge is doing it safely — without fire-and-forget tasks that outlive their parent, without silent failures when a task panics, and without resource leaks when tasks are cancelled.

effectful uses Fibers for structured concurrency. A Fiber is a lightweight, interruptible async task with a typed result, an explicit lifecycle, and guaranteed cleanup.

This chapter covers spawning fibers, joining them, cancelling them gracefully, and using FiberRef for fiber-local state.

What Are Fibers? — Lightweight Structured Tasks

A fiber is an effectful concurrent task represented by FiberHandle<A, E>. It gives you a stable id, status inspection, interruption, and typed completion.

Fibers vs. Raw Tasks

// Raw tokio::spawn: lifecycle is owned by Tokio's JoinHandle.
tokio::spawn(async {
    do_something().await;
});

// effectful fiber: lifecycle is explicit through FiberHandle<A, E>.
let runtime = ThreadSleepRuntime;
let handle: FiberHandle<User, DbError> = run_fork(&runtime, || (get_user(1), env));
let result: Result<User, Cause<DbError>> = handle.join().await;

join() returns Result<A, Cause<E>>. Use await_exit() when you need Exit<A, E>.

FiberId

Each handle has a FiberId.

let id = handle.id();
log::debug!("spawned fiber {id:?}");

For code that needs to run under a specific fiber id, use with_fiber_id(id, || ...).

FiberHandle and FiberStatus

let status: FiberStatus = handle.status();

match status {
    FiberStatus::Running => {}
    FiberStatus::Succeeded => {}
    FiberStatus::Failed => {}
    FiberStatus::Interrupted => {}
}

handle.interrupt();

FiberHandle<A, E> is cloneable. Status inspection does not consume the handle.

Structured Cleanup

Fibers can be attached to a Scope with handle.scoped(). Closing the scope interrupts the fiber through a finalizer.

let scope = Scope::make();
let scoped_effect = handle.scoped(); // Effect<A, Cause<E>, Scope>

// Run scoped_effect with `scope`; when another owner closes `scope`,
// the registered finalizer interrupts the handle.

Use scopes when a parent computation must own child-fiber cleanup.

Spawning and Joining — run_fork and fiber_all

The current API uses run_fork to spawn effects and FiberHandle methods to join, inspect, or interrupt them.

run_fork: Spawn One Fiber

use effectful::{ThreadSleepRuntime, run_fork};

let runtime = ThreadSleepRuntime;
let handle = run_fork(&runtime, || (compute_expensive_result(), env));

let local_result = local_computation();
let remote_result = handle.join().await?;

(local_result, remote_result)

The factory returns (effect, env) so the worker owns both the one-shot effect and its environment.

await_exit vs join

let exit: Exit<A, E> = run_async(handle.await_exit(), ()).await?;
let result: Result<A, Cause<E>> = handle.join().await;

Use await_exit when you want the exact Exit; use join for a Result with Cause<E> on failure.

fiber_all

fiber_all joins already-created handles.

use effectful::{fiber_all, run_async};

let handles = user_ids
    .into_iter()
    .map(|id| run_fork(&runtime, move || (fetch_user(id), ())))
    .collect::<Vec<_>>();

let users: Vec<User> = run_async(fiber_all(handles), ()).await?;

If any handle fails, fiber_all returns the first Cause<E> it observes.

Racing

There is no fiber_race or fiber_any free function in the current API. FiberHandle::or_else races two handles and completes with whichever handle resolves first.

let primary = run_fork(&runtime, || (fetch_from_primary(), ()));
let backup = run_fork(&runtime, || (fetch_from_backup(), ()));

let raced = primary.or_else(backup);
let data = raced.join().await?;

The slower fiber is not automatically cancelled by or_else; keep its handle if you need to interrupt it.

Error Behavior

OperationFailure shape
handle.join().awaitResult<A, Cause<E>>
handle.await_exit()Exit<A, E> inside an infallible effect
fiber_all(handles)Effect<Vec<A>, Cause<E>, ()>
handle.or_else(other)First handle completion wins

Cancellation — Interrupting Gracefully

Cancellation is explicit and cooperative. CancellationToken is a shared flag; FiberHandle::interrupt() marks a fiber handle as interrupted.

CancellationToken

use effectful::{CancellationToken, check_interrupt};

let token = CancellationToken::new();
let child = token.child_token();

assert!(!token.is_cancelled());
token.cancel();
assert!(child.is_cancelled());

Cancelling a parent token cancels child tokens. Cancelling a child does not cancel its parent.

Checking for Cancellation

check_interrupt(&token) snapshots whether the token is cancelled.

fn process_large_dataset(token: CancellationToken) -> Effect<(), Never, ()> {
    effect! {
        for chunk in large_dataset.chunks(1000) {
            let cancelled = bind* check_interrupt(&token);
            if cancelled {
                break;
            }
            process_chunk(chunk);
        }
    }
}

Use token.cancelled() when an effect should wait until cancellation happens.

let wait_for_shutdown = token.cancelled(); // Effect<(), Never, ()>

Interrupting a FiberHandle

let handle = run_fork(&runtime, || (background_work(), ()));

handle.interrupt();

let result = handle.join().await;
assert!(matches!(result, Err(Cause::Interrupt(_))));

interrupt() completes the handle with Cause::Interrupt(id) if it was still pending. It returns false if the handle had already completed.

Graceful Shutdown

The basic shutdown pattern is:

  1. Signal shared cancellation tokens.
  2. Interrupt top-level handles that should stop.
  3. Await handles with whatever timeout policy your runtime uses.
token.cancel();

for handle in &handles {
    handle.interrupt();
}

for handle in handles {
    let _ = handle.join().await;
}

Not Present Yet

The current API does not include method-style with_cancellation or an uninterruptible helper. Keep cancellation explicit by passing CancellationToken to long-running effects and checking it at safe points.

FiberRef — Fiber-Local State

FiberRef<A> stores values keyed by (FiberRef id, FiberId). It is useful for trace ids, request context, and other fiber-local data.

Creating a FiberRef

use effectful::{FiberRef, run_blocking};

let trace_id: FiberRef<String> = run_blocking(
    FiberRef::make(|| "none".to_string()),
    (),
)?;

FiberRef::make returns an effect because allocation is part of the effect runtime model.

Reading and Writing

let program = effect! {
    bind* trace_id.set("req-abc-123".to_string());

    let id = bind* trace_id.get();
    bind* log(&format!("[{id}] processing request"));

    bind* process_request()
};

Available operations include get, set, update, modify, reset, locally, and locally_with.

Fork and Join Hooks

When you manage logical child fibers yourself, use on_fork and on_join to seed and merge fiber-local values.

let parent = FiberId::ROOT;
let child = FiberId::fresh();

run_blocking(trace_id.on_fork(parent, child), ())?;

with_fiber_id(child, || {
    run_blocking(trace_id.set("child-trace".to_string()), ())
})?;

run_blocking(trace_id.on_join(parent, child), ())?;

The default fork behavior clones the parent value into the child. The default join behavior keeps the child value.

Local Overrides

locally(value, effect) overrides the current fiber’s value while the inner effect runs, then restores the previous value.

let inner = trace_id.locally(
    "override".to_string(),
    trace_id.get(),
);

let value = run_blocking(inner, ())?;
assert_eq!(value, "override");

Current Limitations

Fiber identity is stored in a thread-local cell. This matches single-threaded run_blocking and current-thread Tokio runtimes. Multi-threaded task migration is not tracked yet.

Resources & Scopes — Deterministic Cleanup

RAII works beautifully in synchronous Rust: resources are released when they fall out of scope, Drop runs deterministically. In async code, the picture gets complicated.

This chapter shows why RAII breaks down in async contexts, introduces Scope and finalizers as the solution, covers the acquire_release pattern for RAII-style resource management, and concludes with Pool for reusing expensive connections.

The Resource Problem — Cleanup in Async

RAII in synchronous code:

{
    let file = File::open("data.txt")?;
    process(&file)?;
}  // file.drop() runs here, always, unconditionally

Reliable. Simple. The drop happens when the scope ends — no exceptions (unless you have exceptions).

The Async Complication

async fn process_data() -> Result<(), Error> {
    let conn = open_connection().await?;
    let data = fetch_data(&conn).await?;  // What if this is cancelled?
    transform_and_save(data).await?;      // Or this?
    conn.close().await?;                  // May never reach here
    Ok(())
}

Three problems:

  1. Cancellation: If this async function is cancelled mid-execution, conn.close() never runs.
  2. Panic: If transform_and_save panics, the async task is dropped. conn.close() is skipped.
  3. Async Drop: impl Drop for Connection can only do synchronous cleanup. If closing a connection requires .await, you can’t do it in Drop.

conn.close() must be an async call, but Drop can’t be async. This is a fundamental mismatch.

The Root Cause

RAII relies on Drop running synchronously when a value goes out of scope. In async code, “going out of scope” and “running cleanup” can be decoupled — by cancellation, by executor scheduling, or by the fact that async closures are state machines that might never reach certain states.

The Solution Preview

effectful solves this with:

  • Scope — a region where finalizers are registered and guaranteed to run (even on cancellation or panic)
  • acquire_release — a combinator that pairs acquisition with its cleanup
  • Pool — for long-lived resources that need controlled reuse

All three run cleanup effects (not just synchronous Drop), and all three run them unconditionally — success, failure, or interruption.

Scopes and Finalizers — Guaranteed Cleanup

Scope is a finalizer registry. Finalizers are plain boxed closures that receive an Exit<(), Never> and return Effect<(), Never, ()>.

Creating a Scope

use effectful::{Effect, Exit, Never, Scope, scope_with};

let result = scope_with(|scope| {
    effect! {
        let conn = bind* open_connection();

        let conn_for_close = conn.clone();
        let added = scope.add_finalizer(Box::new(move |_exit: Exit<(), Never>| {
            conn_for_close.close()
        }));

        if !added {
            return Err(AppError::ScopeClosed);
        }

        let data = bind* fetch_data(&conn);
        process(data)
    }
});

scope_with creates a fresh scope, runs the returned effect, then closes the scope. Closing runs registered finalizers.

Finalizers Always Run on Close

let scope = Scope::make();
let added = scope.add_finalizer(Box::new(|_exit| cleanup_temp_file()));
assert!(added);

scope.close();

Finalizers run when close / close_with_exit is called. close is idempotent and returns true only for the first close.

Multiple Finalizers

Finalizers run in reverse registration order.

scope.add_finalizer(Box::new(|_| close_connection(conn)));
scope.add_finalizer(Box::new(|_| rollback_transaction(txn)));
scope.add_finalizer(Box::new(|_| close_cursor(cursor)));

scope.close();
// Runs: close_cursor, rollback_transaction, close_connection

Register parent resources first and child resources last.

Scope Inheritance

Scopes can be nested manually.

let outer = Scope::make();
let inner = outer.fork();

inner.add_finalizer(Box::new(|_| cleanup_inner()));
outer.add_finalizer(Box::new(|_| cleanup_outer()));

outer.close(); // closes children before outer finalizers

Use Scope::fork for child scopes and Scope::extend to reparent an existing open scope.

acquire_release — Acquire Then Release

acquire_release(acquire, release) runs an acquire effect, then runs the release effect, then returns the acquired value.

use effectful::acquire_release;

let effect = acquire_release(
    open_connection(),
    |conn| conn.close(),
);

Current behavior is simple and immediate:

  1. Run open_connection().
  2. If acquisition succeeds, clone the acquired value.
  3. Run conn.close() in a default release environment.
  4. Return the cloned acquired value.

This is not a scoped bracket that keeps the resource open for a user block. For block-scoped resource use, use scope_with and Scope::add_finalizer, or a Pool checkout that returns resources when the caller’s scope closes.

When to Use It

Use acquire_release for acquire/release pairs where returning the acquired value after release is still meaningful, or as a low-level primitive while building stronger resource helpers.

let effect = effect! {
    bind* acquire_release(load_temp_value(), |value| cleanup_temp_value(value))
};

Scoped Resource Pattern

For resources that must remain open while work runs, register a finalizer in a scope.

let program = scope_with(|scope| {
    effect! {
        let conn = bind* open_connection();
        let conn_for_close = conn.clone();
        scope.add_finalizer(Box::new(move |_| conn_for_close.close()));

        bind* run_query(&conn, "SELECT 1")
    }
});

That pattern keeps acquisition, use, and cleanup in the same visible block.

Pools — Reusing Expensive Resources

Pool<A, E> and KeyedPool<K, A, E> manage reusable values with a capacity gate. A checkout is tied to a Scope: when the scope closes, the value is returned to the idle list unless invalidated.

Pool

use effectful::Pool;

let pool_effect = Pool::make(10, || open_connection("postgres://localhost/app"));
let pool: Pool<Connection, DbError> = run_blocking(pool_effect, ())?;

Pool::make(capacity, factory) returns Effect<Pool<A, E>, Never, ()>. The factory is an effect that creates a fresh value when no reusable idle value is available.

Checking Out

use effectful::{Scope, run_blocking};

let scope = Scope::make();
let conn = run_blocking(pool.get(), scope.clone())?;

// Use conn while scope is open.

scope.close(); // returns conn to the pool's idle list

pool.get() returns Effect<A, E, Scope>. It acquires capacity, reuses an idle value or runs the factory, and registers a finalizer in the provided scope.

Invalidating Values

run_blocking(pool.invalidate(conn.clone()), ())?;

Invalidated values are not reused when their checkout scope closes.

KeyedPool

Use KeyedPool when resource creation depends on a key.

use effectful::KeyedPool;

let pools_effect = KeyedPool::make(20, |key: DbRole| open_connection_for(key));
let pools: KeyedPool<DbRole, Connection, DbError> = run_blocking(pools_effect, ())?;

let scope = Scope::make();
let primary = run_blocking(pools.get(DbRole::Primary), scope.clone())?;
let replica = run_blocking(pools.get(DbRole::Replica), scope.clone())?;

scope.close();

Capacity is global across all keys. Idle values are stored per key.

TTL

Both pool types have make_with_ttl. Idle values older than the TTL are discarded on checkout.

let pool = run_blocking(
    Pool::make_with_ttl(10, Duration::from_secs(300), || open_connection(url.clone())),
    (),
)?;

Pool as a Service

In applications, build the pool at startup and provide it as a service or context value. Business code should depend on the pool abstraction and checkout inside an explicit Scope.

Scheduling — Retry, Repeat, and Time

Production services fail. Networks are unreliable. Downstream APIs go down. The database gets overwhelmed. Defensive engineering means anticipating failure and building policies for what to do when it happens.

effectful models these policies with Schedule — a type that describes when to retry, how long to wait between attempts, and when to give up. Combined with Clock injection, scheduling logic becomes testable without real-time delays.

Schedule — The Retry/Repeat Policy Type

A Schedule is a policy used by the free retry and repeat functions. It receives ScheduleInput { attempt } and either returns a ScheduleDecision { delay } or stops.

The Core Concept

use effectful::{Schedule, ScheduleInput};

let mut schedule = Schedule::recurs(3);

while let Some(decision) = schedule.next(ScheduleInput { attempt }) {
    sleep(decision.delay);
    attempt += 1;
}

The current schedule model tracks the attempt number. It does not inspect elapsed time, the last success value, or the last error.

Creating Schedules

use std::time::Duration;
use effectful::{Schedule, ScheduleInput};

let max_three = Schedule::recurs(3);
let fixed = Schedule::spaced(Duration::from_secs(1));
let exponential = Schedule::exponential(Duration::from_millis(100));
let until_attempt_ten = Schedule::recurs_until(Box::new(|input: &ScheduleInput| input.attempt >= 10));
let while_first_ten = Schedule::recurs_while(Box::new(|input: &ScheduleInput| input.attempt < 10));

Combining Schedules

Use compose to require both schedules to continue. The produced delay is the maximum of the two decisions.

use std::time::Duration;
use effectful::Schedule;

let retry_policy = Schedule::exponential(Duration::from_millis(100))
    .compose(Schedule::recurs(5));

Use jittered to apply deterministic jitter to delays.

let jittered = Schedule::spaced(Duration::from_secs(1)).jittered();

Schedule as a Value

Schedules are values you pass to retry or repeat. Effects are one-shot, so these functions take factories.

use std::time::Duration;
use effectful::{Effect, Schedule, retry};

fn call_external_api() -> Effect<Response, ApiError, HttpClient> {
    retry(
        || make_request(),
        Schedule::exponential(Duration::from_millis(100)).compose(Schedule::recurs(5)),
    )
}

Built-in Schedules

effectful’s current Schedule API is intentionally small. It covers fixed delays, exponential backoff, attempt limits, predicates, composition, and deterministic jitter.

recurs

Schedule::recurs(5)

Allows five additional schedule steps with zero delay. Compose it with a delay schedule to cap retries or repeats.

spaced

Schedule::spaced(Duration::from_secs(5))

Produces the same delay for every step. Use it for polling, heartbeats, and fixed-interval repeat loops.

exponential

Schedule::exponential(Duration::from_millis(100))
// Delays: 100ms, 200ms, 400ms, 800ms, ...

The standard retry backoff. Compose with Schedule::recurs(n) to cap attempts.

let bounded = Schedule::exponential(Duration::from_millis(100))
    .compose(Schedule::recurs(5));

recurs_while / recurs_until

use effectful::{Schedule, ScheduleInput};

let first_ten = Schedule::recurs_while(Box::new(|input: &ScheduleInput| input.attempt < 10));
let until_ten = Schedule::recurs_until(Box::new(|input: &ScheduleInput| input.attempt >= 10));

These schedules decide from the attempt counter.

compose

let policy = Schedule::spaced(Duration::from_secs(1))
    .compose(Schedule::recurs(10));

compose continues only while both schedules continue. When both produce a delay, the larger delay wins.

jittered

let policy = Schedule::exponential(Duration::from_millis(100))
    .jittered()
    .compose(Schedule::recurs(5));

jittered currently applies deterministic jitter. It is useful for exercising jitter paths without adding random behavior to tests.

Not Present Yet

The current API does not include Fibonacci schedules, max-delay caps, total-duration caps, method-style .retry(), or method-style .repeat(). Use the free retry / repeat functions with factories.

retry and repeat — Applying Policies

retry and repeat apply a Schedule to effect factories. They are free functions because Effect values are one-shot.

retry

use std::time::Duration;
use effectful::{Schedule, retry};

let result = retry(
    || flaky_api_call(),
    Schedule::exponential(Duration::from_millis(100)).compose(Schedule::recurs(3)),
);

retry runs the effect returned by the factory. If it fails and the schedule continues, it waits and calls the factory again. It returns the first success, or the last error when the schedule stops.

repeat

use std::time::Duration;
use effectful::{Schedule, repeat};

let polling = repeat(
    || check_job_status(),
    Schedule::spaced(Duration::from_secs(5)).compose(Schedule::recurs(12)),
);

repeat runs once, then continues while the schedule continues. It returns the last success value.

Use cases:

  • Poll for job completion every few seconds
  • Send a bounded number of heartbeats
  • Refresh a cache on a fixed interval

Explicit Clock Variants

Use retry_with_clock and repeat_with_clock when tests need a deterministic clock.

use effectful::{Schedule, TestClock, retry_with_clock};
use std::time::{Duration, Instant};

let clock = TestClock::new(Instant::now());
let effect = retry_with_clock(
    || flaky_api_call(),
    Schedule::exponential(Duration::from_millis(100)).compose(Schedule::recurs(3)),
    clock,
    None,
);

The _and_interrupt variants also accept a CancellationToken.

Conditional Retry

The current API does not include retry_while or an error predicate parameter. retry retries every failure until the schedule stops. If you need error-sensitive retry, write a small custom loop around Effect::run or add that policy at the call site before using retry.

Composition

retry and repeat return ordinary effects.

let batch = retry(
    || process_single_item(item.clone()),
    Schedule::exponential(Duration::from_millis(100)).compose(Schedule::recurs(3)),
);

let continuous = repeat(
    move || batch_factory(),
    Schedule::spaced(Duration::from_secs(60)).compose(Schedule::recurs(10)),
);

Clock Injection — Testable Time

retry and repeat use a live clock by default. For deterministic tests, use the explicit-clock variants: retry_with_clock, repeat_with_clock, and their interruption-aware forms.

The Clock Trait

use std::time::{Duration, Instant};
use effectful::{Effect, Never};

trait Clock {
    fn now(&self) -> Instant;
    fn sleep(&self, duration: Duration) -> Effect<(), Never, ()>;
    fn sleep_until(&self, deadline: Instant) -> Effect<(), Never, ()>;
}

Clock is monotonic-time oriented. Calendar time for logging is exposed separately through LiveClock::now_utc().

Production: LiveClock

use effectful::{LiveClock, ThreadSleepRuntime};

let live_clock = LiveClock::new(ThreadSleepRuntime);

LiveClock delegates sleeping and now() to a runtime.

Testing: TestClock

use std::time::{Duration, Instant};
use effectful::{Clock, TestClock};

let start = Instant::now();
let clock = TestClock::new(start);

assert_eq!(clock.now(), start);

clock.advance(Duration::from_secs(60));
assert_eq!(clock.now(), start + Duration::from_secs(60));

TestClock records pending sleeps. Advancing or setting time drops pending sleeps whose deadlines have elapsed.

Test Example

use std::sync::{Arc, atomic::{AtomicU32, Ordering}};
use std::time::{Duration, Instant};
use effectful::{Schedule, TestClock, retry_with_clock, run_blocking};

let clock = TestClock::new(Instant::now());
let attempts = Arc::new(AtomicU32::new(0));

let effect = retry_with_clock(
    {
        let attempts = attempts.clone();
        move || failing_operation(attempts.clone())
    },
    Schedule::exponential(Duration::from_secs(1)).compose(Schedule::recurs(3)),
    clock.clone(),
    None,
);

let result = run_blocking(effect, ());
assert!(result.is_err());
assert_eq!(attempts.load(Ordering::Relaxed), 4); // initial + 3 retries

This test runs without sleeping in real time because TestClock::sleep only records deadlines.

Clock as a Service

For application logic that needs time directly, model the clock as a service in your environment. The scheduling helpers accept a clock value explicitly; your own services can do the same.

Software Transactional Memory — Optimistic Concurrency

Shared mutable state is hard. Mutexes work but compose poorly: lock two mutexes in the wrong order and you deadlock. Lock them separately and you get torn reads. Lock the whole world and you serialise unnecessarily.

Software Transactional Memory (STM) takes a different approach: every operation on shared state runs inside a transaction. Transactions commit atomically or roll back and retry. No explicit locks. No deadlocks. No torn reads.

This chapter covers effectful’s STM implementation: Stm, TRef, commit, and the transactional collection types.

Why STM? — The Shared State Problem

Consider transferring money between two accounts. With mutexes:

fn transfer(from: &Mutex<Account>, to: &Mutex<Account>, amount: u64) {
    let from_guard = from.lock().unwrap();
    // Thread B might be doing transfer(to, from, ...) right here
    let to_guard = to.lock().unwrap();
    // DEADLOCK: Thread A holds from, waiting for to.
    //           Thread B holds to, waiting for from.
    from_guard.balance -= amount;
    to_guard.balance += amount;
}

The standard fix (always lock in a consistent order) requires global coordination across your codebase. Add a third account and you need to sort three locks. It doesn’t compose.

STM: Optimistic Concurrency

STM operates on the assumption that conflicts are rare. Instead of locking, it:

  1. Reads current values into a local transaction log
  2. Computes new values based on those reads
  3. Attempts to commit: checks that nothing changed since the reads, then atomically writes

If anything changed between step 1 and step 3, the transaction retries automatically from step 1.

use effectful::{Effect, Stm, TRef, commit};

fn transfer(from: TRef<Account>, to: TRef<Account>, amount: u64) -> Effect<(), TransferError, ()> {
    let transaction: Stm<(), TransferError> = from.read_stm().flat_map(move |from_acct| {
        to.read_stm().flat_map(move |to_acct| {
        if from_acct.balance < amount {
            Stm::fail(TransferError::InsufficientFunds)
        } else {
            from.write_stm(Account { balance: from_acct.balance - amount, ..from_acct })
                .flat_map(move |_| to.write_stm(Account { balance: to_acct.balance + amount, ..to_acct }))
        }
        })
    });

    commit(transaction)
}

No locks. No deadlock risk. The transaction retries automatically if another transaction modified either account between our read and our write.

When STM Wins

SituationMutexSTM
Single shared value✓ simple✓ fine
Multiple related values✗ deadlock risk✓ composable
Read-heavy workloads✗ blocks writers✓ reads never block
Composing two existing operations✗ requires coordination✓ compose Stm values
Long operations with I/O✓ (STM would retry too much)✗ wrong tool

STM shines when:

  • You need to update multiple values atomically
  • You’re composing smaller transactional operations into larger ones
  • Contention is low (retries are cheap)

Avoid STM for long-running operations that do I/O. Transactions should be short and pure; use Effect for I/O and Stm for transactional state changes.

TRef — Transactional References

TRef<T> is the fundamental mutable cell in effectful’s STM system. A TRef is read and written through Stm operations, then committed atomically with commit / atomically.

Creating a TRef

use effectful::{TRef, commit, run_blocking};

let counter: TRef<i32> = run_blocking(commit(TRef::make(0)), ())?;
let balance: TRef<f64> = run_blocking(commit(TRef::make(1000.0)), ())?;

TRef::make(value) returns Stm<TRef<T>, ()>, not a TRef directly. That keeps allocation inside the same transactional model as reads and writes.

Transactional Operations

All operations return Stm<_, E> descriptions. Nothing changes until the transaction is committed.

use effectful::{Stm, TRef};

let counter: TRef<i32> = /* built with TRef::make */;

let read_op: Stm<i32, ()> = counter.read_stm();
let write_op: Stm<(), ()> = counter.write_stm(42);
let update_op: Stm<(), ()> = counter.update_stm(|n| n + 1);
let modify_op: Stm<i32, ()> = counter.modify_stm(|n| (n, n + 1));

Use update_stm when you only need to write a new value. Use modify_stm when the transaction should return one value and store another.

Composing Transactions

There is currently no stm! macro. Compose Stm values with flat_map and map.

use effectful::{Stm, TRef};

let counter: TRef<i32> = /* ... */;
let total: TRef<i32> = /* ... */;

let transaction: Stm<(), ()> = counter.read_stm().flat_map(move |count| {
    let total = total.clone();
    counter
        .write_stm(count + 1)
        .flat_map(move |_| total.update_stm(move |sum| sum + count))
});

Sharing TRefs

TRef is cloneable and internally shared. Wrap it in Arc only when your surrounding ownership model needs an Arc.

use std::sync::Arc;
use effectful::TRef;

let shared: Arc<TRef<i32>> = Arc::new(run_blocking(commit(TRef::make(0)), ())?);

TRef vs. Mutex<T>

PropertyTRefMutex
Composable across multiple cellsYesManual lock ordering
Commit is atomicYesOnly while locks are held
Transaction body can do I/ONoTechnically yes, but risky
Retry on conflictYesNo

Use TRef for short, composable state mutations. Use Mutex for non-transactional shared state, especially when you cannot model the operation as a short pure transaction.

Stm and commit — Building Transactions

Stm<A, E> describes a transactional computation. It is lazy and pure with respect to the outside world: it can read/write transactional cells, fail with E, or retry.

commit: Lift Stm into Effect

use effectful::{Effect, Stm, TRef, commit, run_blocking};

let ref_a: TRef<i32> = run_blocking(commit(TRef::make(1)), ())?;
let ref_b: TRef<i32> = run_blocking(commit(TRef::make(2)), ())?;

let transaction: Stm<i32, ()> = ref_a.read_stm().flat_map(move |a| {
    ref_b.read_stm().map(move |b| a + b)
});

let effect: Effect<i32, (), ()> = commit(transaction);
let result = run_blocking(effect, ())?;

commit(stm) returns Effect<A, E, R>. The transaction is executed when the effect runs. On commit conflicts or Stm::retry(), it retries.

atomically

atomically(stm) is an alias for commit(stm). It still returns an Effect; run it with run_blocking, run_async, or compose it with other effects.

use effectful::{atomically, run_blocking};

let effect = atomically(counter.modify_stm(|n| (n + 1, n + 1)));
let value = run_blocking(effect, ())?;

Stm::fail

Transactions can fail with typed errors:

use effectful::{Stm, TRef};

fn withdraw(account: TRef<u64>, amount: u64) -> Stm<u64, InsufficientFunds> {
    account.read_stm().flat_map(move |balance| {
        if balance < amount {
            Stm::fail(InsufficientFunds)
        } else {
            account
                .write_stm(balance - amount)
                .map(move |_| balance - amount)
        }
    })
}

Stm::fail(e) aborts the current transaction immediately. commit propagates that E through the returned effect.

Stm::retry

Sometimes a transaction should wait until a condition is true rather than fail:

use effectful::{Stm, TRef};

fn dequeue(queue: TRef<Vec<Item>>) -> Stm<Item, ()> {
    queue.read_stm().flat_map(move |items| {
        if items.is_empty() {
            Stm::retry()
        } else {
            let item = items[0].clone();
            queue.write_stm(items[1..].to_vec()).map(move |_| item)
        }
    })
}

Stm::retry() asks the commit loop to restart later. TQueue::take uses this behavior to wait while empty.

Composing Transactions

Transactions compose with flat_map and map.

let big_transaction: Stm<(), AppError> = transfer_funds(from, to, amount)
    .flat_map(move |_| record_audit_log(from, to, amount));

let effect = commit(big_transaction);

The composed transaction commits as a unit. If any part fails, retries, or observes a conflict, the whole transaction does not partially commit.

TQueue, TMap, TSemaphore — Transactional Collections

effectful provides STM-aware collection types. Their constructors and operations return Stm values, so they compose with TRef reads/writes and commit atomically.

TQueue

use effectful::{Stm, TQueue};

let queue_stm: Stm<TQueue<Job>, ()> = TQueue::bounded(100);
let unbounded_stm: Stm<TQueue<Job>, ()> = TQueue::unbounded();

let offer: Stm<bool, ()> = queue.offer(job); // false when bounded and full
let take: Stm<Job, ()> = queue.take();       // retries while empty

TQueue::offer does not block when a bounded queue is full; it returns false. TQueue::take retries when the queue is empty.

Producer-Consumer Pattern

fn producer(queue: TQueue<Job>, jobs: Vec<Job>) -> Effect<(), (), ()> {
    effect! {
        for job in jobs {
            let accepted = bind* commit(queue.offer(job));
            if !accepted {
                return Err(());
            }
        }
    }
}

fn consumer(queue: TQueue<Job>) -> Effect<(), JobError, ()> {
    effect! {
        loop {
            let job = bind* commit(queue.take());
            bind* process_job(job);
        }
    }
}

TMap

use effectful::{Stm, TMap};

let map_stm: Stm<TMap<String, User>, ()> = TMap::make();

let get: Stm<Option<User>, ()> = map.get(&"alice".to_string());
let set: Stm<(), ()> = map.set("alice".to_string(), alice_user);
let delete: Stm<(), ()> = map.delete(&"alice".to_string());

TMap is a transactional hash map. Reading from a TMap and updating a TRef in the same transaction is atomic.

let transaction = user_map.get(&"alice".to_string()).flat_map(move |user| {
    access_counter
        .update_stm(|count| count + 1)
        .map(move |_| user)
});

let effect = commit(transaction);

TSemaphore

use effectful::{Stm, TSemaphore};

let sem_stm: Stm<TSemaphore, ()> = TSemaphore::make(10);

let acquire: Stm<(), ()> = sem.acquire(); // retries while zero
let release: Stm<(), ()> = sem.release();

TSemaphore is a transactional permit counter. acquire decrements by one or retries when no permits are available; release increments by one.

let guarded = sem.acquire().flat_map(move |_| {
    update_shared_state().flat_map(move |result| {
        sem.release().map(move |_| result)
    })
});

let effect = commit(guarded);

Summary

TypePurpose
TRef<T>Single transactional cell
TQueue<T>Transactional FIFO queue
TMap<K, V>Transactional hash map
TSemaphoreTransactional permit counter

All compose as Stm values and commit atomically with other STM operations.

Streams — Backpressure and Chunked Processing

An Effect produces one value. A Stream produces many values over time. When you need to process a potentially infinite or very large sequence — database result sets, event logs, file lines, sensor readings — Stream is the right abstraction.

This chapter covers when to use Stream vs Effect, how streams process data in Chunks for efficiency, how to control flow with backpressure policies, and how to consume streams with Sink.

Stream vs Effect — When to Use Each

The choice is about cardinality.

Effect<A, E, R>  -> produces exactly one A or fails
Stream<A, E, R>  -> produces zero or more A values or fails

Concrete Examples

fn get_user(id: u64) -> Effect<User, DbError, Db>
fn all_users() -> Stream<User, DbError, Db>

fn count_orders() -> Effect<u64, DbError, Db>
fn export_orders() -> Stream<Order, DbError, Db>

If you fetch 10 million rows into a Vec and return it as an Effect, you can run out of memory. A Stream lets consumers process chunks incrementally.

Stream Transformations

all_users()
    .filter(Box::new(|u: &User| u.is_active()))
    .map(UserSummary::from)
    .take(100)

Current element operators include map, filter, take, take_while, drop_while, map_effect, and map_par_n.

Collecting a Stream into an Effect

When all results fit in memory, use run_collect.

let users: Effect<Vec<User>, DbError, Db> = all_users().run_collect();

For large results, prefer a fold or a sink.

let count: Effect<usize, DbError, Db> = all_users().run_fold(0, |acc, _| acc + 1);

Converting Effect to Stream

Stream::from_effect expects an effect that produces a Vec<A>.

use effectful::Stream;

let one_user: Effect<Vec<User>, DbError, Db> = get_user(1).map(|user| vec![user]);
let stream: Stream<User, DbError, Db> = Stream::from_effect(one_user);

For pure finite streams, use Stream::from_iterable.

let numbers = Stream::from_iterable([1, 2, 3]);

The Rule

NeedUse
One resultEffect
Many resultsStream
All stream results in memorystream.run_collect()
Aggregated resultstream.run_fold(init, f) or Sink::fold_left
Custom consumersink.run(stream)

Chunks — Batched Stream Data

Chunk<A> is the batch container used by stream internals. It wraps a Vec<A> and makes chunk-level stream semantics explicit.

What Is a Chunk

use effectful::Chunk;

let chunk = Chunk::from_vec(vec![1, 2, 3, 4, 5]);

assert_eq!(chunk.len(), 5);
assert!(!chunk.is_empty());

for item in chunk.iter() {
    println!("{item}");
}

Why Chunks Exist

Streams pull values in batches. Operators can transform a whole Chunk internally instead of paying per-element overhead at every boundary.

Single-element model:
  elem1 -> map -> filter -> emit -> elem2 -> map -> filter -> emit -> ...

Chunk model:
  chunk[1..64] -> map chunk -> filter chunk -> emit chunk -> ...

Working with Chunks

let doubled = chunk.map(|x| x * 2);
let values = doubled.into_vec();

The current public Chunk API is intentionally small:

MethodPurpose
Chunk::empty()Empty chunk
Chunk::singleton(value)One-element chunk
Chunk::from_vec(values)Build from a vector
.len() / .is_empty()Inspect size
.iter()Iterate by reference
.into_vec()Consume into Vec<A>
.map(f)Transform elements
.sort_with(order) / .compare_by(other, order)Runtime ordering helpers

In Streams and Sinks

Most application code does not construct chunks directly. Stream::poll_next_chunk and sink drivers use chunks internally; user-facing APIs usually expose element-wise transformations or final Effect consumers.

Backpressure Policies — Controlling Flow

Channel-backed streams use BackpressurePolicy to decide what happens when the internal queue is full.

The Problem

Producer: emits 10,000 events/sec
Consumer: processes 1,000 events/sec

What happens to the 9,000 surplus events per second?

The answer is domain-specific: block, drop, or fail.

BackpressurePolicy

use effectful::BackpressurePolicy;

BackpressurePolicy::BoundedBlock // wait until space is available
BackpressurePolicy::DropNewest   // discard the newly offered item
BackpressurePolicy::DropOldest   // evict the oldest queued item
BackpressurePolicy::Fail         // fail the producer enqueue effect

backpressure_decision(policy, queue_len, capacity) exposes the decision logic directly for tests or diagnostics.

Channel-Backed Streams

use effectful::{BackpressurePolicy, Chunk, stream_from_channel_with_policy, send_chunk, end_stream};

let (stream, sender) = stream_from_channel_with_policy::<Event, AppError, ()>(
    1024,
    BackpressurePolicy::DropOldest,
);

run_blocking(send_chunk(&sender, Chunk::singleton(event)), ())?;
run_blocking(end_stream(sender), ())?;

Use stream_from_channel(capacity) for the default BoundedBlock policy.

Choosing a Policy

ScenarioPolicy
No data loss acceptableBoundedBlock
Latest value matters mostDropOldest
New overload data is expendableDropNewest
Caller must know about overflowFail

Monitoring Drops

There is no built-in dropped-counter helper. If drops matter operationally, wrap send_chunk or your producer logic and record metrics at that boundary.

Summary

Choose a policy explicitly. BoundedBlock is safest for correctness but can stall producers. DropOldest and DropNewest trade completeness for bounded memory. Fail surfaces overload as an error.

Sinks — Consuming Streams

A Sink<Out, In, E, R> reduces a Stream<In, E, R> into an Out. It is a struct with a driver, not a trait you implement directly.

Built-in Sinks

use effectful::Sink;

let collect = Sink::<Vec<User>, User, DbError, Db>::collect();
let total = Sink::fold_left(0u64, |acc, order: Order| acc + order.amount);
let drain = Sink::<(), Event, EventError, EventEnv>::drain();

Run a sink with sink.run(stream).

let users: Effect<Vec<User>, DbError, Db> = Sink::collect().run(all_users());
let total: Effect<u64, DbError, Db> = total.run(orders());

Stream Consumer Methods

For common cases, stream methods are often simpler.

let users: Effect<Vec<User>, DbError, Db> = all_users().run_collect();

let total: Effect<u64, DbError, Db> = orders()
    .run_fold(0u64, |acc, order| acc + order.amount);

let logged: Effect<(), EventError, EventEnv> = events()
    .run_for_each_effect(|event| log_event(event));

Sink Composition

Sink::zip combines two fold-based sinks into one pass.

let count = Sink::fold_left(0usize, |n, _order: Order| n + 1);
let total = Sink::fold_left(0u64, |sum, order: Order| sum + order.amount);

let both = count.zip(total);
let effect: Effect<(usize, u64), DbError, Db> = both.run(orders());

zip panics if either sink was not created with fold_left / from_fold.

Other Built-ins

SinkPurpose
Sink::collect()Collect elements into Vec<In>
Sink::collect_all_while(pred)Collect until predicate first fails
Sink::collect_all_until(pred)Collect until predicate first succeeds
Sink::fold_left(init, f)Left fold
Sink::drain()Discard all elements
Sink::to_queue(queue)Offer each element to a queue
Sink::collect_to_map()Collect (K, V) pairs into EffectHashMap

Custom Sinks

The current API does not expose a public trait for custom chunk callbacks. Build custom consumers with Stream::run_fold, run_fold_effect, run_for_each_effect, or compose existing Sink constructors.

Schema — Parse, Don’t Validate

Data enters your program from the outside world: HTTP request bodies, database rows, configuration files, message queue payloads. All of it is untrusted. All of it needs to be checked.

The naive approach is to deserialise first and validate later — accept a User struct via serde, then check that email is non-empty and age is positive in a separate step. The problem: your type says User but your program has a User that might have an empty email. The type lies.

The better approach is parse, don’t validate: transform untrusted input into trusted types in one step. If the parse succeeds, you have a valid User. If it fails, you have a structured ParseError that tells you exactly what was wrong.

effectful’s schema module is built on this principle.

What This Chapter Covers

  • Unknown — the type for unvalidated wire data (next section)
  • Schema combinators — the building blocks for describing data shapes (ch14-02)
  • Validation and refinementrefine, filter, and Brand for domain constraints (ch14-03)
  • ParseErrors — structured, accumulating error reports (ch14-04)

The Unknown Type — Unvalidated Wire Data

Unknown is effectful’s dynamic input tree for schema decoding. It represents JSON-like data without trusting its shape.

Creating Unknown Values

use std::collections::BTreeMap;
use effectful::schema::Unknown;

let s = Unknown::String("hello".to_string());
let n = Unknown::I64(42);
let b = Unknown::Bool(true);
let null = Unknown::Null;
let arr = Unknown::Array(vec![Unknown::I64(1), Unknown::I64(2)]);

let mut fields = BTreeMap::new();
fields.insert("name".to_string(), Unknown::String("Alice".to_string()));
fields.insert("age".to_string(), Unknown::I64(30));
let obj = Unknown::Object(fields);

With the schema-serde feature, convert from serde_json::Value using unknown_from_serde_json.

use effectful::schema::unknown_from_serde_json;

let value: serde_json::Value = serde_json::from_str(input)?;
let unknown = unknown_from_serde_json(value);

Why Not serde_json::Value Directly?

serde_json::Value is useful at the edge, but schemas need a stable internal representation with effectful’s parse errors and combinators. Unknown gives schema decoders one input model independent of where the data came from.

Inspecting Unknown Values

Most code should not inspect Unknown directly. Decode it with a Schema. For debugging or custom decoders, match on the enum variants.

match &unknown {
    Unknown::Object(fields) => { /* inspect fields */ }
    Unknown::Array(items) => { /* inspect items */ }
    Unknown::String(value) => { /* inspect string */ }
    Unknown::I64(value) => { /* inspect integer */ }
    Unknown::F64(value) => { /* inspect float */ }
    Unknown::Bool(value) => { /* inspect bool */ }
    Unknown::Null => { /* inspect null */ }
}

The Parse Boundary

Use Unknown at I/O boundaries, then decode once into trusted domain types.

use effectful::schema::{Unknown, string};

let raw = Unknown::String("alice@example.com".to_string());
let email = string::<()>().decode_unknown(&raw)?;

Nothing beyond the parse boundary should accept Unknown; domain functions should accept validated types.

Schema Combinators — Describing Data Shapes

A schema describes how to decode wire data into a typed value and encode it back. In the current API the full shape is Schema<A, I, E>: semantic value A, wire/intermediate value I, and schema marker E.

Primitive Schemas

use effectful::schema::{bool_, f64, i64, string};

let name = string::<()>(); // Schema<String, String, ()>
let age = i64::<()>();     // Schema<i64, i64, ()>
let price = f64::<()>();   // Schema<f64, f64, ()>
let active = bool_::<()>(); // Schema<bool, bool, ()>

Each primitive can decode its typed wire value with decode, and can decode an Unknown with decode_unknown.

Struct-Like Schemas

struct_, struct3, and struct4 decode named object fields into tuples. Use transform to map the tuple into a domain struct.

use effectful::schema::{ParseError, Schema, i64, string, struct_, transform};

#[derive(Clone)]
struct User {
    name: String,
    age: i64,
}

let tuple_schema = struct_("name", string::<()>(), "age", i64::<()>());

let user_schema = transform(
    tuple_schema,
    |(name, age)| Ok(User { name, age }),
    |user: User| (user.name, user.age),
);

If a field is missing or has the wrong type, decode_unknown returns a ParseError with the field path.

Optional and Array Schemas

use effectful::schema::{array, optional, string};

let maybe_name = optional(string::<()>());
let tags = array(string::<()>());

optional(schema) accepts Unknown::Null as None. array(schema) decodes each element and prefixes parse errors with the failing index.

Validation and Transformation

Use free combinators, not schema methods.

use effectful::schema::{ParseError, filter, string, transform};

let non_empty = filter(string::<()>(), |s| !s.is_empty(), "must not be empty");

let email_schema = transform(
    non_empty,
    |s| Email::parse(s).map_err(|e| ParseError::new("", format!("invalid email: {e}"))),
    |email: Email| email.into_string(),
);

filter keeps values satisfying a predicate. transform performs bidirectional conversion and may fail while decoding.

Unions

union_ tries a primary schema, then a fallback schema. For more than two branches, use union_chain from schema::extra.

use effectful::schema::{Schema, Unknown, union_};

let primary: Schema<UserId, Unknown, ()> = user_id_from_number();
let fallback: Schema<UserId, Unknown, ()> = user_id_from_string();
let user_id_schema = union_(primary, fallback);

Running a Schema

Run schemas directly through their methods.

use effectful::schema::{Unknown, i64};

let raw = Unknown::I64(30);
let age = i64::<()>().decode_unknown(&raw)?;

decode works on the schema’s typed wire value I. decode_unknown works on Unknown trees at I/O boundaries.

Validation and Refinement — Constrained Types

Schemas parse structure. Validation adds constraints: an age must be positive, an email must contain @, a price must have at most two decimal places.

refine / filter

refine and filter attach a predicate to an existing schema. Parsing succeeds only when the base schema succeeds and the predicate returns true.

use effectful::schema::{i64, refine, string, filter};

let age_schema = refine(
    i64::<()>(),
    |n| (0..=150).contains(n),
    "age must be between 0 and 150",
);

let non_empty = filter(
    string::<()>(),
    |s: &String| !s.is_empty(),
    "must not be empty",
);

If the predicate fails, decoding returns ParseError::new("", message).

Fallible Transformation

Use transform when conversion can fail or when the semantic type differs from the wire type.

use effectful::schema::{ParseError, string, transform};

let url_schema = transform(
    string::<()>(),
    |s| url::Url::parse(&s).map_err(|e| ParseError::new("", format!("invalid URL: {e}"))),
    |url: url::Url| url.to_string(),
);

The decode closure returns Result<B, ParseError>. The encode closure maps the semantic value back to the base schema’s semantic type.

Brand

Brand<A, B> is a zero-cost nominal wrapper. Use Brand::nominal when the value was already validated, or RefinedBrand when construction should validate.

use effectful::schema::{Brand, RefinedBrand};

struct EmailMarker;
type Email = Brand<String, EmailMarker>;

let email = Brand::nominal("alice@example.com".to_string());

let make_email = RefinedBrand::<String, EmailMarker>::new(|s| {
    if s.contains('@') {
        Ok(())
    } else {
        Err("invalid email".to_string())
    }
});

let checked: Email = make_email.try_make("alice@example.com".to_string())?;

Now APIs can demand Email instead of a raw String.

fn send_welcome(to: Email) -> Effect<(), MailError, Mailer> { /* ... */ }

HasSchema

HasSchema attaches a canonical schema to a type family. The trait exposes associated types for semantic value, wire value, and schema marker.

use effectful::schema::{HasSchema, Schema, i64};

struct UserIdSchema;

impl HasSchema for UserIdSchema {
    type A = i64;
    type I = i64;
    type E = ();

    fn schema() -> Schema<Self::A, Self::I, Self::E> {
        i64::<()>()
    }
}

Use HasSchema for generic tooling that needs to ask for a canonical schema without knowing how it is built.

Summary

ToolWhen to use
refine / filterPredicate on a parsed value
transformFallible conversion or semantic/wire conversion
Brand::nominalNominal wrapper after validation elsewhere
RefinedBrandValidating branded constructor
HasSchemaAttach a canonical schema to a type-level provider

ParseErrors — Structured Parse Failures

ParseError represents one schema decoding failure. ParseErrors is a small aggregate wrapper used when APIs want to return more than one issue.

ParseError vs ParseErrors

use effectful::schema::{ParseError, ParseErrors};

let e = ParseError::new("age", "age must be positive");
let one = ParseErrors::one(e.clone());
let many = ParseErrors::new(vec![e]);

ParseError has two public fields: path and message.

let err = ParseError::new("users.0.age", "expected i64");
assert_eq!(err.path, "users.0.age");
assert_eq!(err.message, "expected i64");

Path Tracking

Schema combinators prefix paths as they decode nested data.

use effectful::schema::{Unknown, array, i64};

let schema = array(i64::<()>());
let raw = Unknown::Array(vec![Unknown::I64(1), Unknown::String("oops".to_string())]);

let err = schema.decode_unknown(&raw).unwrap_err();
assert_eq!(err.path, "1");

struct_, struct3, and struct4 prefix field names. array prefixes element indexes.

Accumulation Status

The current schema decoders generally short-circuit on the first failure for decode_unknown, but decode_unknown_all accumulates nested field errors for object and tuple schemas (including struct_/struct3/struct4, tuple/tuple3/tuple4), as well as array elements and union arm diagnostics. ParseErrors exists as the aggregate type for boundaries or custom validators that collect multiple ParseError values themselves.

fn validate_user(raw: &Unknown) -> Result<User, ParseErrors> {
    let mut issues = Vec::new();

    if let Err(err) = name_schema().decode_unknown(raw) {
        issues.push(err);
    }
    if let Err(err) = age_schema().decode_unknown(raw) {
        issues.push(err);
    }

    if issues.is_empty() {
        build_user(raw).map_err(ParseErrors::one)
    } else {
        Err(ParseErrors::new(issues))
    }
}

API Boundary Conversion

Convert parse issues into your API error type at the boundary.

#[derive(Debug)]
enum ApiError {
    Validation(Vec<FieldError>),
}

#[derive(Debug)]
struct FieldError {
    field: String,
    message: String,
}

fn to_api_errors(errs: ParseErrors) -> ApiError {
    ApiError::Validation(
        errs.issues
            .into_iter()
            .map(|e| FieldError { field: e.path, message: e.message })
            .collect(),
    )
}

Parse Errors in Effects

Schema decoding returns Result. Lift it into an Effect by mapping the error into your effect error channel.

effect! {
    let req = create_user_schema()
        .decode_unknown(&raw)
        .map_err(|err| ApiError::Validation(ParseErrors::one(err)))?;

    bind* create_user(req)
}

Display

ParseErrors implements Display by printing each issue on its own line. Empty paths omit the path: prefix.

name: must not be empty
age: age must be positive

Summary

TypeMeaning
ParseErrorOne failure with path and message
ParseErrorsAggregate { issues: Vec<ParseError> }
ParseErrors::one(err)Build a single-issue aggregate
ParseErrors::new(vec)Build an aggregate from collected issues

Testing — Effects Are Easy to Test

Testing async code often means standing up infrastructure, dealing with timing, and wide mocks—then chasing occasional flakes in CI.

Effect programs can be tested differently. Because an Effect is a description of what to do — not the doing itself — you control everything about how it runs. Swap in a test clock. Provide fake services through the Layer system. Detect fiber leaks automatically. Run in microseconds instead of seconds.

This chapter covers the testing tools effectful provides.

What Makes Effects Testable

Three properties make effect programs easy to test:

1. Services are injected, not ambient. Your code doesn’t call DatabaseClient::global(). It declares R: NeedsDb and gets its database from the environment. In tests, you provide a different environment — one with a fake database.

2. Time is injectable. Code that uses Clock instead of std::time::SystemTime::now() can be tested with TestClock, which advances only when you tell it to.

3. Effects don’t run until the harness runs them. An Effect is inert. You can inspect, compose, and modify it before running. In tests, prefer returning Effect from the test body and letting #[effect_test] run it at the edge.

What This Chapter Covers

  • #[effect_test] — effect-returning tests where the harness owns execution (next section)
  • run_test / run_effect_test* — lower-level harness helpers for explicit assertions (next section)
  • TestClock — deterministic time control in tests (ch15-02)
  • Mocking services — injecting test doubles via layers (ch15-03)
  • Property testing — generating inputs and checking invariants (ch15-04)

effect_test — The Test Harness Boundary

Effectful tests should compose effects in the test body and let the harness execute them. This keeps lazy execution semantics intact and keeps direct Effect::run(&mut ...) calls inside runtime/test harness internals.

Preferred Usage

use effectful::{Effect, effect_test};

#[effect_test]
fn simple_effect_succeeds() -> Effect<(), &'static str, ()> {
    Effect::new(|_| Ok(()))
}

#[effect_test] creates an async Tokio test and runs the returned effect. Ok(_) passes. Err(E) panics with Debug output, so the error type must implement Debug.

The macro uses effectful’s internal current-thread Tokio test re-export; downstream crates do not need to call Effect::run(&mut ...) in the test body.

Provided ServiceContext

use effectful::{Effect, MissingService, Service, ServiceContext, effect_test};

#[derive(Clone, Service)]
struct Config {
    port: u16,
}

fn test_env() -> ServiceContext {
    Config { port: 8080 }.to_context()
}

#[effect_test(env = "test_env")]
fn reads_config() -> Effect<(), MissingService, ServiceContext> {
    Effect::<Config, MissingService, ServiceContext>::service::<Config>()
        .map(|config| assert_eq!(config.port, 8080))
}

The fixture runs once per test and returns the environment consumed by the effect.

Layer-Based Setup

use effectful::{Effect, Layer, MissingService, Service, ServiceContext, effect_test};

#[derive(Clone, Service)]
struct Config {
    port: u16,
}

fn test_layer() -> Layer<Config, MissingService, ()> {
    Layer::succeed(Config { port: 8080 })
}

#[effect_test(layer = "test_layer")]
fn reads_layer_config() -> Effect<(), MissingService, ServiceContext> {
    Effect::<Config, MissingService, ServiceContext>::service::<Config>()
        .map(|config| assert_eq!(config.port, 8080))
}

This is the Rust equivalent of Effect’s it.layer(...) boundary: the test body remains an effect, and the adapter builds/provides the services.

Helper API

Use helper functions when an attribute macro is not appropriate.

use effectful::testing::expect_effect_test;

#[tokio::test]
async fn create_user_inserts_into_db() {
    expect_effect_test(create_user(NewUser { name: "Alice".into(), age: 30 })).await;
}

Available helpers:

FunctionNotes
expect_effect_test(effect).awaitRun with R: Default, panic on Err(E: Debug)
expect_effect_test_with_env(effect, env).awaitRun with explicit environment, panic on failure
expect_effect_test_with_layer(effect, layer).awaitBuild a layer for ServiceContext, panic on failure
run_effect_test(effect).awaitRun with R: Default, return Result<A, E>
run_effect_test_with_env(effect, env).awaitRun with explicit environment, return Result<A, E>
TestRuntime::with_env(fixture)Reusable adapter for explicit fixture functions

Asserting on Exit

run_test remains available when you need the older synchronous Exit<A, E> shape.

use effectful::{Exit, run_test};

#[test]
fn division_by_zero_fails() {
    let exit = run_test(divide(10, 0), ());
    assert!(matches!(exit, Exit::Failure(Cause::Fail(DivError::DivisionByZero))));
}

Pass () for effects with no environment. run_test(effect, env) resets the test leak counters, runs the effect with run_blocking, then checks the leak counters.

Common Exit shapes:

ExitMeaning
Exit::Success(a)Effect succeeded
Exit::Failure(Cause::Fail(e))Typed failure
Exit::Failure(Cause::Die(message))Defect message
Exit::Failure(Cause::Interrupt(id))Fiber interrupt

run_test_with_clock

use std::time::Instant;
use effectful::{TestClock, run_test_with_clock};

let clock = TestClock::new(Instant::now());
let exit = run_test_with_clock(effect, env, clock);

run_test_with_clock currently delegates to run_test after accepting the explicit clock argument. Use explicit-clock scheduling helpers (retry_with_clock, repeat_with_clock) when the effect itself must use that clock.

Leak Assertions

The testing module exposes assertion effects and test hooks:

use effectful::{assert_no_leaked_fibers, assert_no_unclosed_scopes};

run_blocking(assert_no_leaked_fibers(), ())?;
run_blocking(assert_no_unclosed_scopes(), ())?;

The effect-test helpers and run_test call both assertions after the effect run. If a hook recorded a leak, the assertion panics.

TestClock — Deterministic Time in Tests

TestClock is a manual Clock implementation. It is useful with retry_with_clock, repeat_with_clock, or code that accepts a Clock explicitly.

The Problem with Real Time in Tests

let eff = retry(
    || failing_call(),
    Schedule::exponential(Duration::from_secs(1)).compose(Schedule::recurs(3)),
);

With the default live clock, this waits in real time. Use TestClock to avoid real sleeps.

TestClock API

use std::time::{Duration, Instant};
use effectful::{Clock, TestClock};

let start = Instant::now();
let clock = TestClock::new(start);

let now: Instant = clock.now();
clock.advance(Duration::from_millis(500));
clock.set_time(start + Duration::from_secs(60));

let pending: Vec<Instant> = clock.pending_sleeps();

pending_sleeps() returns registered sleep deadlines. This is useful for asserting that code scheduled a delay.

run_test_with_clock

run_test_with_clock(effect, env, clock) runs an already-built effect and returns Exit<A, E>. It does not create a closure-based test harness.

use std::time::Instant;
use effectful::{TestClock, run_test_with_clock, succeed};

let clock = TestClock::new(Instant::now());
let exit = run_test_with_clock(succeed::<_, (), ()>(42), (), clock);
assert_eq!(exit, Exit::succeed(42));

Testing Scheduled Work

use std::sync::{Arc, atomic::{AtomicU32, Ordering}};
use std::time::{Duration, Instant};
use effectful::{Schedule, TestClock, repeat_with_clock, run_blocking, succeed};

let clock = TestClock::new(Instant::now());
let counter = Arc::new(AtomicU32::new(0));

let effect = repeat_with_clock(
    {
        let counter = counter.clone();
        move || {
            counter.fetch_add(1, Ordering::Relaxed);
            succeed::<u32, (), ()>(counter.load(Ordering::Relaxed))
        }
    },
    Schedule::spaced(Duration::from_secs(60)).compose(Schedule::recurs(3)),
    clock.clone(),
    None,
);

let value = run_blocking(effect, ())?;
assert_eq!(value, 4); // initial run + 3 repeats

TestClock::sleep records sleeps instead of blocking, so scheduled tests complete quickly.

Fake Services

When business logic needs time, pass a Clock as part of your service/environment design. In production provide LiveClock; in tests provide TestClock.

Mocking Services — Test Doubles via Layers

In effectful, a test double is just a different service value or layer. Production code gets a real service; tests get an in-memory or spy service with the same service type.

The Pattern

Wrap your interface in a cloneable service struct.

trait DbImpl: Send + Sync {
    fn get_user(&self, id: UserId) -> Effect<User, DbError, ()>;
    fn save_user(&self, user: User) -> Effect<(), DbError, ()>;
}

#[derive(Clone, Service)]
struct Db {
    inner: Arc<dyn DbImpl>,
}

impl Db {
    fn get_user(&self, id: UserId) -> Effect<User, DbError, ()> {
        self.inner.get_user(id)
    }

    fn save_user(&self, user: User) -> Effect<(), DbError, ()> {
        self.inner.save_user(user)
    }
}

Test Double

struct InMemoryDb {
    users: Mutex<HashMap<UserId, User>>,
}

impl DbImpl for InMemoryDb {
    fn get_user(&self, id: UserId) -> Effect<User, DbError, ()> {
        match self.users.lock().expect("users lock").get(&id).cloned() {
            Some(user) => succeed(user),
            None => fail(DbError::NotFound(id)),
        }
    }

    fn save_user(&self, user: User) -> Effect<(), DbError, ()> {
        self.users.lock().expect("users lock").insert(user.id, user);
        succeed(())
    }
}

Injecting the Test Double

#[effect_test(env = "test_env")]
fn get_user_returns_saved_user() -> Effect<(), DbError, ServiceContext> {
    let effect = Db::use_(|db| {
        effect! {
            bind* db.save_user(User { id: UserId::new(1), name: "Alice".into() });
            let user = bind* db.get_user(UserId::new(1));
            assert_eq!(user.name, "Alice");
        }
    });

    effect
}

fn test_env() -> ServiceContext {
    let db = Db { inner: Arc::new(InMemoryDb::new()) };
    db.to_context()
}

Business logic is unchanged. Only the service value changes.

Spies

When you need to assert calls, add tracking to the test double.

#[derive(Clone, Service)]
struct Mailer {
    sent: Arc<Mutex<Vec<Email>>>,
}

impl Mailer {
    fn send(&self, email: Email) -> Effect<(), MailError, ()> {
        self.sent.lock().expect("sent lock").push(email);
        succeed(())
    }
}

#[test]
fn registration_sends_welcome_email() {
    let mailer = Mailer { sent: Arc::new(Mutex::new(Vec::new())) };
    let env = mailer.clone().to_context();

    let exit = run_test(register_user("alice@example.com"), env);
    assert!(matches!(exit, Exit::Success(_)));

    let sent = mailer.sent.lock().expect("sent lock");
    assert_eq!(sent.len(), 1);
}

Failing Services

Test failure handling by providing a service whose methods fail.

struct FailingDb;

impl DbImpl for FailingDb {
    fn get_user(&self, _id: UserId) -> Effect<User, DbError, ()> {
        fail(DbError::ConnectionLost)
    }

    fn save_user(&self, _user: User) -> Effect<(), DbError, ()> {
        fail(DbError::ConnectionLost)
    }
}

#[test]
fn get_user_propagates_db_errors() {
    let env = Db { inner: Arc::new(FailingDb) }.to_context();
    let exit = run_test(get_user(UserId::new(1)), env);
    assert!(matches!(exit, Exit::Failure(Cause::Fail(DbError::ConnectionLost))));
}

Layer-Based Setup

For larger tests, package doubles in layers.

fn test_layer() -> Layer<(Db, Mailer), AppError, ()> {
    Layer::succeed(Db { inner: Arc::new(InMemoryDb::new()) })
        .merge(Layer::succeed(Mailer::spy()))
}

#[effect_test(layer = "test_layer")]
fn full_registration_flow_works() -> Effect<(), AppError, ServiceContext> {
    full_registration_flow().void()
}

What You Don’t Need

  • No mock framework.
  • No #[cfg(test)] in business logic.
  • No global service registry reset between tests.
  • No special mocking API beyond ordinary services and layers.

Property Testing — Invariants over Inputs

Property tests check invariants over many generated inputs. Effect programs are good targets because the runner boundary is explicit and test environments are ordinary values.

Setup

[dev-dependencies]
proptest = "1"

Testing Pure Effects

use effectful::{Exit, run_test};
use proptest::prelude::*;

proptest! {
    #[test]
    fn addition_is_commutative(a: i64, b: i64) {
        let r_ab = run_test(add(a, b), ());
        let r_ba = run_test(add(b, a), ());

        prop_assert_eq!(r_ab, r_ba);
    }
}

run_test(effect, env) returns Exit<A, E>. For properties, either compare exits directly or match Exit::Success(value).

Testing Schema Round-Trips

Schemas expose encode, decode, and decode_unknown.

use effectful::schema::{i64, string, struct_, transform};
use proptest::prelude::*;

proptest! {
    #[test]
    fn user_schema_round_trips(name in "[a-zA-Z]{1,50}", age in 0i64..=120) {
        let schema = transform(
            struct_("name", string::<()>(), "age", i64::<()>()),
            |(name, age)| Ok(User { name, age }),
            |user: User| (user.name, user.age),
        );

        let original = User { name, age };
        let wire = schema.encode(original.clone());
        let parsed = schema.decode(wire);

        prop_assert_eq!(parsed.ok(), Some(original));
    }
}

Round-trip tests catch asymmetries between encode and decode logic.

Testing Error Invariants

Use TRef::make and commit to construct transactional state.

use effectful::{Cause, Exit, TRef, commit, run_blocking, run_test};
use proptest::prelude::*;

proptest! {
    #[test]
    fn withdraw_never_goes_negative(balance in 0u64..=1_000_000, amount in 0u64..=1_000_000) {
        let account = run_blocking(commit(TRef::make(balance)), ()).expect("make account");
        let exit = run_test(withdraw(account.clone(), amount), ());
        let new_balance = run_blocking(commit(account.read_stm::<InsufficientFunds>()), ())
            .expect("read balance");

        if amount <= balance {
            prop_assert!(matches!(exit, Exit::Success(_)));
            prop_assert_eq!(new_balance, balance - amount);
        } else {
            prop_assert!(matches!(exit, Exit::Failure(Cause::Fail(InsufficientFunds))));
            prop_assert_eq!(new_balance, balance);
        }
    }
}

Generating Service Environments

For integration-style properties, generate random state and place it in a test service.

proptest! {
    #[test]
    fn get_user_returns_what_was_saved(user in arbitrary_user()) {
        let db = Db::in_memory();
        let env = db.clone().to_context();

        let save_exit = run_test(save_user(user.clone()), env.clone());
        prop_assert!(matches!(save_exit, Exit::Success(_)));

        let get_exit = run_test(get_user(user.id), env);
        prop_assert!(matches!(get_exit, Exit::Success(retrieved) if retrieved == user));
    }
}

Define generators with normal proptest strategies.

fn arbitrary_user() -> impl Strategy<Value = User> {
    (
        any::<u64>().prop_map(UserId::new),
        "[a-zA-Z ]{1,50}",
        0i64..=120,
    ).prop_map(|(id, name, age)| User { id, name, age })
}

Schema-Driven Generation

There is no built-in generate_valid::<T>() helper. Keep generators beside schemas and use round-trip properties to ensure they stay aligned.

proptest! {
    #[test]
    fn generated_users_are_accepted(user in arbitrary_user()) {
        let schema = user_schema();
        let wire = schema.encode(user.clone());
        prop_assert_eq!(schema.decode(wire).ok(), Some(user));
    }
}

Shrinking

proptest automatically shrinks failing inputs to the smallest example that still fails. Because effects are run explicitly, shrinking remains easy to reason about.

Test failed. Minimal failing input:
  name = ""
  age = -1
Reason: must not be empty (path: name)

Observability

Effectful includes lazy span instrumentation for production tracing without forcing an OpenTelemetry SDK or exporter dependency into core.

Use spans when you want to know when an Effect starts, succeeds, fails, how long it ran, what trace/span identity it had, and how it relates to parent spans.

Manual Spans

Wrap any existing effect with with_span or the method form.

#![allow(unused)]
fn main() {
use effectful::{TracingConfig, install_tracing_layer, run_blocking, succeed};

let _ = run_blocking(install_tracing_layer(TracingConfig::enabled()), ());

let effect = succeed::<u32, (), ()>(42).with_span("manual.demo");
assert_eq!(run_blocking(effect, ()), Ok(42));
}

Use SpanOptions for levels and typed startup attributes.

#![allow(unused)]
fn main() {
use effectful::{SpanLevel, SpanOptions, succeed};

let effect = succeed::<(), (), ()>(()).with_span_options(
  SpanOptions::new("manual.options")
    .with_level(SpanLevel::Debug)
    .with_attribute("cached", true)
    .with_attribute("attempt", 2_i32),
);
}

Function Spans

Use #[effectful::span] on functions returning Effect.

#![allow(unused)]
fn main() {
use effectful::{Effect, Never, span, succeed};

#[span(name = "user.load", level = debug)]
fn load_user(id: u32) -> Effect<u32, Never, ()> {
  succeed::<u32, Never, ()>(id + 1)
}
}

Calling load_user only constructs an effect. The span starts when that returned effect is executed.

By default, span names are module_path::function_name, and function arguments are captured with Debug formatting as string attributes.

Privacy Controls

Skip sensitive, large, or non-Debug arguments with skip(...).

#![allow(unused)]
fn main() {
use effectful::{Effect, Never, span, succeed};

struct Secret(String);

#[span(name = "auth.login", skip(password))]
fn login(user_id: u64, password: Secret) -> Effect<(), Never, ()> {
  let _ = password.0.len();
  succeed::<(), Never, ()>(())
}
}

Use skip_all to capture only explicit fields.

#![allow(unused)]
fn main() {
#[effectful::span(skip_all, fields(route = "/users/:id", cache_hit = true))]
fn route() -> effectful::Effect<(), effectful::Never, ()> {
  effectful::succeed::<(), effectful::Never, ()>(())
}
}

Field values can be typed, display-formatted with %, or debug-formatted with ?.

#![allow(unused)]
fn main() {
#[effectful::span(fields(count = 3_i32, user = %user_id, payload = ?payload))]
fn work(user_id: u64, payload: Vec<u8>) -> effectful::Effect<(), effectful::Never, ()> {
  effectful::succeed::<(), effectful::Never, ()>(())
}
}

Typed fields preserve strings, booleans, signed integers, and floats. % and ? fields are stored as strings.

Disabled Tracing

When tracing is disabled or not installed, span hooks are no-ops. For non-async #[span] functions, the span macro also avoids formatting captured arguments while tracing is disabled or the span is sampled out.

#![allow(unused)]
fn main() {
use effectful::{TracingConfig, install_tracing_layer, run_blocking};

let _ = run_blocking(install_tracing_layer(TracingConfig::default()), ());
}

Trace Context

Every span record has local OpenTelemetry-shaped identity: TraceId, SpanId, TraceFlags, and parent span id.

Use W3C traceparent helpers at integration boundaries.

#![allow(unused)]
fn main() {
use effectful::SpanContext;

let context = SpanContext::from_traceparent(
  "00-01010101010101010101010101010101-0202020202020202-01",
)?;

let header = context.to_traceparent();
Ok::<(), effectful::TraceParentParseError>(())
}

Rust tracing Bridge

Enable the Rust tracing bridge when you want existing subscribers, Loki pipelines, or tracing-opentelemetry stacks to consume effectful spans. Use TracingConfig::tracing_bridge_only() for production forwarding without retaining snapshot_tracing() buffers; use enabled_with_tracing_bridge() when you also need in-memory snapshots.

#![allow(unused)]
fn main() {
use effectful::{TracingConfig, install_tracing_layer, run_blocking};

let _ = tracing_subscriber::fmt().try_init();
let _ = run_blocking(
  install_tracing_layer(TracingConfig::tracing_bridge_only()),
  (),
);
}

Full snapshot+bridge mode emits spans named effectful.span with otel_name, otel_trace_id, otel_span_id, otel_parent_span_id, otel_trace_flags, status, duration, and attribute fields. Span events emitted with emit_current_span_event are emitted as Rust tracing events inside the current span.

Bridge-only mode emits name-only Rust tracing spans with otel_name. It intentionally skips trace id generation, parent lookup, attribute recording, and snapshot storage so it can stay production-oriented.

For high-throughput services, use an async/batched subscriber/exporter. Subscriber formatting, locks, and exporter backpressure usually dominate span overhead.

Sampling

Use sampling on hot paths. default_sample_rate applies globally; SpanOptions::sample_rate and #[effectful::span(sample = ...)] override it per span. Rates are rounded down to a power-of-two cadence, so 0.3 records at most one in four spans.

#![allow(unused)]
fn main() {
use effectful::{TracingConfig, install_tracing_layer, run_blocking};

let mut config = TracingConfig::tracing_bridge_only();
config.default_sample_rate = 0.0625;
let _ = run_blocking(install_tracing_layer(config), ());
}

Snapshot mode is best for tests, local debugging, and diagnostics. Bridge-only plus sampling is the production-oriented path for very small effects.

See examples 106_span_macro.rs and 107_tracing_bridge.rs for executable coverage.

API Quick Reference

A condensed reference for commonly used items in effectful 0.2.2. For complete signatures, run cargo doc --open -p effectful.

Core Types

TypeDescription
Effect<A, E, R>Lazy computation that succeeds with A, fails with E, and requires environment R
Exit<A, E>Terminal outcome: Success(A) or Failure(Cause<E>)
Cause<E>Failure algebra: Fail(E), Die(String), Interrupt(FiberId), Both, Then
Context<Cons<..., Nil>>Typed heterogeneous environment for tagged services
ServiceContextRuntime service table used by the derive-based service/layer API
Layer<ROut, E, RIn>Lazy service layer for ServiceContext
Stm<A, E>Transactional computation
Stream<A, E, R>Pull stream of A chunks
Sink<Out, In, E, R>Consumer of stream elements
Schema<A, I, E>Bidirectional decoder/encoder with Unknown support

Effect Constructors

FunctionNotes
succeed(a)Always succeeds with a
fail(e)Always fails with e
pure(a)Infallible Effect<A, (), ()> convenience
from_async(f)Lift an async closure returning Result<A, E>
Effect::new(f)Lift a synchronous closure over &mut R
Effect::new_async(f)Lift an async closure that may borrow &mut R
effect! { ... }Do-notation; use bind* effect to bind

Effect Combinators

MethodNotes
.map(f)Transform success value
.flat_map(f)Sequentially compose effects
.and_then(other)Sequence and keep right result
.and_then_discard(other)Sequence and keep left result
.map_error(f)Transform typed error
.catch(f)Recover by returning another effect
.catch_all(f)Recover with a fallback value and Never error
.tap_error(f)Run an effect on failure and re-emit the error
.provide_env(env)Capture a full environment and return Effect<_, _, ()>
.zoom_env(f)Run an effect requiring R inside a larger environment S
.local(f)Run with a cloned and modified local environment
.ensuring(finalizer)Run finalizer after success or failure
.on_exit(f)Observe Exit without changing result

Running Effects

FunctionNotes
run_blocking(effect, env)Blocking synchronous runner
run_async(effect, env).awaitAsync runner
#[effect_test]Attribute for tests that return Effect; harness executes and panics on Err(E: Debug)
expect_effect_test(effect).awaitAsync test helper for R: Default, panics on failure
expect_effect_test_with_env(effect, env).awaitAsync test helper with explicit environment
expect_effect_test_with_layer(effect, layer).awaitAsync test helper for ServiceContext plus Layer
run_effect_test(effect).awaitAsync test helper returning Result<A, E>
run_effect_test_with_env(effect, env).awaitAsync test helper returning Result<A, E> with explicit environment
TestRuntime::with_env(fixture)Reusable async test adapter with environment fixture
run_test(effect, env)Test runner returning Exit<A, E>
run_test_with_clock(effect, env, clock)Test runner with explicit TestClock argument

Services and Layers

ItemNotes
#[derive(Service)]Make a cloneable struct available through Effect::service::<S>()
Effect::service::<S>()Read service S from R: ServiceLookup<S>
ServiceContext::empty().add(service)Build a derive-service environment
Layer::succeed(service)Infallible service layer
Layer::effect(name, make)Effectful service layer
layer.merge(other)Build independent services into one context
layer.provide(provider)Use provider services to build the layer, then hide provider output
layer.provide_merge(provider)Provide dependencies and keep both outputs
layer.memoized()Cache the first layer build result

Tagged Context Helpers

ItemNotes
service_key!(pub struct Key);Declare a nominal key type
Tagged<Key, V> / Service<Key, V>Keyed service cell
service_env::<Key, _>(value)Build a one-cell Context
ctx.get::<Key>()Read a tagged value from a typed context
effect.provide_head(value)Provide the head tagged dependency

Concurrency

Function/MethodNotes
`run_fork(&runtime,
handle.join()Await result as Result<A, Cause<E>>
handle.await_exit()Await result as Exit<A, E>
handle.interrupt()Interrupt the fiber
handle.status()Inspect Running, Succeeded, Failed, or Interrupted
fiber_all(handles)Join many FiberHandles into Effect<Vec<A>, Cause<E>, ()>
with_fiber_id(id, f)Run f under a fiber id

STM

FunctionNotes
TRef::make(v)Transactionally allocate a cell: Stm<TRef<T>, ()>
tref.read_stm()Read inside Stm
tref.write_stm(v)Write inside Stm
tref.update_stm(f)Update value, returning ()
tref.modify_stm(f)Compute (output, next) and write next
Stm::succeed(a) / Stm::fail(e)Transactional success/failure
Stm::retry()Retry the current transaction
commit(stm) / atomically(stm)Lift Stm<A, E> into Effect<A, E, R>
TQueue::bounded(n) / TQueue::unbounded()Transactional FIFO queues
queue.offer(v) / queue.take()Enqueue or dequeue transactionally
TMap::make() / map.get, set, deleteTransactional map
TSemaphore::make(n) / acquire, releaseTransactional permit counter

Scheduling

FunctionNotes
Schedule::recurs(n)Allow n additional iterations
Schedule::spaced(d)Fixed delay
Schedule::exponential(base)Exponential backoff
schedule.compose(other)Continue while both schedules continue; use max delay
schedule.jittered()Apply deterministic jitter
Schedule::recurs_while(pred)Continue while predicate holds
Schedule::recurs_until(pred)Continue until predicate holds
`retry(
`repeat(

Streams and Sinks

FunctionNotes
Stream::from_iterable(iter)Stream from iterable values
Stream::from_effect(effect)Stream from Effect<Vec<A>, E, R>
stream.map, filter, take, take_while, drop_whileStream transformations
stream.run_collect()Collect all elements into Vec<A>
stream.run_fold(init, f)Fold elements
stream.run_for_each(f)Run synchronous consumer
stream.run_for_each_effect(f)Run effectful consumer
Sink::collect()Sink collecting elements into Vec
Sink::fold_left(init, f)Folding sink
Sink::drain()Discarding sink
sink.run(stream)Run a sink against a stream

Schema

FunctionNotes
Unknown::{Null, Bool, I64, F64, String, Array, Object}Dynamic input values
string(), i64(), f64(), bool_()Primitive schemas
optional(s), array(s)Container schemas
tuple, tuple3, tuple4Tuple schemas
struct_, struct3, struct4Struct schemas from field schemas
union_, union_chainUnion schemas
filter(s, pred, msg) / refine(...)Validation refinements
transform(s, decode, encode)Bidirectional transformation
schema.decode(input)Decode typed wire input
schema.decode_unknown(&unknown)Decode Unknown input
ParseError::new(path, message)Single parse issue
ParseErrors::one(err) / ParseErrors::new(vec)Aggregate parse issues

Migrating from async fn to effects

This appendix shows the current migration shape for effectful: return Effect, keep dependencies in R, and run at the boundary with an explicit environment.

Mental Model Shift

In ordinary async Rust, calling an async fn creates a Future; awaiting it runs the work.

async fn get_user(id: u64, db: &DbClient) -> Result<User, DbError> {
    db.query_one(id).await
}

In effectful, a function returns an Effect description. The runner receives the environment later.

fn get_user(id: u64) -> Effect<User, DbError, DbClient> {
    effect!(|db: &mut DbClient| {
        bind* db.query_one(id)
    })
}

let user = run_blocking(get_user(42), db_client)?;

Pattern 1: async fn to Effect

Before

pub async fn process_order(
    order_id: OrderId,
    db: &DbClient,
    mailer: &MailClient,
) -> Result<Receipt, AppError> {
    let order = db.get_order(order_id).await?;
    let receipt = db.complete_order(order).await?;
    mailer.send_receipt(&receipt).await?;
    Ok(receipt)
}

After

#[derive(Clone)]
struct AppEnv {
    db: DbClient,
    mailer: MailClient,
}

pub fn process_order(order_id: OrderId) -> Effect<Receipt, AppError, AppEnv> {
    effect!(|env: &mut AppEnv| {
        let order = bind* env.db.get_order(order_id).map_error(AppError::Db);
        let receipt = bind* env.db.complete_order(order).map_error(AppError::Db);
        bind* env.mailer.send_receipt(&receipt).map_error(AppError::Mail);
        receipt
    })
}

Migration steps:

  1. Change async fn to fn returning Effect<A, E, R>.
  2. Move dependencies into an environment type or service context.
  3. Replace .await? on effectful operations with bind*.
  4. Return the success value as the block tail.
  5. Call run_blocking(effect, env) or run_async(effect, env) at the boundary.

Pattern 2: Wrapping Third-Party Async

Third-party libraries return futures, not effects. Wrap them with from_async.

fn fetch_price(symbol: String) -> Effect<f64, reqwest::Error, ()> {
    from_async(move |_r: &mut ()| async move {
        let response = reqwest::get(format!("https://api.example.com/price/{symbol}"))
            .await?;
        let body = response.json::<PriceResponse>().await?;
        Ok(body.price)
    })
}

Inside the async closure, use normal .await. Outside, compose the result as an Effect.

Pattern 3: Error Types

Map infrastructure errors into your application error at composition points.

#[derive(Debug)]
enum AppError {
    Db(DbError),
    Mail(MailError),
}

let effect = db_call().map_error(AppError::Db);

Use catch to recover with another effect, and catch_all to turn typed errors into fallback success values.

Pattern 4: Services

For application dependency injection, prefer #[derive(Service)] plus ServiceContext.

#[derive(Clone, Service)]
struct AppState {
    request_count: Arc<AtomicU64>,
}

fn handler() -> Effect<Response, AppError, ServiceContext> {
    AppState::use_sync(|state| {
        state.request_count.fetch_add(1, Ordering::Relaxed);
        Response::ok()
    })
}

let env = AppState::new().to_context();
let response = run_blocking(handler(), env)?;

For tagged HList contexts, use service_key!(pub struct Key);, service_env::<Key, _>(value), and Context::get::<Key>().

Pattern 5: Transactional State

Use TRef when state updates must compose transactionally.

let counter = run_blocking(commit(TRef::make(0_u64)), ())?;

fn increment(counter: TRef<u64>) -> Effect<u64, (), ()> {
    effect! {
        bind* commit(counter.update_stm(|n| n + 1));
        bind* commit(counter.read_stm())
    }
}

There is no stm! macro in the current API; compose transactions with flat_map, map, and helpers like update_stm.

Pattern 6: Resource Cleanup

Use Scope when cleanup must run at an explicit lifetime boundary.

fn with_connection<A, E, F>(pool: Pool<Connection, DbError>, f: F) -> Effect<A, E, ()>
where
    F: FnOnce(Connection) -> Effect<A, E, ()> + 'static,
    A: 'static,
    E: From<DbError> + 'static,
{
    scope_with(move |scope| {
        effect! {
            let conn = bind* pool.get().provide_env(scope.clone()).map_error(E::from);
            let close_conn = conn.clone();
            scope.add_finalizer(Box::new(move |_| close_conn.close()));
            bind* f(conn)
        }
    })
}

For pooled resources, Pool::get() registers return-to-pool cleanup on the provided Scope.

Migration Strategy

  1. Convert leaf async wrappers first with from_async.
  2. Introduce explicit environment structs or ServiceContext.
  3. Move run_blocking / run_async to program edges.
  4. Convert tests to pass test environments or test layers.
  5. Replace stale helper assumptions with current names: run_collect, run_fold, retry(|| ..., schedule), TRef::make, run_test(effect, env).

You can mix old async code with effects during migration. Wrap async futures at the edge and keep new domain workflows as Effect values.

Glossary

Key terms used throughout this book.


bind* (bind operator) The prefix operator inside effect! that runs an inner effect and yields its success value. let x = bind* eff binds the value; bind* eff; runs and discards it.


Backpressure The mechanism by which a slow consumer controls a fast producer. In effectful streams this is represented by BackpressurePolicy: BoundedBlock, DropNewest, DropOldest, or Fail.


Brand A zero-cost nominal wrapper. Brand<String, EmailMarker> and Brand<String, NameMarker> are different types even if both wrap String.


Cause<E> The structured reason an effect failed: Fail(E), Die(String), Interrupt(FiberId), or composed causes Both / Then.


Chunk<A> A batch of stream elements. Streams pull and process chunks rather than one element at a time internally.


Clock A trait abstracting time. LiveClock uses real time; TestClock is controlled by tests.


commit / atomically Functions that lift Stm<A, E> into Effect<A, E, R>. Running the effect executes the transaction and retries on conflicts or Stm::retry().


Context<...> Typed heterogeneous environment for tagged services, built from Cons / Nil cells containing Tagged<K, V> values.


Effect<A, E, R> The central type: a lazy description of work that succeeds with A, fails with E, and requires environment R.


effect! macro Do-notation for Effect. It rewrites bind* effect into bind/await plumbing and wraps the block tail as success.


Exit<A, E> Terminal outcome: Exit::Success(A) or Exit::Failure(Cause<E>). Returned by run_test and FiberHandle::await_exit().


Fiber A lightweight unit of concurrent work. Spawn with run_fork; await with join() or await_exit().


FiberRef A fiber-scoped variable used for request ids, tracing context, and similar dynamic data.


from_async Constructor that lifts an async closure returning Result<A, E> into an Effect<A, E, R>.


HList The compile-time linked-list shape Cons<Head, Tail> / Nil used by the typed Context API.


Layer A lazy recipe for building services into a ServiceContext. Compose layers with merge, provide, provide_merge, and memoized().


Never The uninhabited runtime error type (Infallible) used when an effect cannot fail through its typed error channel.


ParseError / ParseErrors Schema parse failures. Build a single issue with ParseError::new(path, message) and aggregate with ParseErrors::one or ParseErrors::new.


R (environment type parameter) The third parameter of Effect<A, E, R>. It encodes required dependencies in the type.


run_blocking Synchronous runner: run_blocking(effect, env). Use at application/test boundaries, not inside reusable library functions.


effect_test Attribute for effect-returning tests. The test body returns Effect; the harness executes it, provides the environment or layer, checks leak counters, and panics with Debug output on typed failure.

run_test Lower-level synchronous test runner: run_test(effect, env) -> Exit<A, E>. It resets and checks the test leak counters around the run.


Schedule A value describing delays and repeat limits for the free retry(|| effect, schedule) and repeat(|| effect, schedule) functions. Current constructors include recurs, spaced, exponential, recurs_while, and recurs_until.


Schema A Schema<A, I, E> decodes wire value I or dynamic Unknown into A and can encode A back to I.


Scope A resource lifetime boundary. Register finalizers with Scope::add_finalizer; close the scope to run finalizers.


service_key! Macro for declaring a nominal key type, e.g. service_key!(pub struct DbKey);. Pair it with Tagged<DbKey, Db> / Service<DbKey, Db>.


Sink A stream consumer represented by Sink<Out, In, E, R>. Built-ins include Sink::collect, Sink::fold_left, and Sink::drain; run with sink.run(stream).


Stm<A, E> A composable transactional computation. Use Stm::succeed, Stm::fail, Stm::retry, flat_map, and map; run with commit / atomically.


Stream A pull stream of values with typed error and environment. Build with from_iterable or from_effect; consume with run_collect, run_fold, or sinks.


Tag / Tagged The typed key/value mechanism for contexts. Tagged<K, V> stores a value V under nominal key K.


TestClock Controllable clock for deterministic scheduling tests. Construct with TestClock::new(start_instant).


TRef<T> A transactional mutable cell. Allocate with TRef::make(value) inside Stm; use read_stm, write_stm, update_stm, and modify_stm.


Unknown Dynamic input for schemas. Current variants are Null, Bool, I64, F64, String, Array, and Object.