Introduction
Welcome to Typed Effects in Rust.
If you already use async Rust, you know the model: Futures are polled by an executor; work runs when those futures are driven (for example with .await). That foundation is sound, and this book does not ask you to unlearn it.
What teams often hit next is organization at scale: error types that grow without structure, dependencies threaded through long call chains, and background work whose lifetime is hard to reason about. Those problems are not unique to Rust, but they tend to surface in most non-trivial async codebases.
effectful is a library for writing async programs where the shape of the work—success type, error type, and required environment—is carried in one place, and where much of the program is built as composable descriptions (Effect<A, E, R>) that you run only when you choose how and with which dependencies.
You still run on ordinary async runtimes. You still use .await inside bridges to third-party code. What changes is how you structure domain logic, tests, and dependency boundaries.
Who This Book Is For
You should know Rust basics: ownership, borrowing, traits, and how async/await and Future fit together. You do not need prior experience with category theory or functional programming jargon—we introduce terms only when they help.
If you want a typed, compositional style for async Rust—with explicit requirements in the type system and a clear split between “what to run” and “how to run it”—this book is for you.
How to Read This Book
Part I: Foundations explains why effects are useful and teaches the core types. Start here.
Part II: Environment & Dependencies covers the R parameter and compile-time dependency injection patterns.
Part III: Real Programs covers error handling, concurrency, resources, and scheduling for production code.
Part IV: Advanced covers STM, streams, schemas, and testing—read when you need those topics.
Code examples are intended to compile unless marked otherwise.
Let’s begin.
Why Effects?
Before we write effect code in detail, it helps to agree on what problem we are solving and where effectful sits relative to ordinary async Rust.
Rust’s async model is built on Future and executors: futures are lazy until polled, and .await is how async functions compose. That model is not a mistake—it is the standard way to express non-blocking I/O and concurrency.
At application scale, the difficulties are usually engineering ones:
- Errors — mapping and aggregating failures across layers without losing structure.
- Dependencies — passing clients, configuration, and context without turning every function signature into a long parameter list (or hiding the same behind globals).
- Concurrency — knowing who owns a task, how it shuts down, and what happens on cancellation.
This chapter names those patterns, relates them to how Effect<A, E, R> is designed, and sets up the rest of Part I.
By the end of the chapter you should understand:
- Why teams reach for a declarative layer on top of hand-written async fn chains.
- What “effect” means in this book: a description of work, separate from running it with a chosen environment.
- Why the type has three parameters (A, E, R) and why that matters for APIs and tests.
We start with a concrete look at those recurring challenges.
Challenges in Large Async Codebases
Async Rust gives you non-blocking I/O and concurrency primitives. In production, the same strengths can become painful when composition and boundaries are not planned: errors, dependencies, and spawned work all tend to accumulate complexity.
This section is not a claim that “async is broken.” It is a concise picture of problems effectful is meant to help with—so the rest of the book has a shared vocabulary.
Challenge 1: Error mapping and noise
A typical async workflow chains several operations. Each step may fail in its own way, so you map errors into a domain type and propagate:
async fn process_order(order: Order) -> Result<Receipt, ProcessError> {
    let config = get_config()
        .await
        .map_err(|e| ProcessError::Config(e))?;
    let user = fetch_user(&config, order.user_id)
        .await
        .map_err(|e| ProcessError::User(e))?;
    let inventory = check_inventory(&config, &order.items)
        .await
        .map_err(|e| ProcessError::Inventory(e))?;
    let payment = charge_payment(&config, &user, order.total)
        .await
        .map_err(|e| ProcessError::Payment(e))?;
    let shipment = create_shipment(&config, &order, &user)
        .await
        .map_err(|e| ProcessError::Shipment(e))?;
    Ok(Receipt::new(order, payment, shipment))
}
The business steps are clear, but the .map_err noise is repetitive. The domain ProcessError enum often grows with every new integration. Policy (retries, fallbacks) may live in callers or ad hoc helpers, which makes behavior harder to see in one place.
What effects add: failure and recovery can be expressed as transformations on a description (for example retry, map_error, structured Exit types), so policies are easier to reuse and test without rewriting the core flow.
Challenge 2: Explicit dependency parameters
Another common shape is the handler that needs many clients and cross-cutting services:
async fn handle_request(
    db: &DatabasePool,
    cache: &RedisClient,
    logger: &Logger,
    config: &AppConfig,
    metrics: &MetricsClient,
    tracer: &Tracer,
    request: Request,
) -> Response {
    // ...
}
Dependencies are explicit, which is good for honesty, but every layer between here and main must repeat or forward them. Tests must build or mock the same bundle repeatedly. Alternatives (globals, implicit context) trade one problem for another.
What effects add: required capabilities can be expressed in R (the environment type) and satisfied in one place at the edge, while inner functions stay focused on logic.
Challenge 3: Background work and lifetimes
Fire-and-forget background tasks are easy to start and harder to reason about:
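use std::time::Duration;

fn start_background_worker(db: DatabasePool) {
    // The JoinHandle returned by tokio::spawn is dropped here,
    // so nothing owns this task after the function returns.
    tokio::spawn(async move {
        loop {
            match process_queue(&db).await {
                Ok(_) => {}
                Err(e) => eprintln!("Worker error: {}", e),
            }
            tokio::time::sleep(Duration::from_secs(5)).await;
        }
    });
}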
Questions that matter in production—shutdown, cancellation, panic behavior, and resource cleanup—need explicit design. That is true in any async system; the goal is to make ownership and intent visible in the program structure.
What effects add: structured concurrency patterns (fibers, scopes, handles) integrate with the same Effect abstraction so “what runs” and “how it ends” can be expressed consistently.
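To make "who owns the task and how it ends" concrete, here is a minimal sketch using only standard-library threads. It is not how effectful implements fibers or scopes; `Worker`, `start`, and `shutdown` are hypothetical names chosen for illustration. The point is only that the caller receives a handle that carries both the stop signal and the join result.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Owns a background thread. Whoever holds the `Worker` decides when it stops.
pub struct Worker {
    stop: Arc<AtomicBool>,
    handle: JoinHandle<u32>,
}

impl Worker {
    /// Start a loop that calls `tick` until shutdown is requested.
    pub fn start(mut tick: impl FnMut() + Send + 'static) -> Worker {
        let stop = Arc::new(AtomicBool::new(false));
        let stop_flag = Arc::clone(&stop);
        let handle = thread::spawn(move || {
            let mut iterations = 0u32;
            while !stop_flag.load(Ordering::Acquire) {
                tick();
                iterations += 1;
                thread::sleep(Duration::from_millis(1));
            }
            iterations
        });
        Worker { stop, handle }
    }

    /// Request shutdown and wait for the loop to finish.
    /// Returns the iteration count, or `Err` if the worker panicked.
    pub fn shutdown(self) -> thread::Result<u32> {
        self.stop.store(true, Ordering::Release);
        self.handle.join()
    }
}
```

Compared with the fire-and-forget version, shutdown is an explicit call, and a panic inside the loop surfaces in the value returned by `shutdown` instead of disappearing.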
How this relates to Future and async
Remember: async fn bodies compile to Futures; nothing runs until a future is polled (for example via .await in an async caller). The difficulties above are usually about how we organize async code—signatures, error types, and where side effects are allowed—not about rejecting the Future model.
In practice, hand-written async often reads like a straight-line script: await one step, then the next. That is appropriate for many functions. It becomes harder when you want the same logical workflow to be inspected, wrapped (retries, timeouts), or tested with a substituted environment without threading mocks through every layer.
Effects push the “script” into a value: Effect<A, E, R> is a description that you run with run_async, run_blocking, or test harnesses—after you have composed and configured it.
That does not replace understanding executors or Future. It adds a layer for domain structure: answer type A, error type E, requirements R, and explicit execution.
Next we define what an Effect is in this library and how that description differs from calling async fn directly—without exaggerating either side.
What Even Is an Effect?
An Effect is a description of a computation, not the computation itself.
The rest of the API—map, flat_map, environment types, runners—is there to work with that description in a type-safe way.
The Recipe Analogy
Think about a recipe for chocolate cake.
A recipe is not a cake. You can hold a recipe in your hands without any flour appearing. You can read a recipe without preheating an oven. You can photocopy a recipe, modify it (less sugar, more cocoa), combine it with a frosting recipe, and share it with a friend — all without a single cake coming into existence.
The cake only appears when someone executes the recipe. Takes out the ingredients, follows the steps, waits for the oven.
An Effect is a recipe for a computation.
When you write succeed(42), you’re not “succeeding” at anything. You’re writing down a recipe that says “when executed, produce the value 42.” The 42 is written into the recipe, but no computation has happened. You just have a piece of paper with instructions on it.
use effectful::{Effect, succeed};
// This doesn't compute anything — it's a description
let recipe: Effect<i32, String, ()> = succeed(42);
// Still nothing has happened. `recipe` is just a value.
// We can pass it around, store it, inspect its type.
The computation only happens when you explicitly run it:
use effectful::run_blocking;
// NOW something happens
let result: Result<i32, String> = run_blocking(recipe, ());
assert_eq!(result, Ok(42));
Building Up Descriptions
Because an Effect is just data — a description — you can transform it without running it.
let recipe: Effect<i32, String, ()> = succeed(42);
// Transform the description: "when executed, produce 42, then double it"
let doubled: Effect<i32, String, ()> = recipe.map(|x| x * 2);
// Still nothing has happened! `doubled` is just a modified recipe.
// Now run it
let result = run_blocking(doubled, ());
assert_eq!(result, Ok(84));
The .map() call didn’t execute anything. It took one recipe and produced a new recipe that includes an extra step. Like writing “double the result” at the bottom of your cake recipe — the cake doesn’t change until someone bakes it.
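The "description, not execution" idea can be modelled in a few lines of plain Rust. The sketch below is a toy and makes no claim about effectful's internals: `ToyEffect`, `from_fn`, and `doubled_with_counter` are hypothetical names, and the toy `succeed`, `map`, and `run_blocking` only mimic the shape of the calls shown above. A shared counter records how often the mapped closure really runs.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Toy stand-in for an effect: a boxed closure that only `run_blocking` calls.
pub struct ToyEffect<A, E, R> {
    run: Box<dyn FnOnce(&mut R) -> Result<A, E>>,
}

impl<A: 'static, E: 'static, R: 'static> ToyEffect<A, E, R> {
    /// Describe a computation without running it.
    pub fn from_fn(f: impl FnOnce(&mut R) -> Result<A, E> + 'static) -> Self {
        ToyEffect { run: Box::new(f) }
    }

    /// Add a step to the description. `f` is stored, not called.
    pub fn map<B: 'static>(self, f: impl FnOnce(A) -> B + 'static) -> ToyEffect<B, E, R> {
        ToyEffect::from_fn(move |env: &mut R| (self.run)(env).map(f))
    }
}

/// Description of "produce this value when run".
pub fn succeed<A: 'static, E: 'static, R: 'static>(value: A) -> ToyEffect<A, E, R> {
    ToyEffect::from_fn(move |_env: &mut R| Ok(value))
}

/// The only place where the stored closures are actually called.
pub fn run_blocking<A, E, R>(effect: ToyEffect<A, E, R>, mut env: R) -> Result<A, E> {
    (effect.run)(&mut env)
}

/// Builds "produce 42, then double it" and counts calls to the doubling step.
pub fn doubled_with_counter(calls: Rc<Cell<u32>>) -> ToyEffect<i32, String, ()> {
    succeed(42_i32).map(move |x: i32| {
        calls.set(calls.get() + 1);
        x * 2
    })
}
```

In this model the counter stays at zero after `doubled_with_counter` returns and becomes one only after `run_blocking` is called on the result.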
A More Realistic Example
Let’s see what this looks like with actual I/O:
use effectful::{Effect, effect, run_blocking};
// This function doesn't fetch anything — it returns a DESCRIPTION
// of how to fetch a user
fn fetch_user(id: u64) -> Effect<User, DbError, ()> {
effect! {
let conn = bind* connect_to_db();
let user = bind* query_user(&conn, id);
user
}
}
// Calling the function doesn't open any connections
let description = fetch_user(42);
// `description` is a value we can hold, pass around, combine with others
// No database has been touched
// Only when we run it does the I/O happen
let user = run_blocking(description, ())?;
That effect! block looks imperative — it looks like it’s doing things. But it’s not. It’s building a description of things to do. The bind* operator means “this step depends on the previous step completing” — it’s describing sequencing, not executing it.
The Key Insight: Separation of Concerns
This separation — description vs execution — is how the challenges from the previous section (errors, dependencies, task structure) get a consistent home in the type system.
Error handling becomes part of the description itself. When you write:
let resilient = retry(
|| risky_operation(),
Schedule::exponential(Duration::from_millis(100)).compose(Schedule::recurs(3)),
);
You’re not adding retry logic to running code. You’re modifying the description to say “when executed, retry up to 3 times with exponential backoff.” The retry logic is baked into the recipe.
Dependencies become part of the type signature. When you write:
fn get_user(id: u64) -> Effect<User, DbError, Database>
That Database in the type says “this recipe requires a Database to execute.” The compiler enforces it. You can’t run the effect without providing a Database. No runtime surprises.
Structured concurrency becomes possible because the runtime knows what each effect intends to do before it does it. Spawning an effect doesn’t fire and forget — it creates a handle to a structured task with clear ownership and cancellation semantics.
What’s in an Effect?
An Effect<A, E, R> carries three pieces of information in its type:
- A: the Answer, what you get if it succeeds
- E: the Error, what you get if it fails
- R: the Requirements, what environment is needed to run it
We’ll explore all three in the next section. For now, just notice that an Effect’s type tells you everything about what it does — success, failure, and dependencies — without you having to read the implementation.
// This type signature tells the whole story:
fn process_payment(
amount: Money
) -> Effect<Receipt, PaymentError, (PaymentGateway, Logger)>
// - Produces a Receipt on success
// - Can fail with PaymentError
// - Requires a PaymentGateway and Logger to run
No need to read the function body to know what resources it needs or what errors it can produce. The type is the documentation.
Style: imperative async vs effect descriptions
Typical async fn code is written as a sequence of steps: each .await drives the next piece of work. That is clear and idiomatic Rust.
Effect code in this library is often written so that many domain functions return Effect<…>: a value that describes work and only runs when you pass it to a runner with an environment. The style emphasizes composition (map, flat_map, layers, retries) before execution.
Both approaches run on the same Future machinery underneath. Use effects where you want environment and error structure in the type, shared policies, and test substitution at the boundary; use plain async fn where a small linear function is enough.
Let’s look at those three type parameters in detail.
The Three Type Parameters
Every Effect carries three type parameters: Effect<A, E, R>. These aren’t arbitrary — they answer the three fundamental questions every computation must address:
- A — What do I produce when I succeed?
- E — What do I produce when I fail?
- R — What do I need in order to run?
Let’s examine each one.
A: The Answer
The A parameter is the success type — what you get back when everything goes right.
use effectful::{Effect, succeed};
// This effect produces an i32 on success
let answer: Effect<i32, String, ()> = succeed(42);
// This effect produces a User on success
let user_effect: Effect<User, DbError, ()> = succeed(User::new("Alice"));
If you’re familiar with Result<T, E>, think of A as the T. It’s what you’re hoping to get.
When you transform an effect with .map(), you’re changing the A:
let numbers: Effect<i32, String, ()> = succeed(21);
let doubled: Effect<i32, String, ()> = numbers.map(|n| n * 2);
let stringified: Effect<String, String, ()> = doubled.map(|n| n.to_string());
Each .map() transforms the success value while preserving the error type and requirements.
E: The Error
The E parameter is the failure type — what you get back when something goes wrong.
use effectful::{Effect, fail};
// This effect always fails with a String error
let failure: Effect<i32, String, ()> = fail("something went wrong".to_string());
// This effect can fail with a DbError
let user: Effect<User, DbError, ()> = fetch_user_from_db(42);
Again, if you know Result<T, E>, think of E as the E. It’s what you’re worried might happen.
You can transform error types with .map_error():
let db_effect: Effect<User, DbError, ()> = fetch_user(42);
// Convert DbError to a more general AppError
let app_effect: Effect<User, AppError, ()> = db_effect.map_error(|e| AppError::Database(e));
Unlike traditional error handling where you sprinkle .map_err() everywhere, with effects you typically handle error transformation at specific boundaries — when composing larger effects from smaller ones, or when exposing an API.
R: The Requirements
Here’s where effects get interesting. The R parameter represents the environment — the dependencies this effect needs in order to run.
// This effect needs nothing to run — R is ()
let standalone: Effect<i32, String, ()> = succeed(42);
// This effect needs a Database to run
fn get_user(id: u64) -> Effect<User, DbError, Database> {
// ... implementation that uses the database
}
// This effect needs both a Database AND a Logger
fn get_user_logged(id: u64) -> Effect<User, DbError, (Database, Logger)> {
// ... implementation that uses both
}
The key insight: you cannot run an effect unless you provide its requirements.
let needs_db: Effect<User, DbError, Database> = get_user(42);
// This won't compile! We haven't satisfied the Database requirement.
// run_blocking(needs_db); // ERROR: Database not provided
// We pass the required environment to the runner
let user = run_blocking(needs_db, my_database)?;
You can also capture an environment with .provide_env(my_database) to get an Effect<User, DbError, ()>.
Why R Matters
The R parameter is why effectful can offer compile-time dependency injection.
Consider this function signature:
fn process_order(order: Order) -> Effect<Receipt, OrderError, (Database, PaymentGateway, EmailService, Logger)>
Just from the type, you know:
- This produces a Receipt on success
- It can fail with OrderError
- It requires four services to run
You don’t need to read the implementation. You don’t need to trace through function calls. The type tells you exactly what dependencies are involved.
And the compiler enforces it. If you try to run this effect without providing all four services, you get a compile error. No runtime “service not found” failures. No forgetting to initialize something.
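Why the compiler can enforce this is easy to see in a toy model: if the runner's signature takes the environment by type, there is simply no way to call it without one. The sketch below uses only the standard library. `ToyEffect`, the in-memory `Database`, and `get_user_name` are hypothetical stand-ins, not effectful's types.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for a database handle.
pub struct Database {
    pub users: HashMap<u64, String>,
}

#[derive(Debug, PartialEq)]
pub enum DbError {
    NotFound(u64),
}

/// Toy effect: a deferred closure that needs `&mut R` to run.
pub struct ToyEffect<A, E, R> {
    run: Box<dyn FnOnce(&mut R) -> Result<A, E>>,
}

/// The only way to execute: the caller must hand over a value of type `R`.
pub fn run_blocking<A, E, R>(effect: ToyEffect<A, E, R>, mut env: R) -> Result<A, E> {
    (effect.run)(&mut env)
}

/// The signature alone says: needs a `Database`, may fail with `DbError`.
pub fn get_user_name(id: u64) -> ToyEffect<String, DbError, Database> {
    ToyEffect {
        run: Box::new(move |db: &mut Database| {
            db.users.get(&id).cloned().ok_or(DbError::NotFound(id))
        }),
    }
}
```

With these definitions, `run_blocking(get_user_name(1), ())` is rejected at compile time with a type mismatch, because `()` is not a `Database`.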
R Flows Through Composition
When you combine effects, the bound effects must agree on one environment type. Use a shared environment type when multiple services are needed:
fn get_user(id: u64) -> Effect<User, DbError, Database> { ... }
fn send_email(to: &str, body: &str) -> Effect<(), EmailError, EmailService> { ... }
fn notify_user(id: u64) -> Effect<(), AppError, AppEnv> {
    effect! {
        let user = bind* get_user(id)
            .zoom_env(|env: &mut AppEnv| env.database.clone())
            .map_error(AppError::Db);
        bind* send_email(&user.email, "Hello!")
            .zoom_env(|env: &mut AppEnv| env.email.clone())
            .map_error(AppError::Email);
    }
}
The notify_user function now documents that callers must supply AppEnv, and each inner effect explicitly projects the part it needs.
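One way to picture the projection step is as a wrapper that, at run time, derives the small environment from the large one and then runs the inner description against it. The following is a toy model in plain Rust under that assumption. It mirrors the closure shape used with zoom_env above, but `ToyEffect` and its methods are hypothetical and say nothing about how effectful implements the real combinator.

```rust
use std::collections::HashMap;

/// Toy effect: a deferred closure that needs `&mut R` to run.
pub struct ToyEffect<A, E, R> {
    run: Box<dyn FnOnce(&mut R) -> Result<A, E>>,
}

impl<A: 'static, E: 'static, R: 'static> ToyEffect<A, E, R> {
    pub fn from_fn(f: impl FnOnce(&mut R) -> Result<A, E> + 'static) -> Self {
        ToyEffect { run: Box::new(f) }
    }

    /// Re-express an effect that needs `R` as one that needs a larger `S`.
    pub fn zoom_env<S: 'static>(
        self,
        project: impl FnOnce(&mut S) -> R + 'static,
    ) -> ToyEffect<A, E, S> {
        ToyEffect::from_fn(move |outer: &mut S| {
            let mut inner = project(outer);
            (self.run)(&mut inner)
        })
    }

    /// Convert the error type, leaving success and environment alone.
    pub fn map_error<E2: 'static>(self, f: impl FnOnce(E) -> E2 + 'static) -> ToyEffect<A, E2, R> {
        ToyEffect::from_fn(move |env: &mut R| (self.run)(env).map_err(f))
    }
}

pub fn run_blocking<A, E, R>(effect: ToyEffect<A, E, R>, mut env: R) -> Result<A, E> {
    (effect.run)(&mut env)
}

/// Hypothetical services and errors for the example.
#[derive(Clone)]
pub struct Database {
    pub users: HashMap<u64, String>,
}
pub struct AppEnv {
    pub database: Database,
}
#[derive(Debug, PartialEq)]
pub struct DbError(pub u64);
#[derive(Debug, PartialEq)]
pub enum AppError {
    Db(DbError),
}

/// Needs only a `Database`.
pub fn get_user(id: u64) -> ToyEffect<String, DbError, Database> {
    ToyEffect::from_fn(move |db: &mut Database| db.users.get(&id).cloned().ok_or(DbError(id)))
}

/// Same lookup, lifted so callers supply the whole `AppEnv`.
pub fn get_user_in_app(id: u64) -> ToyEffect<String, AppError, AppEnv> {
    get_user(id)
        .zoom_env(|env: &mut AppEnv| env.database.clone())
        .map_error(AppError::Db)
}
```

The inner function keeps its narrow signature, and only the composing function knows about `AppEnv`.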
The Unit Environment: ()
When R = (), the effect is self-contained. It doesn’t need anything from the outside world to run:
let standalone: Effect<i32, String, ()> = succeed(42);
// Can run immediately — no dependencies
let result = run_blocking(standalone, ());
Most effects start with requirements and gradually have them satisfied as you move toward the “edge” of your program:
// Deep in your code: many requirements, bundled in AppEnv
// (Output and Error stand for your own domain types)
fn business_logic() -> Effect<Output, Error, AppEnv>
// At the edge: provide everything
fn main() {
    let db = connect_database();
    let cache = connect_cache();
    let logger = setup_logger();
    let config = load_config();
    let env = AppEnv { db, cache, logger, config };
    run_blocking(business_logic(), env);
}
Reading Effect Signatures
Let’s practice reading some signatures:
// Produces String, never fails, needs nothing
Effect<String, Never, ()>
// Produces i32, can fail with ParseError, needs nothing
Effect<i32, ParseError, ()>
// Produces User, can fail with DbError, needs Database
Effect<User, DbError, Database>
// Produces (), can fail with AppError, needs Database, Cache, and Logger
Effect<(), AppError, (Database, Cache, Logger)>
With practice, you’ll read these as fluently as you read Result<T, E>. The extra R parameter becomes second nature.
What’s Next
We’ve seen that effects are descriptions, not actions. We’ve seen that Effect<A, E, R> encodes success type, error type, and requirements.
But we haven’t answered the obvious question: why does this matter? Why is it better to describe computations than to just do them?
The answer is laziness. And laziness, it turns out, is a superpower.
Laziness as a Superpower
So far we’ve established that Effect<A, E, R> is a description of a computation — a recipe that does nothing until someone executes it. You might be thinking: “OK, but why is that good? I have to run it eventually. What do I gain by waiting?”
Quite a bit, if your program benefits from composing and testing before execution.
Here is what you can do with a computation you have not run yet.
Effect values vs driving an async fn
Rust futures are lazy: calling an async fn returns a Future; the body runs when that future is polled (for example with .await).
The contrast here is about what your API returns: a raw Future that callers usually .await on the spot, versus an Effect value designed to be stored, composed, and run later with an environment of your choosing.
// Returns a Future; the HTTP work runs when this future is awaited / polled
async fn fetch_user_async(id: u64) -> Result<User, HttpError> {
http_get(&format!("https://api.example.com/users/{id}")).await
}
// Returns a description; I/O runs when the effect is executed with an environment
fn fetch_user(id: u64) -> Effect<User, HttpError, HttpClient> {
effect! {
let user = bind* http_get(&format!("https://api.example.com/users/{id}"));
user
}
}
Calling fetch_user_async(1) only builds the future; the request runs when something polls it (typically at .await). Calling fetch_user(1) returns an Effect—still no I/O until you run that effect with a runner and the needed HttpClient.
The point is not that async fn is “eager.” It is that effects give you a first-class value to combine (retries, timeouts, tests) before you commit to a particular run.
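The laziness of plain futures can be checked with the standard library alone. The sketch below is independent of effectful and of any async runtime. `mark_ran`, `NoopWaker`, and `poll_once` are hypothetical helpers written for this illustration: the async fn sets a flag in its body, and the flag only changes once the future is polled.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// An async fn whose body records that it ran.
pub async fn mark_ran(flag: &Cell<bool>) -> u32 {
    flag.set(true);
    7
}

/// A waker that does nothing; enough for a future that never has to wait.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Poll a future exactly once. Returns `Some(output)` if it finished.
pub fn poll_once<F: Future>(future: F) -> Option<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut future = pin!(future);
    match future.as_mut().poll(&mut cx) {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    }
}
```

Calling `mark_ran(&flag)` leaves the flag untouched; passing the resulting future to `poll_once` runs the body. A real program would use an executor rather than a single manual poll.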
Superpower #1: Compose First, Run Later
Because effects are values, you can build an entire program before running any of it:
fn load_dashboard(user_id: u64) -> Effect<DashboardPage, AppError, (Database, Cache, Logger)> {
effect! {
let user = bind* fetch_user(user_id).map_error(AppError::Db);
let posts = bind* fetch_posts(user.id).map_error(AppError::Db);
let profile = bind* build_profile(&user, &posts).map_error(AppError::Render);
profile
}
}
// Nothing has run yet. We have a value.
let page = load_dashboard(42);
// Chain more work onto it — still nothing runs
let logged_page = page.flat_map(|p| log_view(p));
// Only now does any of this execute
run_blocking(logged_page, env);
Every line before run_blocking is pure data manipulation. You’re assembling a pipeline. The pipeline can be inspected, transformed, passed to other functions, stored in a struct. The laws of composition apply cleanly because there are no side effects sneaking in.
Superpower #2: Retry Without Rewriting
Because an effect is a description, you can wrap it with new behavior without touching the original:
// Add exponential back-off retry — no changes to call_payment_api
let resilient = retry(
|| call_payment_api(order.clone()),
Schedule::exponential(Duration::from_millis(100)).compose(Schedule::recurs(3)),
);
// Add more transformations around `resilient` as needed — still no changes
let bounded = resilient.map_error(PaymentError::RetryExhausted);
Compare this to the async version: to add retries to an async fn, you’d either modify the function body, wrap it in a helper that calls it in a loop, or reach for an external crate. The retry logic gets tangled with the business logic.
With effects, retry is just another description. retry takes a factory for one-shot effects and a Schedule, then returns a new effect. No surgery on the original operation required.
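To see why a factory plus a policy is enough, here is a toy model of retry as a wrapper around descriptions. It is a sketch under simplifying assumptions: the policy is just a maximum retry count with no delays, so it does not model Schedule at all, and `ToyEffect`, this `retry`, and `flaky` are hypothetical names rather than effectful's API.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Toy effect: a deferred closure that needs `&mut R` to run.
pub struct ToyEffect<A, E, R> {
    run: Box<dyn FnOnce(&mut R) -> Result<A, E>>,
}

pub fn run_blocking<A, E, R>(effect: ToyEffect<A, E, R>, mut env: R) -> Result<A, E> {
    (effect.run)(&mut env)
}

/// Wrap a factory of one-shot effects in a new effect that re-creates and
/// re-runs the inner effect until it succeeds or `max_retries` is used up.
pub fn retry<A: 'static, E: 'static, R: 'static>(
    mut factory: impl FnMut() -> ToyEffect<A, E, R> + 'static,
    max_retries: u32,
) -> ToyEffect<A, E, R> {
    ToyEffect {
        run: Box::new(move |env: &mut R| {
            let mut retries_used = 0;
            loop {
                match (factory().run)(&mut *env) {
                    Ok(value) => return Ok(value),
                    Err(error) if retries_used >= max_retries => return Err(error),
                    Err(_) => retries_used += 1,
                }
            }
        }),
    }
}

/// Hypothetical flaky operation: the first `fail_first` attempts fail.
pub fn flaky(attempts: Rc<Cell<u32>>, fail_first: u32) -> ToyEffect<&'static str, String, ()> {
    ToyEffect {
        run: Box::new(move |_env: &mut ()| {
            let n = attempts.get() + 1;
            attempts.set(n);
            if n <= fail_first {
                Err(format!("attempt {n} failed"))
            } else {
                Ok("paid")
            }
        }),
    }
}
```

The factory is needed because each toy effect is one-shot: running it consumes it, so every attempt starts from a fresh description. Building `retry(...)` runs nothing; attempts are only made inside `run_blocking`.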
Superpower #3: Test Without Mocking the Universe
Because nothing runs until you provide the environment, tests can substitute controlled implementations without rewriting a single line of production code:
#[test]
fn user_not_found_returns_error() {
let test_env = TestEnv::new()
.with_http(stub_http_404_for("/users/99"));
let result = run_test(fetch_user(99), test_env);
assert!(matches!(result, Exit::Failure(Cause::Fail(HttpError::NotFound))));
}
The same fetch_user function used in production runs in the test — just against a different environment. No #[cfg(test)] stubs. No Arc<dyn Trait> that you only swap out in tests. The type system ensures you’ve provided every dependency the effect declared.
Sequential async vs bundled descriptions
Sequential async fn code is natural for linear flows: each .await advances the next step, and control matches the source order.
Effect-oriented APIs often bundle those steps into a single Effect value first, then apply cross-cutting behavior (retry, timeout, tracing) as transformations on that value before calling run_*.
That separation is useful when the same workflow must be reused under different policies or tested with a substituted environment, without copying the body of the async function.
When Does It Actually Run?
There are exactly three places where an Effect executes:
// In a binary or application entry point
run_blocking(program, env);
// In an async context
run_async(program, env).await;
// In tests
run_test(program, test_env);
Everywhere else, you’re building, transforming, or combining descriptions. The runtime boundary is explicit. You know exactly where the side effects begin.
Until run_* is called, your effect is just data: composable and easy to substitute in tests.
That’s Chapter 1. You now have a picture of why teams adopt effects (errors, dependencies, concurrency structure), what an Effect is (a description executed with an environment), what the type parameters mean (A = success, E = failure, R = requirements), and why keeping work in description form matters for composition and testing.
Chapter 2 gets hands-on: first effects, map, flat_map, and a small end-to-end program.
Your First Effect
Chapter 1 was all philosophy. We established what effects are, why they exist, and why laziness is useful. Now we get our hands dirty.
By the end of this chapter you will have written real effects, transformed them, chained them together, and run a complete small program. You’ll use succeed, fail, map, map_error, and flat_map — the five operations that cover the vast majority of day-to-day effect work.
Let’s start with the simplest question: how do you create an effect in the first place?
Creating Effects — succeed, fail, and pure
Every effect starts as either a success or a failure. The two constructors that express this are succeed and fail.
succeed
succeed wraps a value into an effect that, when run, immediately produces that value:
use effectful::{Effect, succeed};
let answer: Effect<i32, String, ()> = succeed(42);
let greeting: Effect<String, String, ()> = succeed("Hello, world!".to_string());
Nothing happens when you call succeed. You get back a description — a lazy recipe that says “produce this value when someone asks.” The 42 is already there, but no computation has been executed.
The type parameters are important:
- A = i32: the value we produce
- E = String: the error type (unused here, but we still have to pick one)
- R = (): no environment needed
If you prefer the FP vocabulary, pure is an alias for succeed:
use effectful::pure;
let effect = pure(42_i32);
Both names refer to exactly the same thing. Use whichever feels natural in context.
fail
fail wraps an error into an effect that, when run, immediately fails with that error:
use effectful::{Effect, fail};
let oops: Effect<i32, String, ()> = fail("something went wrong".to_string());
Again, nothing executes. oops is a description of a failure, not the failure itself. You can pass it around, store it, and transform it without triggering any error handling.
The type annotation matters: Effect<i32, String, ()> says this would have produced an i32 on success — we just know it won’t.
From a Closure
For cases where you want to capture some computation in an effect (but still defer it):
use effectful::{Effect, effect};
let computed: Effect<i32, String, ()> = effect!(|_r: &mut ()| {
let x = expensive_calculation();
x * 2
});
The body of effect! runs lazily — only when the effect is executed. This is the workhorse macro we’ll cover thoroughly in Chapter 3.
Type Inference
Rust’s type inference often lets you skip the annotations:
// Types inferred from usage
let answer = succeed(42); // Effect<i32, _, ()>
let greeting = succeed("hi"); // Effect<&str, _, ()>
The error type E is usually inferred from how the effect is used later — when you chain it with other effects that can fail, the error type propagates. You’ll only need to annotate explicitly when the compiler asks.
Quick Reference
succeed(value) // Effect that produces value
pure(value) // Alias for succeed
fail(error) // Effect that fails with error
effect!(|_r| { … }) // Effect from a lazy closure
These three constructors (succeed, with pure as its alias, fail, and effect!) cover every starting point. Everything else is transformation and composition.
Transforming Success — map and its Friends
You have an effect. It produces some value. But you want a different value — or a different error. That’s what map and map_error are for.
map
map transforms the success value without running any new effects:
use effectful::{succeed, Effect};
let number: Effect<i32, String, ()> = succeed(21);
let doubled: Effect<i32, String, ()> = number.map(|n| n * 2);
let text: Effect<String, String, ()> = doubled.map(|n| n.to_string());
None of these .map() calls executes anything. Each one wraps the previous description in a new layer: “and then transform the result with this function.” The chain of transformations only runs when you call run_blocking or similar.
The type of the effect changes with each map. The A parameter shifts:
// Effect<i32, String, ()>
// .map(|n: i32| n.to_string())
// → Effect<String, String, ()>
The E (error type) and R (requirements) stay the same. .map touches only the success path.
map_error
map_error transforms the failure type, leaving the success path untouched:
use effectful::fail;
#[derive(Debug)]
struct AppError(String);
let db_err: Effect<String, String, ()> = fail("db connection failed".to_string());
let app_err: Effect<String, AppError, ()> = db_err.map_error(|s| AppError(s));
This is typically used at module boundaries when you need to unify error types. A database layer might return DbError, but your application layer needs AppError. map_error does the conversion without touching anything else.
Why These Don’t Execute Anything
It’s worth repeating: neither map nor map_error runs any computation.
let effect = succeed(42)
.map(|n| { println!("mapping!"); n + 1 })
.map(|n| n * 2);
// At this point: nothing has printed, nothing has computed.
// We have a description of three steps.
let result = run_blocking(effect, ());
// NOW the effect runs. "mapping!" prints once. `result` is Ok(86).
This is the promise of laziness: you can build pipelines of transformations without triggering side effects until the moment you choose.
Combining map and map_error
A common pattern is calling both to normalise an effect into your domain’s types:
fn fetch_user_record(id: u64) -> Effect<User, AppError, ()> {
raw_db_fetch(id)
.map(|row| User::from_row(row))
.map_error(|e| AppError::Database(e))
}
The effect goes in with raw DB types; it comes out with domain types. The transformation chain documents the conversion at a glance.
and_then / and_then_discard
Two more helpers are worth knowing:
// and_then: sequence two effects, keep the second result
let validated: Effect<i32, String, ()> = succeed(42)
.and_then(succeed(100));
// and_then_discard: sequence two effects, keep the first result
let kept_left: Effect<i32, String, ()> = succeed(42)
.and_then_discard(succeed(()));
Use flat_map when the second effect depends on the first value.
Summary
| Method | Changes | Does not change |
|---|---|---|
| .map(f) | A (success type) | E, R |
| .map_error(f) | E (error type) | A, R |
| .tap(f) | nothing | A, E, R |
None of them execute the effect. They all return new, larger descriptions.
Chaining Effects — flat_map and the Bind
map handles the case where your transformation is a pure function: A → B. But often the next step is itself an effect. You don’t want Effect<Effect<B, E, R>, E, R> — you want Effect<B, E, R>. That’s flat_map.
The Problem with map for Effects
Say you want to fetch a user and then fetch their posts:
fn get_user(id: u64) -> Effect<User, DbError, Database> { ... }
fn get_posts(user_id: u64) -> Effect<Vec<Post>, DbError, Database> { ... }
If you try to use map:
// This gives Effect<Effect<Vec<Post>, DbError, Database>, DbError, Database>
// — a nested effect, not what we want
let wrong = get_user(1).map(|user| get_posts(user.id));
map’s function must return a plain value. If it returns an Effect, you get nesting.
flat_map: Chain Without Nesting
flat_map (the analogue of and_then on Rust’s Option and Result) takes a function A → Effect<B, E, R> and “flattens” the result:
let combined: Effect<Vec<Post>, DbError, Database> =
get_user(1).flat_map(|user| get_posts(user.id));
Now you have one flat effect that, when run, first fetches the user, then uses the result to fetch posts. The nesting is gone.
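The difference between the nested and the flat form can be sketched with a toy model. Everything below is an assumption for illustration: the Effect type is a boxed closure rather than effectful's real type, the environment parameter is left out, and get_user and get_posts are hypothetical stand-ins that use String in place of real domain types.

```rust
// Toy stand-in for an effect (no environment parameter, for brevity).
struct Effect<A, E> {
    thunk: Box<dyn FnOnce() -> Result<A, E>>,
}

impl<A: 'static, E: 'static> Effect<A, E> {
    fn new(f: impl FnOnce() -> Result<A, E> + 'static) -> Self {
        Effect { thunk: Box::new(f) }
    }
    fn map<B: 'static>(self, f: impl FnOnce(A) -> B + 'static) -> Effect<B, E> {
        Effect::new(move || (self.thunk)().map(f))
    }
    // Run `self`, feed its value to `f`, then run the effect `f` returns.
    fn flat_map<B: 'static>(
        self,
        f: impl FnOnce(A) -> Effect<B, E> + 'static,
    ) -> Effect<B, E> {
        Effect::new(move || {
            let a = (self.thunk)()?;
            f(a).run()
        })
    }
    fn run(self) -> Result<A, E> {
        (self.thunk)()
    }
}

// Hypothetical lookups: only user 1 exists.
fn get_user(id: u64) -> Effect<String, String> {
    Effect::new(move || {
        if id == 1 {
            Ok("alice".to_string())
        } else {
            Err(format!("no user {id}"))
        }
    })
}

fn get_posts(user: String) -> Effect<Vec<String>, String> {
    Effect::new(move || Ok(vec![format!("{user}: first post")]))
}

// map leaves an effect inside an effect.
fn nested(id: u64) -> Effect<Effect<Vec<String>, String>, String> {
    get_user(id).map(get_posts)
}

// flat_map yields a single layer.
fn user_posts(id: u64) -> Effect<Vec<String>, String> {
    get_user(id).flat_map(get_posts)
}

fn main() {
    // The nested form has to be run twice, once per layer.
    let inner = nested(1).run().unwrap();
    assert_eq!(inner.run(), Ok(vec!["alice: first post".to_string()]));
    // The flat form runs both steps in one go.
    assert_eq!(user_posts(1).run(), Ok(vec!["alice: first post".to_string()]));
}
```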
Chaining Multiple Steps
flat_map chains read left-to-right, but deep chains get noisy:
// Gets unwieldy quickly
let program = get_user(1)
.flat_map(|user| get_posts(user.id)
.map(|posts| render_page(user, posts)));
This is where the effect! macro comes in.
The effect! Macro as Syntactic Sugar
The effect! macro turns flat_map chains into readable sequential code using the bind* operator:
use effectful::effect;
let program: Effect<Page, AppError, Database> = effect! {
let user = bind* get_user(1).map_error(AppError::Db);
let posts = bind* get_posts(user.id).map_error(AppError::Db);
let page = render_page(user, posts);
page
};
bind* is the bind operator: “run this effect and give me its success value.” Each bind* expr desugars to a flat_map. The whole block is one effect.
Note that render_page (a pure function with no bind*) is just a normal Rust expression — it runs inside the macro body during execution.
Error Short-Circuiting
Like ? in Result, if any bind* step fails, the whole effect! exits early with that error:
let program: Effect<Page, AppError, Database> = effect! {
let user = bind* get_user(999).map_error(AppError::Db);
// If get_user fails, execution stops here.
// The rest never runs.
let posts = bind* get_posts(user.id).map_error(AppError::Db);
render_page(user, posts)
};
This is sequential, not parallel. Each step waits for the previous.
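Since the text compares this behaviour to ? on Result, the analogy can be checked with plain standard-library code. The sketch below uses no effectful types at all; get_user, get_posts, and the log vector are hypothetical helpers that record which steps actually ran.

```rust
// Each step records its name in `log` before doing its work.
fn get_user(id: u64, log: &mut Vec<&'static str>) -> Result<u64, String> {
    log.push("get_user");
    if id == 999 {
        Err("user not found".to_string())
    } else {
        Ok(id)
    }
}

fn get_posts(user_id: u64, log: &mut Vec<&'static str>) -> Result<Vec<String>, String> {
    log.push("get_posts");
    Ok(vec![format!("post by {user_id}")])
}

// Sequential steps: `?` returns early on the first Err.
fn program(id: u64, log: &mut Vec<&'static str>) -> Result<usize, String> {
    let user = get_user(id, log)?;
    let posts = get_posts(user, log)?;
    Ok(posts.len())
}

fn main() {
    let mut log = Vec::new();
    let outcome = program(999, &mut log);
    assert_eq!(outcome, Err("user not found".to_string()));
    // get_posts never ran because the first step failed.
    assert_eq!(log, vec!["get_user"]);
}
```

The effect! version differs in that the steps are only described, not executed, until a runner is called; the early-exit shape is the same.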
map vs flat_map — When to Use Each
| Situation | Use |
|---|---|
| Transformation returns a plain value | .map(f) |
| Transformation returns an Effect | .flat_map(f) or effect! { bind* ... } |
| More than one sequential step | effect! { bind* ... } macro |
A rule of thumb: if you find yourself writing effect.map(|v| another_effect(v)) and noticing the nested type, switch to flat_map or the macro.
The Full Picture
// All equivalent:
// 1. Explicit flat_map
get_user(1)
.flat_map(|user| get_posts(user.id))
// 2. Using effect! with bind*
effect! {
let user = bind* get_user(1);
bind* get_posts(user.id)
}
// 3. Short form for single bind
effect! { bind* get_user(1).flat_map(|u| get_posts(u.id)) }
The effect! macro is the idiomatic choice for anything more than one step. Chapter 3 covers it in full detail.
Your First Real Program
Let’s build something complete: a small program that loads configuration, connects to a database, queries a user, and formats a greeting. It’s simple enough to fit on one page, but real enough to demonstrate the full effect workflow.
The Domain
#[derive(Debug)]
struct Config {
db_url: String,
app_name: String,
}
#[derive(Debug)]
struct User {
id: u64,
name: String,
email: String,
}
#[derive(Debug)]
enum AppError {
Config(String),
Database(String),
}
The Individual Steps
Each step is a focused effect:
use effectful::{Effect, effect, succeed, fail};
fn load_config() -> Effect<Config, AppError, ()> {
// In a real app, read from a file or env vars
succeed(Config {
db_url: "postgres://localhost/myapp".to_string(),
app_name: "Greeter".to_string(),
})
}
fn connect_db(config: &Config) -> Effect<Database, AppError, ()> {
Database::connect(&config.db_url)
.map_error(|e| AppError::Database(format!("connect: {e}")))
}
fn fetch_user(db: &Database, id: u64) -> Effect<User, AppError, ()> {
db.query_user(id)
.map_error(|e| AppError::Database(format!("query: {e}")))
}
fn format_greeting(config: &Config, user: &User) -> String {
format!("{}: Hello, {}! ({})", config.app_name, user.name, user.email)
}
Composing the Program
Now we compose these steps into one effect using effect!:
fn greet_user(user_id: u64) -> Effect<String, AppError, ()> {
effect! {
let config = bind* load_config();
let db = bind* connect_db(&config);
let user = bind* fetch_user(&db, user_id);
format_greeting(&config, &user)
}
}
Read it like a recipe:
- Load config: if it fails, stop with AppError::Config
- Connect to DB: if it fails, stop with AppError::Database
- Fetch user: if it fails, stop with AppError::Database
- Format the greeting: this is pure and always succeeds
Nothing has run yet. greet_user(42) is a value.
Running It
At the edge of the program — in main — we execute:
fn main() {
match run_blocking(greet_user(42), ()) {
Ok(greeting) => println!("{greeting}"),
Err(AppError::Config(msg)) => eprintln!("Config error: {msg}"),
Err(AppError::Database(msg)) => eprintln!("DB error: {msg}"),
}
}
Testing It
Because the effect is a description, testing is straightforward — just swap out the underlying steps:
#[test]
fn test_greeting_format() {
let effect = effect! {
let config = bind* succeed(Config {
db_url: "unused".into(),
app_name: "TestApp".into(),
});
let user = bind* succeed(User {
id: 1,
name: "Alice".into(),
email: "alice@example.com".into(),
});
format_greeting(&config, &user)
};
let result = run_test(effect, ());
assert!(matches!(result, Exit::Success(greeting) if greeting == "TestApp: Hello, Alice! (alice@example.com)"));
}
No mocking framework. No Arc<dyn Trait> plumbing. Just substitute different succeed values for the steps you want to control.
What You Just Learned
You’ve written a complete effect-based program. Along the way you used:
- succeed and fail to construct effects from values
- .map and .map_error to transform success and error types
- effect! { bind* ... } to sequence effects without callback nesting
- run_blocking to execute at the program edge
- run_test to verify behaviour in tests
That’s the core of 90% of what you’ll write day-to-day. The next two chapters go deeper: Chapter 3 explores the effect! macro in detail, and Chapter 4 begins the tour of R — the environment type that makes dependency injection a compile-time guarantee.
You just wrote your first effect-based program. It won’t be your last.
The effect! Macro — Do-Notation for Mortals
Chapter 2 introduced the effect! macro as “syntactic sugar for flat_map.” That’s technically accurate, but undersells it. In practice, effect! is how you write almost every multi-step computation in effectful.
This chapter covers the why, the how, and the limits of the macro. By the end you’ll be fluent in bind*, comfortable handling errors inside the macro, and clear on when not to use it.
Why Do-Notation Exists
Consider three steps that each depend on the previous result:
fn step_a() -> Effect<i32, Err, ()> { succeed(1) }
fn step_b(n: i32) -> Effect<i32, Err, ()> { succeed(n * 2) }
fn step_c(n: i32) -> Effect<String, Err, ()> { succeed(n.to_string()) }
Written with raw flat_map:
let program = step_a()
.flat_map(|a| step_b(a)
.flat_map(|b| step_c(b)));
Two steps: readable. Five steps: a pyramid. Ten steps: indistinguishable from callback hell.
Haskell solved this decades ago with do-notation. Scala’s for-comprehensions do the same thing. Rust doesn’t have built-in do-notation, so effectful provides it via a macro.
Do-Notation as a Concept
Do-notation lets you write sequential effectful code that looks like imperative code:
do
a ← step_a
b ← step_b(a)
c ← step_c(b)
return c
Each ← means “run this effect and bind its result to this name.” If any step fails, the whole computation short-circuits.
Rust can’t use the ← symbol, so effectful uses bind* (prefix bind-star):
effect! {
let a = bind* step_a();
let b = bind* step_b(a);
let c = bind* step_c(b);
c
}
Same semantics. Rust syntax. Zero nesting.
How the Desugaring Works
The macro transforms each bind* expr into a flat_map:
// Written:
effect! {
let a = bind* step_a();
let b = bind* step_b(a);
b.to_string()
}
// Roughly expands to:
step_a().flat_map(|a| {
step_b(a).flat_map(|b| {
succeed(b.to_string())
})
})
The macro generates exactly the nested flat_map chain you’d write by hand — just without the visual noise.
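If you are curious how such a rewrite can be expressed at all, the following is a deliberately tiny sketch using macro_rules! over the standard Result type. It is not effectful's macro: toy_effect!, step_a, and step_b are hypothetical, it handles only let-bindings followed by a final expression, and it uses Result::and_then where the real macro would use flat_map on effects.

```rust
// A toy do-notation macro over Result. Rules are tried top to bottom.
macro_rules! toy_effect {
    // `let x = bind* expr; rest...` becomes `expr.and_then(|x| rest...)`.
    (let $x:ident = bind * $e:expr ; $($rest:tt)*) => {
        $e.and_then(move |$x| toy_effect!($($rest)*))
    };
    // The final expression is wrapped as a success value.
    ($e:expr) => {
        Ok($e)
    };
}

// Hypothetical steps: the first one rejects negative input.
fn step_a(start: i32) -> Result<i32, String> {
    if start >= 0 {
        Ok(start)
    } else {
        Err("negative input".to_string())
    }
}

fn step_b(n: i32) -> Result<i32, String> {
    Ok(n * 2)
}

fn program(start: i32) -> Result<String, String> {
    toy_effect! {
        let a = bind* step_a(start);
        let b = bind* step_b(a);
        b.to_string()
    }
}

fn main() {
    // Expands to:
    // step_a(start).and_then(move |a| step_b(a).and_then(move |b| Ok(b.to_string())))
    assert_eq!(program(1), Ok("2".to_string()));
    assert_eq!(program(-1), Err("negative input".to_string()));
}
```

The recursion in the first rule is what produces the nesting: each binding wraps everything after it in a closure, exactly the shape shown in the hand-written expansion above.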
One Body, One Block
One discipline matters: use one effect! block per function. Don’t branch between two macro bodies:
// BAD — two separate effect! blocks for one computation
if flag {
effect! { let x = bind* a(); x }
} else {
effect! { let y = bind* b(); y }
}
// GOOD — one block, branching inside
effect! {
if flag {
bind* a()
} else {
bind* b()
}
}
A single effect! block is a single description. Splitting it into multiple blocks loses the composition guarantee.
Pure Expressions
Not every line inside effect! has to be an effect. Pure Rust expressions work normally:
effect! {
let user = bind* fetch_user(id);
let name = user.name.to_uppercase(); // pure: no bind*
let posts = bind* fetch_posts(user.id);
(name, posts)
}
Only use bind* when the expression has type Effect<_, _, _>. Pure expressions just run inline.
The bind* Operator Explained
The bind* (bind-star) is the bind operator inside effect!. It means: “execute this effect and give me its success value; if it fails, propagate the failure and stop.”
Basic Usage
effect! {
let user = bind* fetch_user(42); // bind the result to `user`
user.name
}
bind* fetch_user(42) desugars to a flat_map. The rest of the block becomes the body of the closure.
Discarding Results
When you don’t need the value, use bind* without a binding:
effect! {
bind* log_event("processing started"); // run for side effect, discard result
let result = bind* do_work();
bind* log_event("processing done");
result
}
Both bind* log_event(...) expressions run for their effects and the () return is discarded.
Method Calls on Effects
bind* works on any expression that evaluates to an Effect. That includes method chains:
effect! {
let user = bind* fetch_user(id).map_error(AppError::Database);
let posts = bind* retry(
|| fetch_posts(user.id).map_error(AppError::Database),
Schedule::exponential(Duration::from_millis(100)).compose(Schedule::recurs(3)),
);
(user, posts)
}
The bind* applies to the entire expression that follows it. For retry/repeat, use the free functions that return an Effect.
bind* in Conditionals and Loops
You can use bind* inside if expressions and loops:
effect! {
let value = if condition {
bind* compute_a()
} else {
bind* compute_b()
};
process(value)
}
Both branches are effects; the macro handles either path.
effect! {
for id in user_ids {
bind* process_user(id); // sequential: one at a time
}
"done"
}
Note: this is sequential iteration. For concurrent processing, use fiber_all (Chapter 9).
What bind* Cannot Do
bind* only works inside an effect! block. Calling it outside is a compile error:
// Does not compile — bind* is not valid here
let x = bind* fetch_user(42);
// Must be inside effect!
let x = effect! { bind* fetch_user(42) };
Also, bind* cannot bind across an async closure boundary. If you’re calling from_async, the body of the async block is separate:
effect! {
let result = bind* from_async(|_r| async move {
// Inside here, you're in regular Rust async — no bind*.
let data = some_future().await?;
Ok(data)
});
result
}
Use bind* outside the async move block; use .await inside it.
The Old Postfix Syntax (Deprecated)
Early versions of effectful used a postfix bind operator, written as a trailing tilde: expr ~. This form is no longer valid. Always use the prefix bind* form:
// OLD — do not use
step_a() ~;
// GOOD
bind* step_a();
let x = bind* step_b();
If you see the postfix tilde in older code, update it to the prefix bind* form.
Error Handling Inside effect!
The bind* operator short-circuits on failure — if a bound effect fails, the whole effect! block fails with that error. But you can also handle errors within the block.
The Default: Short-Circuit
effect! {
let a = bind* step_a(); // if this fails → whole block fails
let b = bind* step_b(a); // if this fails → whole block fails
b
}
This matches ? in Result. You get clean sequencing at the cost of aborting early. For most code, that’s exactly what you want.
Catching Errors Mid-Block
To handle an error inline and continue, use .catch before the bind*:
effect! {
let user = bind* fetch_user(id).catch(|_| succeed(User::anonymous()));
// If fetch_user fails, we get User::anonymous() and continue
render_user(user)
}
.catch converts a failure into a success (or a different effect). The bind* then sees a successful effect.
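A rough analogy in plain Rust is Result::or_else, which also replaces a failure with the outcome of a recovery computation. The sketch below assumes .catch behaves as described above and models it with or_else; fetch_user, render_user, and User are hypothetical, and no effectful types are involved.

```rust
#[derive(Debug, PartialEq)]
struct User {
    name: String,
}

impl User {
    fn anonymous() -> Self {
        User { name: "anonymous".to_string() }
    }
}

// Hypothetical lookup: only id 1 exists.
fn fetch_user(id: u64) -> Result<User, String> {
    if id == 1 {
        Ok(User { name: "alice".to_string() })
    } else {
        Err(format!("no user {id}"))
    }
}

fn render_user(user: User) -> String {
    format!("Hello, {}", user.name)
}

fn page(id: u64) -> Result<String, String> {
    // Recover first, then continue: the `?` only ever sees a success here.
    let user = fetch_user(id).or_else(|_| Ok::<User, String>(User::anonymous()))?;
    Ok(render_user(user))
}

fn main() {
    assert_eq!(page(1), Ok("Hello, alice".to_string()));
    // The failed lookup was replaced by the fallback user.
    assert_eq!(page(999), Ok("Hello, anonymous".to_string()));
}
```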
Converting Errors with map_error
Often you have multiple effect types with different E parameters and need to unify them:
#[derive(Debug)]
enum AppError {
Db(DbError),
Network(HttpError),
}
effect! {
let user = bind* fetch_user(id).map_error(AppError::Db);
let data = bind* fetch_external_data(user.id).map_error(AppError::Network);
process(user, data)
}
Both effects are converted to the same AppError before binding. The block’s E parameter is AppError throughout.
Handling Errors as Values
The current Effect API does not expose a fold method. Use .catch / .catch_all, or run the effect and pattern match on Result at the boundary.
effect! {
let outcome = bind* risky_operation()
.map(|val| format!("Success: {val}"))
.catch_all(|err| format!("Error: {err}"));
log_outcome(outcome)
}
catch_all turns a typed failure into a fallback success value, so the resulting effect is infallible through E.
Re-raising Errors
Inside a .catch handler, you can inspect the error and decide whether to recover or re-fail:
effect! {
let result = bind* db_operation().catch(|error| {
if error.is_transient() {
// Transient: retry once with a fallback
fallback_db_operation()
} else {
// Permanent: re-raise
fail(error)
}
});
result
}
fail(error) inside a handler produces a failing effect — the outer bind* then propagates it.
Accumulating Multiple Errors
Short-circuit stops at the first error. The current root API does not include validate_all; collect independent validation errors manually at the boundary:
let mut errors = Vec::new();
if let Err(error) = run_blocking(validate_name(&input.name), ()) {
errors.push(error);
}
if let Err(error) = run_blocking(validate_email(&input.email), ()) {
errors.push(error);
}
if let Err(error) = run_blocking(validate_age(input.age), ()) {
errors.push(error);
}
Chapter 8 covers accumulation patterns in detail.
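The manual pattern above can be folded into a small helper. The sketch below models each validator as a plain function returning Result rather than an effect, so that it is self-contained; Input, the three validators, their rules, and collect_errors are all hypothetical names chosen for illustration.

```rust
struct Input {
    name: String,
    email: String,
    age: u8,
}

fn validate_name(name: &str) -> Result<(), String> {
    if name.trim().is_empty() {
        Err("name is empty".to_string())
    } else {
        Ok(())
    }
}

fn validate_email(email: &str) -> Result<(), String> {
    if email.contains('@') {
        Ok(())
    } else {
        Err("email has no @".to_string())
    }
}

fn validate_age(age: u8) -> Result<(), String> {
    if age >= 18 {
        Ok(())
    } else {
        Err("age under 18".to_string())
    }
}

// Run every check, keep every failure, and report them together.
fn collect_errors(input: &Input) -> Result<(), Vec<String>> {
    let results = vec![
        validate_name(&input.name),
        validate_email(&input.email),
        validate_age(input.age),
    ];
    let errors: Vec<String> = results.into_iter().filter_map(Result::err).collect();
    if errors.is_empty() {
        Ok(())
    } else {
        Err(errors)
    }
}

fn main() {
    let bad = Input { name: String::new(), email: "nope".to_string(), age: 30 };
    assert_eq!(
        collect_errors(&bad),
        Err(vec!["name is empty".to_string(), "email has no @".to_string()])
    );
}
```

Unlike a chain of bind* steps, every check runs regardless of earlier failures, which is the point of accumulation.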
The Rule of Thumb
| Want | Do |
|---|---|
| Stop at first failure | plain bind* effect |
| Provide a fallback | bind* effect.catch(...) |
| Unify error types | bind* effect.map_error(Into::into) |
| Turn failure into value | bind* effect.catch_all(...) |
| Collect all failures | Manual accumulation outside the macro |
When Not to Use the Macro
effect! is the idiomatic choice for most multi-step computations. But it’s a macro — which means it has edges. Knowing when to reach for raw flat_map instead saves debugging time.
Use Raw flat_map for Single-Step Transforms
When there’s exactly one effectful step and you’re transforming its result, flat_map is cleaner:
// Unnecessarily verbose
effect! {
let id = bind* parse_id(raw);
id
}
// Clear and direct
parse_id(raw).flat_map(|id| succeed(id))
// or just:
parse_id(raw)
Use effect! when you have two or more sequential steps. For one, flat_map or .map is usually enough.
Use Combinators for Structural Patterns
Some patterns have named combinators that are more expressive than macros:
// Instead of:
effect! {
let a = bind* step_a();
let b = bind* step_b();
(a, b)
}
// Consider (when steps are independent):
step_a().zip(step_b())
zip communicates intent: “I need both, in any order.” The effect! version implies sequential dependency. For independent steps, prefer explicit combinators. (For concurrent independent steps, see fiber_all in Chapter 9.)
Avoid Deep Nesting Within the Block
The macro eliminates nesting between flat_map chains. But you can still create nested effect! blocks, which gets confusing:
// CONFUSING — nested macro bodies
effect! {
let result = bind* effect! { // inner macro
let x = bind* inner_step();
x * 2
};
result + 1
}
// BETTER — flatten it
effect! {
let x = bind* inner_step();
let result = x * 2;
result + 1
}
If you feel the urge to nest effect! inside effect!, flatten the outer block instead.
The Macro and Type Inference
The macro occasionally confuses the type inferencer, especially when the error type isn’t pinned early. If you see cryptic “can’t infer type” errors inside effect!:
- Annotate the return type of the enclosing function explicitly
- Add a .map_error(Into::into) on the first bind* binding to anchor E
- As a last resort, break out the inner logic into a named helper function
When Generic Returns Are Needed
Library code with polymorphic A, E, R sometimes can’t use the macro cleanly:
// This works fine with explicit function + effect!
pub fn load_config<A, E, R>() -> Effect<A, E, R>
where
A: From<Config> + 'static,
E: From<ConfigError> + 'static,
R: 'static,
{
effect!(|_r: &mut R| {
let cfg = read_env_config()?;
A::from(cfg)
})
}
The closure form of effect! (with |_r: &mut R|) is the right tool for generic graph-builder functions. It’s still the macro, just in its raw form.
Summary
| Situation | Prefer |
|---|---|
| 2+ sequential steps | effect! { bind* ... } |
| 1 step, simple transform | .map / .flat_map |
| Independent steps | .zip / combinators |
| Generic <A, E, R> graph builder | closure form of effect! |
| Structural patterns (zip, race, all) | explicit combinators, not macro |
The macro is a tool, not a religion. Use it when it makes the code read like a story; use combinators when they express intent more directly.
The R Parameter — Your Dependencies, Encoded in Types
Chapter 1 introduced R as “what an effect needs to run.” We kept it vague on purpose — you needed to understand effects before worrying about their environment.
Now it’s time to understand R properly. This chapter answers: what is R mechanically, how does it flow through composition, and how do you satisfy it?
The payoff is significant. Once you internalize R, compile-time dependency injection stops feeling like magic and starts feeling obvious.
R Revisited — More Than Just a Type Parameter
R is the environment type required to run an effect.
fn get_user(id: u64) -> Effect<User, DbError, Database>
This says: to run get_user, supply a Database environment.
R as a Contract
let effect = get_user(1);
// Missing environment: does not match current runner API.
// run_blocking(effect);
let user = run_blocking(effect, my_database)?;
You can also capture the environment first.
let ready = get_user(1).provide_env(my_database);
let user = run_blocking(ready, ())?;
Composition Requires One Environment Type
Inside one effect! block, bound effects must agree on the same R type after any adaptations.
fn get_user(id: u64) -> Effect<User, DbError, Database> { /* ... */ }
fn get_posts(user_id: u64) -> Effect<Vec<Post>, DbError, Database> { /* ... */ }
fn get_user_with_posts(id: u64) -> Effect<(User, Vec<Post>), DbError, Database> {
effect! {
let user = bind* get_user(id);
let posts = bind* get_posts(user.id);
(user, posts)
}
}
When effects need different environment types, adapt them with zoom_env, use a common ServiceContext, or build an HList Context that exposes both services.
fn log(msg: &str) -> Effect<(), LogError, Logger> { /* ... */ }
fn get_user(id: u64) -> Effect<User, DbError, Database> { /* ... */ }
fn get_user_logged(id: u64) -> Effect<User, AppError, AppEnv> {
effect! {
bind* log(&format!("Fetching user {id}"))
.zoom_env(|env: &mut AppEnv| env.logger.clone())
.map_error(AppError::Log);
bind* get_user(id)
.zoom_env(|env: &mut AppEnv| env.database.clone())
.map_error(AppError::Db)
}
}
R as Documentation
Effect<Receipt, AppError, ServiceContext> says the computation reads services from a runtime service table. Effect<Receipt, AppError, Context<...>> says exactly which tagged HList cells are required. A concrete environment type like AppEnv documents which aggregate application environment must be supplied.
Why R Instead of Parameters?
Traditional Rust threads dependencies as function parameters. R moves that dependency requirement into the returned effect type, which makes it composable and delayable until the program edge.
The key rule: library functions return honest Effect<A, E, R> values; application edges decide which concrete environment to pass.
Providing Dependencies
An effect with a non-() R needs an environment before it can run. The current low-level API is explicit: pass the environment to the runner, or capture it with provide_env.
Run with an Environment
fn get_user(id: u64) -> Effect<User, DbError, Database> { /* ... */ }
let effect = get_user(42);
let user = run_blocking(effect, my_database)?;
run_blocking(effect, env) and run_async(effect, env) consume both the effect and the environment.
Capture an Environment
Use provide_env when you want to turn Effect<A, E, R> into Effect<A, E, ()> before running or composing at the edge.
let ready: Effect<User, DbError, ()> = get_user(42).provide_env(my_database);
let user = run_blocking(ready, ())?;
There is no raw .provide(value) / .provide_some(value) API in the current core effect surface.
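One way to picture what capturing an environment means is a toy model in which an effect is a boxed function of its environment. The code below is such a sketch, not effectful's implementation: the Effect struct, this run_blocking, and this provide_env are local re-creations that mirror the names used above, and Database is a hypothetical in-memory store.

```rust
// Toy stand-in for Effect<A, E, R>: a deferred function of the environment.
struct Effect<A, E, R> {
    thunk: Box<dyn FnOnce(&mut R) -> Result<A, E>>,
}

impl<A: 'static, E: 'static, R: 'static> Effect<A, E, R> {
    fn new(f: impl FnOnce(&mut R) -> Result<A, E> + 'static) -> Self {
        Effect { thunk: Box::new(f) }
    }

    // Capture the environment now; the result only needs `()` to run.
    fn provide_env(self, mut env: R) -> Effect<A, E, ()> {
        Effect::new(move |_unit: &mut ()| (self.thunk)(&mut env))
    }
}

// Toy runner: consumes both the effect and the environment.
fn run_blocking<A, E, R>(effect: Effect<A, E, R>, mut env: R) -> Result<A, E> {
    (effect.thunk)(&mut env)
}

// Hypothetical in-memory database.
struct Database {
    users: Vec<(u64, String)>,
}

fn sample_db() -> Database {
    Database { users: vec![(42, "alice".to_string())] }
}

fn get_user(id: u64) -> Effect<String, String, Database> {
    Effect::new(move |db: &mut Database| {
        db.users
            .iter()
            .find(|(uid, _)| *uid == id)
            .map(|(_, name)| name.clone())
            .ok_or_else(|| format!("no user {id}"))
    })
}

fn main() {
    // Style 1: hand the environment to the runner.
    assert_eq!(run_blocking(get_user(42), sample_db()), Ok("alice".to_string()));

    // Style 2: capture it first, then run with the unit environment.
    let ready: Effect<String, String, ()> = get_user(42).provide_env(sample_db());
    assert_eq!(run_blocking(ready, ()), Ok("alice".to_string()));
}
```

Both styles end in the same call to the underlying closure; the only difference is where the environment value enters.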
ServiceContext and Layers
For derive-based services, effects can require ServiceContext and be provided with a Layer.
#[derive(Clone, Service)]
struct Database { /* ... */ }
fn get_user(id: u64) -> Effect<User, AppError, ServiceContext> {
Effect::<Database, AppError, ServiceContext>::service::<Database>()
.flat_map(move |db| db.get_user(id))
}
let layer = Layer::succeed(Database::new());
let user = run_blocking(get_user(42).provide(layer), ())?;
Effect::provide(layer) exists for Effect<_, _, ServiceContext>.
Tagged Contexts
For HList-style typed contexts, construct a Context and pass it as R.
service_key!(pub struct DbKey);
let env = service_env::<DbKey, _>(my_database);
let user = run_blocking(get_user(42), env)?;
Use effect.provide_head(value) when the effect’s environment is Context<Cons<Service<K, V>, Tail>> and you want to provide the head cell.
Program Edge Rule
Provide dependencies at the program edge: main, request adapters, integration tests, and top-level supervisors. Library functions should return effects that honestly describe their required R.
Widening and Narrowing — Environment Transformations
Sometimes your effect needs part of an environment, but the caller has the whole thing. This is where zoom_env comes in.
The Mismatch Problem
Imagine your application has a large environment type:
struct AppEnv {
db: Database,
logger: Logger,
config: Config,
metrics: MetricsClient,
}
You have a utility function that only needs a Logger:
fn log_event(msg: &str) -> Effect<(), LogError, Logger> { ... }
You can’t call this inside an effect! block that has AppEnv in scope — the types don’t match. You need to narrow the environment down.
zoom_env: Narrow the Environment
zoom_env adapts an effect to work with a larger environment by providing a projection from the larger type to the smaller one:
// Adapt log_event to work with AppEnv.
// The projection returns the smaller environment by value.
let app_log = log_event("hello").zoom_env(|env: &mut AppEnv| env.logger.clone());
Now app_log has type Effect<(), LogError, AppEnv>. The function extracts the Logger from AppEnv and feeds it to the original effect.
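Mechanically, this kind of adapter can be as small as "project, then run". The sketch below is a toy model under that assumption: the Effect struct, run_blocking, and zoom_env here are local re-creations rather than effectful's code, and Logger is a hypothetical type that shares its line buffer between clones so the result can be observed afterwards.

```rust
use std::cell::RefCell;
use std::rc::Rc;

// Toy stand-in for Effect<A, E, R>.
struct Effect<A, E, R> {
    thunk: Box<dyn FnOnce(&mut R) -> Result<A, E>>,
}

impl<A: 'static, E: 'static, R: 'static> Effect<A, E, R> {
    fn new(f: impl FnOnce(&mut R) -> Result<A, E> + 'static) -> Self {
        Effect { thunk: Box::new(f) }
    }

    // Adapt an effect over a small environment R to a larger one S.
    // The projection returns the smaller environment by value.
    fn zoom_env<S: 'static>(
        self,
        project: impl FnOnce(&mut S) -> R + 'static,
    ) -> Effect<A, E, S> {
        Effect::new(move |outer: &mut S| {
            let mut inner = project(outer);
            (self.thunk)(&mut inner)
        })
    }
}

fn run_blocking<A, E, R>(effect: Effect<A, E, R>, mut env: R) -> Result<A, E> {
    (effect.thunk)(&mut env)
}

// Hypothetical logger: clones share one buffer.
#[derive(Clone, Default)]
struct Logger {
    lines: Rc<RefCell<Vec<String>>>,
}

#[allow(dead_code)]
struct AppEnv {
    logger: Logger,
    app_name: String,
}

fn log_event(msg: &str) -> Effect<(), String, Logger> {
    let msg = msg.to_string();
    Effect::new(move |logger: &mut Logger| {
        logger.lines.borrow_mut().push(msg);
        Ok(())
    })
}

// Runs the adapted effect against an AppEnv and returns what was logged.
fn logged_lines() -> Vec<String> {
    let logger = Logger::default();
    let env = AppEnv { logger: logger.clone(), app_name: "demo".to_string() };
    let app_log: Effect<(), String, AppEnv> =
        log_event("hello").zoom_env(|env: &mut AppEnv| env.logger.clone());
    run_blocking(app_log, env).unwrap();
    let lines = logger.lines.borrow().clone();
    lines
}

fn main() {
    assert_eq!(logged_lines(), vec!["hello".to_string()]);
}
```

Because the projection hands over a value, the smaller environment must be cheap to clone or share state internally, as the Rc-backed logger does here.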
Inside effect!, the pattern looks like:
fn process(data: Data) -> Effect<(), AppError, AppEnv> {
effect! {
bind* log_event("start").zoom_env(|e: &mut AppEnv| e.logger.clone()).map_error(AppError::Log);
bind* db_query(data).zoom_env(|e: &mut AppEnv| e.db.clone()).map_error(AppError::Db);
}
}
Transforming the Environment
The current API uses zoom_env for both narrowing and transformation. It applies a function to convert the caller’s environment into what the effect needs:
// Effect needs a raw string URL
fn connect_raw() -> Effect<Database, DbError, String> { ... }
// You have a Config that contains the URL
let with_config = connect_raw().zoom_env(|cfg: &mut Config| cfg.db_url.clone());
// Now type is Effect<Database, DbError, Config>
There is no separate contramap_env method in the current API.
R as Documentation Revisited
These combinators highlight why R is valuable as documentation. When you see:
fn log_event(msg: &str) -> Effect<(), LogError, Logger>
You know exactly what this function needs. You don’t need to read its body to see if it also touches the database. The zoom_env call at the use site makes the adaptation explicit — it’s not hidden.
Compare to the pre-effect alternative:
// Traditional: you'd need to read the body to know what `env` is used for
fn log_event(env: &AppEnv, msg: &str) -> Result<(), LogError> { ... }
With R, the function declares what it needs. With zoom_env, the caller declares how to satisfy it.
When to Use These
In practice, zoom_env appears most often in library code — when writing reusable utilities that should work with any environment containing the right piece. Application code often uses ServiceContext, Context, and Layers instead.
Think of zoom_env as the manual fallback when the automatic layer-based wiring isn’t the right fit.
R as Documentation — Self-Describing Functions
The R parameter is often described as “the environment type.” That’s true, but it undersells the practical benefit. R is living documentation that the compiler enforces.
The Signature Tells the Story
Consider two versions of the same function:
// Version A: traditional async
async fn process_order(order: Order) -> Result<Receipt, Error> {
// What does this use? Read the body to find out.
// Database? PaymentGateway? Email? Metrics?
// You'll have to trace through 200 lines to know.
}
// Version B: effect-based
fn process_order(order: Order) -> Effect<Receipt, OrderError, (Database, PaymentGateway, EmailService, Logger)> {
// What does this use? Look at the signature.
// Database ✓, PaymentGateway ✓, EmailService ✓, Logger ✓
// Done.
}
Version B’s type is self-describing. You don’t need to read the implementation to understand its dependency surface.
Code Review Benefits
In a pull request, R changes are visible in the diff. If someone adds a call to send_metrics() inside process_order and the MetricsClient wasn’t previously in R, the function signature must change:
- fn process_order(order: Order) -> Effect<Receipt, OrderError, (Database, PaymentGateway, EmailService, Logger)>
+ fn process_order(order: Order) -> Effect<Receipt, OrderError, (Database, PaymentGateway, EmailService, Logger, MetricsClient)>
This diff is in the function signature — impossible to miss. With traditional parameters or singletons, new dependencies can silently appear in implementation bodies.
Refactoring Safety
When you refactor and remove a dependency, the R type shrinks. Callers that construct a concrete environment may need to simplify that environment too.
// After removing Logger from process_order:
// Before: process_order required AppEnv { db, logger }
let result = run_blocking(process_order(order), AppEnv { db, logger })?;
// After: process_order only requires Database
let result = run_blocking(process_order(order), db)?;
The compiler guides the cleanup when the environment type changes. Traditional singleton-style code can leave stale dependencies silently lingering.
Testing Clarity
When writing a test, R tells you exactly what you need to mock:
#[test]
fn test_process_order() {
// R = (Database, PaymentGateway, EmailService, Logger)
// So the test needs these four — no more, no less
let result = run_test(
process_order(test_order()),
(mock_db(), mock_payment(), mock_email(), test_logger()),
);
assert!(matches!(result, Exit::Success(_)));
}
There’s no “I wonder if this also touches the metrics service” uncertainty. The type says it doesn’t. If you’re missing a mock, the code won’t compile.
R is Not Magic
It’s important to understand that R is just a type parameter. The “compile-time DI” property comes from:
- Functions declaring what they need in R
- Runners requiring an actual environment value of type R
- Composition preserving environment requirements in the resulting effect type
There’s no reflection, no registration, no framework. Just types.
The next chapter shows how Tags and Context make this scale beyond simple tuples — handling large, complex dependency graphs without positional ambiguity.
Tags and Context — Compile-Time Service Lookup
Chapter 4 showed how R encodes dependencies as types. We used simple types like Database and Logger. That works for small programs, but breaks down as the dependency graph grows.
This chapter introduces the solution: Tags. Tags give values compile-time identities, and Context assembles them into a heterogeneous list that the compiler can query by name, not by position.
By the end you’ll understand why effectful uses this structure instead of tuples, and how to extract exactly the service you need from any environment.
The Problem with Positional Types
If you’ve used tuples as R for a while, you’ve probably already hit the wall. Let’s make it explicit.
The Tuple Explosion
Two dependencies: perfectly readable.
Effect<A, E, (Database, Logger)>
Five dependencies: which is which?
Effect<A, E, (Pool, Pool, Logger, Config, HttpClient)>
// ^^^^ two Pools — which is the main DB and which is the cache?
Tuples are positional. (Pool, Pool, ...) is ambiguous — both fields have the same type. There’s no way to distinguish them except by index, and index-based access is error-prone and breaks silently when you reorder the tuple.
The Fragility Problem
Positional types are fragile under change. Say your function started with:
fn foo() -> Effect<A, E, (Database, Logger)>
// 0 1
Now a teammate adds Config between them:
fn foo() -> Effect<A, E, (Database, Config, Logger)>
// 0 1 2
Every caller that was providing a tuple (db, log) must be updated to (db, config, log), and every index-based access shifts: what was .1 is now .2. The compiler flags the change only where the types at the old and new positions differ. When they coincide, as with two Pools, the shift goes unnoticed. It’s a silent bomb.
The Same-Type Collision
The deeper problem: Rust can’t distinguish Pool for the main database from Pool for the cache. They’re the same type. Positional tuples just accept both:
// V1: run with (main_pool, cache_pool)
// V2: accidentally swap them
run_blocking(effect, (cache_pool, main_pool)) // compiles, wrong at runtime
No compile error. Wrong behaviour. Possibly wrong for months before you notice.
What We Actually Need
We need a way to give each dependency a name — a compile-time identifier that’s independent of its type and its position in any list.
What if:
- Database meant “the tagged Pool known as DatabaseTag”
- Cache meant “the tagged Pool known as CacheTag”
Then you couldn’t accidentally swap them — they’d be different types even though both are Pool underneath.
That’s exactly what Tags provide.
Tags — Branding Values with Identity
A tag is a compile-time key. Tagged<K, V> stores a value V under key type K, so two values with the same runtime type can remain distinct in a typed context.
Tag and Tagged
use effectful::{Tagged, tagged};
struct DatabaseTag;
struct CacheTag;
let db: Tagged<DatabaseTag, Pool> = tagged::<DatabaseTag, _>(connect_database());
let cache: Tagged<CacheTag, Pool> = tagged::<CacheTag, _>(connect_cache());
let pool_ref: &Pool = &db.value;
Tag<K> itself is a zero-sized phantom identity. Most code works with Tagged<K, V> values rather than constructing Tag<K> directly.
Why Tags Help
Tagged<DatabaseTag, Pool> and Tagged<CacheTag, Pool> are different types even though both contain Pool. That prevents positional swaps.
fn needs_database(db: Tagged<DatabaseTag, Pool>) { /* ... */ }
let cache: Tagged<CacheTag, Pool> = tagged::<CacheTag, _>(pool);
needs_database(cache); // type error
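The branding trick itself needs nothing beyond PhantomData. The sketch below re-creates a Tagged type and a tagged constructor locally to show the idea; it is an assumption about the general shape, not a copy of effectful's definitions, and Pool and database_url are hypothetical.

```rust
use std::marker::PhantomData;

// Local re-creation: a value V branded with a zero-sized key type K.
struct Tagged<K, V> {
    value: V,
    _key: PhantomData<K>,
}

fn tagged<K, V>(value: V) -> Tagged<K, V> {
    Tagged { value, _key: PhantomData }
}

struct DatabaseTag;
struct CacheTag;

// Hypothetical connection pool.
struct Pool {
    url: String,
}

// Accepts only the pool branded as the database.
fn database_url(db: &Tagged<DatabaseTag, Pool>) -> &str {
    &db.value.url
}

fn main() {
    let db = tagged::<DatabaseTag, _>(Pool { url: "postgres://main".to_string() });
    let cache = tagged::<CacheTag, _>(Pool { url: "redis://cache".to_string() });

    assert_eq!(database_url(&db), "postgres://main");
    // database_url(&cache); // would not compile: expected DatabaseTag, found CacheTag
    assert_eq!(cache.value.url, "redis://cache");
}
```

The key type carries no data, so the brand costs nothing at runtime; it exists only so the compiler can tell the two pools apart.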
service_key!
The legacy service_key! macro declares a nominal key type.
use effectful::service_key;
service_key!(pub struct DatabaseKey);
service_key!(pub struct CacheKey);
Pair the key with a value using Service<K, V> / Tagged<K, V>.
type DatabaseService = effectful::Service<DatabaseKey, Pool>;
let db = effectful::service::<DatabaseKey, _>(pool);
For new service-style code, prefer #[derive(Service)] on the service struct and ServiceContext.
NeedsX Supertraits
For HList contexts, a NeedsX trait can name a Get bound.
pub trait NeedsDatabase: Get<DatabaseKey, Target = Pool> {}
impl<R: Get<DatabaseKey, Target = Pool>> NeedsDatabase for R {}
pub fn get_user<R: NeedsDatabase>(id: u64) -> Effect<User, DbError, R> { /* ... */ }
This is a readability pattern, not a separate runtime feature.
Summary
| Concept | Purpose |
|---|---|
| Tag<K> | Zero-sized key identity |
| Tagged<K, V> | Value V stored under key K |
| tagged::<K, _>(v) | Construct a tagged cell |
| service_key!(pub struct K); | Declare a nominal key type |
| Service<K, V> | Alias for Tagged<K, V> |
Context and HLists — The Heterogeneous Stack
Context<L> wraps a heterogeneous list of typed cells. In the tagged API, each cell is usually Tagged<K, V>.
The Structure: Cons / Nil
use effectful::{Cons, Nil, Tagged};
type Empty = Nil;
type WithDb = Cons<Tagged<DatabaseKey, Pool>, Nil>;
type WithDbAndLogger = Cons<Tagged<DatabaseKey, Pool>, Cons<Tagged<LoggerKey, Logger>, Nil>>;
Cons<Head, Tail> prepends one item to a list. Nil is the empty list.
Building Context Values
use effectful::{Cons, Context, Nil, tagged};
let env = Context::new(Cons(
tagged::<DatabaseKey, _>(my_pool),
Cons(tagged::<LoggerKey, _>(my_logger), Nil),
));
You can also prepend to an existing context:
let base = Context::new(Cons(tagged::<LoggerKey, _>(my_logger), Nil));
let full = base.prepend(tagged::<DatabaseKey, _>(my_pool));
Access
Context::get::<K>() reads the head cell when it has key K.
let pool: &Pool = full.get::<DatabaseKey>();
For non-head cells, use get_path::<K, P>() with an explicit type-level path such as ThereHere / Skip1.
let logger: &Logger = full.get_path::<LoggerKey, ThereHere>();
This explicit path is why application code often wraps bounds in NeedsX traits or uses ServiceContext instead.
Why HLists and Not HashMap?
An HList preserves each value’s type in the environment type. That gives compile-time lookup and no runtime downcast. The cost is verbose types like Cons<Tagged<A, V>, Cons<Tagged<B, W>, Nil>>.
Use HList Context when you want maximum static structure. Use ServiceContext when you want a simpler runtime service table keyed by service type.
Converting Context to ServiceContext
At the composition root you may have a statically-typed Context but need to hand it to code that expects ServiceContext. Use IntoServiceContext:
use effectful::{ctx, Effect, IntoServiceContext, MissingService, Service, ServiceContext,
run_blocking};
#[derive(Clone, Hash, Service)]
struct Config { port: u16 }
let static_ctx = ctx!(Config => Config { port: 8080 });
let runtime_ctx: ServiceContext = static_ctx.into_service_context();
let program: Effect<u16, MissingService, ServiceContext> =
Config::use_sync(|config| config.port);
assert_eq!(run_blocking(program, runtime_ctx), Ok(8080));
Only self-keyed service cells (Tagged<S, S> where S implements Service) convert. Arbitrary tagged values cannot silently enter runtime service lookup. Duplicate service types in one list make the head cell win, matching compile-time lookup intuition.
When to Use Which
| Situation | Use |
|---|---|
| Fixed compile-time HList; path-sensitive internals | Context |
| Derive-service app / layer code; runtime wiring | ServiceContext |
| Bridging the two at the composition root | ctx! → .into_service_context() |
Get and GetMut — Extracting from Context
Get and GetMut are type-level lookup traits for HList contexts.
Get: Read-Only Access
use effectful::{Get, Here};
fn use_database<R>(env: &R) -> &Pool
where
R: Get<DatabaseKey, Here, Target = Pool>,
{
env.get()
}
The path parameter tells the compiler where the cell lives. Here means the head of the list.
GetMut: Mutable Access
use effectful::{GetMut, Here};
fn increment_counter<R>(env: &mut R)
where
R: GetMut<CounterKey, Here, Target = Counter>,
{
let counter = env.get_mut();
counter.increment();
}
Use GetMut sparingly. For shared concurrent state, prefer TRef or a service that owns its mutation rules.
Explicit Paths
For values not at the head, use a path such as ThereHere or Skip1.
let logger: &Logger = env.get_path::<LoggerKey, ThereHere>();
Lookup is type-safe, but not magical: the path is part of the bound.
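The role of the path parameter is easier to see in a small self-contained model. The sketch below rebuilds a two-cell list and a path-indexed lookup trait from scratch; `Cons`, `Nil`, `Tagged`, `Get`, `Here`, and `There<P>` are defined locally for illustration and are assumptions about the general technique, not effectful's own items (the book's ThereHere / Skip1 names are not reproduced here).

```rust
use std::marker::PhantomData;

struct Nil;
struct Cons<H, T>(H, T);

struct Tagged<K, V> {
    value: V,
    _key: PhantomData<K>,
}

fn tagged<K, V>(value: V) -> Tagged<K, V> {
    Tagged { value, _key: PhantomData }
}

// Type-level paths: `Here` is the head, `There<P>` means "skip one, then follow P".
struct Here;
struct There<P>(PhantomData<P>);

trait Get<K, P> {
    type Target;
    fn get(&self) -> &Self::Target;
}

// Base case: the head cell carries key K.
impl<K, V, T> Get<K, Here> for Cons<Tagged<K, V>, T> {
    type Target = V;
    fn get(&self) -> &V {
        &self.0.value
    }
}

// Recursive case: delegate to the tail with the remaining path.
impl<K, P, H, T> Get<K, There<P>> for Cons<H, T>
where
    T: Get<K, P>,
{
    type Target = <T as Get<K, P>>::Target;
    fn get(&self) -> &Self::Target {
        <T as Get<K, P>>::get(&self.1)
    }
}

struct DatabaseKey;
struct LoggerKey;

type Env = Cons<Tagged<DatabaseKey, u16>, Cons<Tagged<LoggerKey, String>, Nil>>;

fn make_env() -> Env {
    Cons(
        tagged::<DatabaseKey, _>(5432),
        Cons(tagged::<LoggerKey, _>(String::from("stdout")), Nil),
    )
}

fn main() {
    let env = make_env();
    // The path is spelled out in the bound, exactly as the prose describes.
    let port: &u16 = <Env as Get<DatabaseKey, Here>>::get(&env);
    let sink: &String = <Env as Get<LoggerKey, There<Here>>>::get(&env);
    println!("{port} {sink}");
}
```

The two impls never overlap because they differ in the path parameter, which is why the path has to appear in the bound at all.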
No bind* Tag Shorthand
The current effect! macro only binds expressions whose type implements the bind protocol, usually Effect<_, _, _>.
effect! {
let user = bind* fetch_user(id);
user
}
It does not support bind* DatabaseKey service lookup shorthand. Access tagged contexts with Context::get / Get, or use derive-service helpers such as UserRepository::use_.
NeedsX Supertraits
You can hide verbose Get bounds behind a trait.
pub trait NeedsDatabase: Get<DatabaseKey, Here, Target = Pool> {}
impl<R: Get<DatabaseKey, Here, Target = Pool>> NeedsDatabase for R {}
This is only a naming pattern; the compiler still verifies the underlying Get bound.
Compile-Time Guarantees
If a function requires NeedsDatabase, callers must supply an environment type that satisfies that trait. Missing tagged cells are compile-time errors for HList Contexts.
Layers — Building Your Dependency Graph
You’ve seen how R encodes what an effect needs, and how Context holds the values at runtime. But who builds the context?
In small programs you can construct context manually with ctx! and hand values to provide. In real applications, you need something more structured: a way to declare how to build each piece of the environment and to state which pieces depend on which.
That’s what Layers are for.
A Layer is a recipe for building part of an environment. It knows what it produces and what it needs to produce it. Wire Layers together with explicit composition, and the build order follows from how they are combined; LayerGraph can additionally plan and validate that order. Cleanup is handled by Scope and finalizers rather than by the layer itself.
What Is a Layer?
An Effect describes a computation that needs an environment. A layer describes how to build values or services that can become that environment.
Two Layer Surfaces
effectful currently exposes two related layer APIs.
- LayerFn / LayerBuild: builds typed values, often Tagged<K, V> cells for HList Contexts.
- Layer<ROut, E, RIn>: builds services into ServiceContext for derive-service applications.
HList-Style Layer
use effectful::{LayerBuild, LayerFn, tagged};
let db_layer = LayerFn(|| {
let pool = connect_pool_blocking(database_url)?;
Ok(tagged::<DatabaseKey, _>(pool))
});
let db_cell = db_layer.build()?;
LayerFn is lazy: construction does nothing; build() runs the constructor.
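The laziness claim can be checked with a tiny model. The `LayerFn` below is a hypothetical miniature written only for this sketch (it is not the library type): it stores a constructor and runs it each time `build` is called.

```rust
use std::cell::Cell;

// Hypothetical miniature of a lazy layer: wraps a constructor, runs it on build().
struct LayerFn<F>(F);

impl<F, T, E> LayerFn<F>
where
    F: Fn() -> Result<T, E>,
{
    fn build(&self) -> Result<T, E> {
        (self.0)()
    }
}

// Returns the constructor call count before any build and after two builds.
fn build_counts() -> (u32, u32) {
    let calls = Cell::new(0u32);
    let layer = LayerFn(|| -> Result<&'static str, String> {
        calls.set(calls.get() + 1);
        Ok("pool")
    });

    let before = calls.get(); // wrapping the closure ran nothing
    let _ = layer.build();
    let _ = layer.build(); // this miniature does not cache, so it runs again
    (before, calls.get())
}

fn main() {
    let (before, after) = build_counts();
    println!("before build: {before}, after two builds: {after}");
}
```

In this model every `build` reruns the constructor; caching is a separate concern.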
ServiceContext Layer
use effectful::{Layer, Service};
#[derive(Clone, Service)]
struct Database { /* ... */ }
let db_layer = Layer::succeed(Database::new());
let context = run_blocking(db_layer.build(), ())?;
For dependencies, use Layer::effect and read upstream services from ServiceContext.
let db_layer = Layer::effect("Database", || {
Config::use_(|config| Database::connect(config.database_url))
});
Providing to Effects
Effect::provide(layer) exists for Effect<_, _, ServiceContext>.
let result = run_blocking(my_app().provide(app_layer), ())?;
For typed Context environments, build the context and pass it directly to run_blocking(effect, env).
Lifecycle
Resource lifecycles are handled by Scope, Pool, and explicit finalizers. Layers are constructors; if a layer creates resources that require cleanup, make that cleanup part of the service design or the surrounding scope.
Building Layers — From Simple to Complex
effectful currently has two layer surfaces:
- LayerFn / LayerBuild for typed HList-style values.
- Layer<ROut, E, RIn> for ServiceContext services.
LayerFn
LayerFn wraps a zero-argument function returning Result<Output, Error>.
use effectful::{LayerBuild, LayerFn, tagged};
let config_layer = LayerFn(|| Ok(tagged::<ConfigKey, _>(Config::from_env()?)));
let config = config_layer.build()?;
Use LayerEffect::new(effect) when the output comes from an Effect and should be cached after first build.
use effectful::LayerEffect;
let db_layer = LayerEffect::new(connect_pool(config.db_url()).map(|pool| tagged::<DatabaseKey, _>(pool)));
let db = db_layer.build()?;
LayerFnFrom
Use LayerFnFrom when a layer depends on the output of a previous layer.
use effectful::{LayerFn, LayerFnFrom, LayerExt};
let config = LayerFn(|| Ok(Config::from_env()?));
let db = LayerFnFrom(|config: &Config| Ok(Database::connect(config.db_url())?));
let stack = config.and_then(db);
let env = stack.build()?;
ServiceContext Layers
For derive-based services, use Layer::succeed or Layer::effect.
#[derive(Clone, Service)]
struct Config { /* ... */ }
let config_layer = Layer::succeed(Config::from_env());
let db_layer = Layer::effect("Database", || {
Config::use_(|config| Database::connect(config.database_url))
});
Layer::effect receives dependencies through ServiceContext, so it can use Effect::service::<S>() or S::use_.
Memoization
Layer<ROut, E, RIn> exposes .memoized().
let shared_config = config_layer.memoized();
LayerEffect in the HList surface caches by design after the first build.
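As a rough picture of what "cached after the first build" means, the sketch below memoizes a constructor with `std::sync::OnceLock`. `Memoized` and `demo` are hypothetical names for this illustration; how effectful implements `.memoized()` or LayerEffect internally is not shown in this book and is not assumed here.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::OnceLock;

// Hypothetical memoizing wrapper: the constructor runs at most once.
struct Memoized<T, F> {
    cell: OnceLock<T>,
    init: F,
}

impl<T, F: Fn() -> T> Memoized<T, F> {
    fn new(init: F) -> Self {
        Memoized { cell: OnceLock::new(), init }
    }

    // The first call runs the constructor; later calls return the cached value.
    fn build(&self) -> &T {
        self.cell.get_or_init(|| (self.init)())
    }
}

// Builds twice and reports both values plus how often the constructor ran.
fn demo() -> (String, String, usize) {
    let runs = AtomicUsize::new(0);
    let config = Memoized::new(|| {
        runs.fetch_add(1, Ordering::SeqCst);
        String::from("port=8080")
    });

    let first = config.build().clone();
    let second = config.build().clone();
    (first, second, runs.load(Ordering::SeqCst))
}

fn main() {
    let (first, second, runs) = demo();
    println!("{first} {second} constructor runs: {runs}");
}
```

This sketch only covers an infallible constructor; a fallible one would also need a policy for whether failures are cached.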
Stacking Layers — Composition Patterns
Layer composition is explicit. Use Stack / and_then for HList-style layers, and merge / provide / provide_merge for ServiceContext layers.
Independent HList Layers
use effectful::{LayerBuild, Stack};
let stack = Stack(config_layer, logger_layer);
let env = stack.build()?; // Cons<ConfigOutput, Cons<LoggerOutput, Nil>>
The two layers must have the same error type.
Dependent HList Layers
Use LayerExt::and_then or StackThen when the second layer needs the first layer’s output.
use effectful::{LayerBuild, LayerExt, LayerFn, LayerFnFrom};
let config = LayerFn(|| Ok(Config::from_env()?));
let db = LayerFnFrom(|config: &Config| Ok(Database::connect(config.db_url())?));
let env = config.and_then(db).build()?;
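The dependent case boils down to "build the first value, lend it to the second constructor, keep both". Below is a minimal sketch of that shape using plain closures. The free function `and_then`, the tuple output, and the `Config` / `Database` structs are assumptions made for this illustration; the library's own output type for `and_then` may differ.

```rust
#[derive(Debug, PartialEq)]
struct Config {
    db_url: String,
}

#[derive(Debug, PartialEq)]
struct Database {
    url: String,
}

// Hypothetical combinator: run `first`, lend its output to `second`, return both.
fn and_then<A, B, E>(
    first: impl Fn() -> Result<A, E>,
    second: impl Fn(&A) -> Result<B, E>,
) -> impl Fn() -> Result<(A, B), E> {
    move || {
        let a = first()?; // a failure here means `second` never runs
        let b = second(&a)?;
        Ok((a, b))
    }
}

fn build_stack(url: &str) -> Result<(Config, Database), String> {
    let url = url.to_string();
    let config = move || -> Result<Config, String> {
        if url.is_empty() {
            Err(String::from("DATABASE_URL is empty"))
        } else {
            Ok(Config { db_url: url.clone() })
        }
    };
    let db = |config: &Config| -> Result<Database, String> {
        Ok(Database { url: config.db_url.clone() })
    };

    // Nothing is constructed until the composed closure is called.
    and_then(config, db)()
}

fn main() {
    println!("{:?}", build_stack("postgres://localhost"));
    println!("{:?}", build_stack(""));
}
```

Both constructors share one error type in this sketch, mirroring the same-error-type requirement stated for Stack.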
ServiceContext merge
For derive-service layers, merge builds independent layers and combines their ServiceContexts.
let app_layer = config_layer.merge(logger_layer);
let context = run_blocking(app_layer.build(), ())?;
ServiceContext provide
Use provide when one layer needs services from another layer and you want to hide provider output.
let db_with_config = db_layer.provide(config_layer);
let context = run_blocking(db_with_config.build(), ())?;
Use provide_merge when you want both provider and dependent services in the final context.
let app_layer = db_layer.provide_merge(config_layer);
Providing to Effects
Effect::provide(layer) exists for effects whose environment is ServiceContext.
let result = run_blocking(my_application().provide(app_layer), ())?;
For typed Context / HList environments, build the context and pass it to run_blocking(effect, env).
Layer Graphs — Dependency Planning
LayerGraph is a planner for named layer nodes. It does not build layers itself; it computes a topological build order from requires and provides service names.
Declaring a Layer Graph
use effectful::{LayerGraph, LayerNode};
let graph = LayerGraph::new([
LayerNode::new("config", std::iter::empty::<&str>(), ["config"]),
LayerNode::new("db", ["config"], ["db"]),
LayerNode::new("cache", ["config"], ["cache"]),
LayerNode::new("service", ["db", "cache"], ["service"]),
]);
Each node has:
- id: stable unique node id
- requires: service names it needs
- provides: service names it supplies
Planning
let plan = graph.plan_topological()?;
assert_eq!(plan.build_order, vec!["config", "db", "cache", "service"]);
Sibling order is deterministic but should not be used as a semantic dependency. If one layer must precede another, express that with requires / provides.
Planner Errors
plan_topological can fail with:
| Error | Meaning |
|---|---|
| DuplicateNodeId | Two nodes share an id |
| ConflictingProvider | More than one node provides the same service name |
| MissingProvider | A requirement has no provider |
| CycleDetected | Dependencies contain a cycle |
let bad_graph = LayerGraph::new([
LayerNode::new("a", ["b"], ["a"]),
LayerNode::new("b", ["a"], ["b"]),
]);
let err = bad_graph.plan_topological();
assert!(matches!(err, Err(LayerPlannerError::CycleDetected { .. })));
Use error.to_diagnostic() for user-facing messages and suggestions.
Planning from STM
LayerGraph::plan_topological_from_tref(&nodes_tref) reads a TRef<Vec<LayerNode>> snapshot transactionally and plans from that snapshot.
When to Use LayerGraph
Use LayerGraph when you need validation, diagnostics, or tool-visible dependency order. For a few layers in application code, direct layer composition is usually clearer.
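For intuition about what a planner of this kind computes, here is a small standalone sketch of topological ordering over requires / provides names. `Node`, `PlanError`, `plan`, and `sample_graph` are hypothetical and cover only two of the four error cases; this is one simple way to do it, not LayerGraph's actual algorithm.

```rust
use std::collections::{BTreeMap, BTreeSet};

#[derive(Debug, PartialEq)]
enum PlanError {
    MissingProvider(String),
    CycleDetected,
}

struct Node {
    id: &'static str,
    requires: Vec<&'static str>,
    provides: Vec<&'static str>,
}

fn plan(nodes: &[Node]) -> Result<Vec<&'static str>, PlanError> {
    // Map each service name to the index of the node that provides it.
    let mut provider: BTreeMap<&str, usize> = BTreeMap::new();
    for (index, node) in nodes.iter().enumerate() {
        for service in &node.provides {
            provider.insert(*service, index);
        }
    }

    // deps[i] holds the indices of the nodes that node i depends on.
    let mut deps: Vec<BTreeSet<usize>> = vec![BTreeSet::new(); nodes.len()];
    for (index, node) in nodes.iter().enumerate() {
        for service in &node.requires {
            match provider.get(service) {
                Some(&p) => {
                    deps[index].insert(p);
                }
                None => return Err(PlanError::MissingProvider(service.to_string())),
            }
        }
    }

    // Repeatedly emit the first node, in declaration order, whose dependencies are built.
    let mut built = vec![false; nodes.len()];
    let mut order = Vec::new();
    while order.len() < nodes.len() {
        let next = (0..nodes.len())
            .find(|&i| !built[i] && deps[i].iter().all(|&d| built[d]));
        match next {
            Some(i) => {
                built[i] = true;
                order.push(nodes[i].id);
            }
            // Nodes remain but none is ready: they must depend on each other.
            None => return Err(PlanError::CycleDetected),
        }
    }
    Ok(order)
}

fn sample_graph() -> Vec<Node> {
    vec![
        Node { id: "config", requires: vec![], provides: vec!["config"] },
        Node { id: "db", requires: vec!["config"], provides: vec!["db"] },
        Node { id: "cache", requires: vec!["config"], provides: vec!["cache"] },
        Node { id: "service", requires: vec!["db", "cache"], provides: vec!["service"] },
    ]
}

fn main() {
    println!("{:?}", plan(&sample_graph()));
}
```

The scan is quadratic in the number of nodes, which is acceptable for a sketch; declaration order acts as the tie-breaker between siblings, which is why the result is deterministic.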
Services — The Complete DI Pattern
The previous chapters established the building blocks: Tags (identities), Context (the environment), and Layers (constructors). Now we put them together into the complete Service pattern.
A Service in effectful is the combination of:
- A trait defining the interface
- A tag identifying it in the environment
- One or more implementations (production and test)
- A layer that wires an implementation into the environment
This is the full dependency injection story. By the end of this chapter you’ll have a working multi-service application whose wiring lives in layers at the edge of the program.
Service Traits — Defining Interfaces
Services can be modeled as cloneable structs that contain concrete clients, trait objects, or function handles. Derive Service on the struct to make it available through ServiceContext.
Define the Interface
trait UserRepositoryImpl: Send + Sync {
fn get_user(&self, id: u64) -> Effect<User, DbError, ()>;
fn save_user(&self, user: User) -> Effect<(), DbError, ()>;
}
#[derive(Clone, Service)]
struct UserRepository {
inner: Arc<dyn UserRepositoryImpl>,
}
impl UserRepository {
fn get_user(&self, id: u64) -> Effect<User, DbError, ()> {
self.inner.get_user(id)
}
}
The service struct is the lookup key. It must be Clone + 'static.
Accessing a Service
Use Effect::service::<S>(), S::use_, or S::use_sync.
fn get_user_profile(id: u64) -> Effect<UserProfile, AppError, ServiceContext> {
UserRepository::use_(move |repo| {
repo.get_user(id)
.map(UserProfile::from)
.map_error(AppError::Db)
})
}
If the service is missing, the lookup fails with MissingService; include From<MissingService> in your application error when using use_ / Effect::service.
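One way to picture a service table keyed by service type, and why a missing entry is a runtime error rather than a compile error, is the following standard-library sketch. `ServiceTable`, its methods, and this `MissingService` struct are hypothetical models written for illustration; they do not describe how ServiceContext is implemented.

```rust
use std::any::{type_name, Any, TypeId};
use std::collections::HashMap;

// Hypothetical error carrying the name of the type that was requested.
#[derive(Debug, PartialEq)]
struct MissingService(&'static str);

// Hypothetical runtime table: the service's own type is the lookup key.
#[derive(Default)]
struct ServiceTable {
    cells: HashMap<TypeId, Box<dyn Any>>,
}

impl ServiceTable {
    fn with<S: Clone + 'static>(mut self, service: S) -> Self {
        self.cells.insert(TypeId::of::<S>(), Box::new(service));
        self
    }

    fn get<S: Clone + 'static>(&self) -> Result<S, MissingService> {
        self.cells
            .get(&TypeId::of::<S>())
            .and_then(|cell| cell.downcast_ref::<S>())
            .cloned()
            .ok_or(MissingService(type_name::<S>()))
    }
}

#[derive(Clone, Debug, PartialEq)]
struct Config {
    port: u16,
}

#[derive(Clone, Debug, PartialEq)]
struct Mailer;

// Business code asks for exactly the service it needs.
fn read_port(table: &ServiceTable) -> Result<u16, MissingService> {
    table.get::<Config>().map(|config| config.port)
}

fn main() {
    let table = ServiceTable::default().with(Config { port: 8080 });
    println!("{:?}", read_port(&table));
    println!("{:?}", table.get::<Mailer>());
}
```

The `Clone + 'static` bounds in the sketch line up with the requirement stated above that a service struct be Clone + 'static.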
Tagged Alternative
The older typed-context style uses service_key!(pub struct Key); plus Tagged<Key, V> / Service<Key, V>.
service_key!(pub struct UserRepositoryKey);
type UserRepositoryService = Service<UserRepositoryKey, Arc<dyn UserRepositoryImpl>>;
Use this when you specifically want HList Context types in function signatures. Prefer derive-service for application service tables.
Keeping Services Focused
Avoid a single god service. Split by capability.
#[derive(Clone, Service)]
struct Users { /* ... */ }
#[derive(Clone, Service)]
struct Mailer { /* ... */ }
#[derive(Clone, Service)]
struct Payments { /* ... */ }
Functions should read exactly the services they need from ServiceContext.
ServiceEnv and service_env
ServiceEnv<K, V> is a one-cell tagged Context. service_env::<K, V>(value) constructs that context; it is not an accessor effect.
Constructing a Tagged Service Environment
use effectful::{ServiceEnv, service_env, service_key};
service_key!(pub struct UserRepositoryKey);
let env: ServiceEnv<UserRepositoryKey, UserRepository> =
service_env::<UserRepositoryKey, _>(repo);
The alias is:
type ServiceEnv<K, V> = Context<Cons<Service<K, V>, Nil>>;
Accessing Tagged Services
Use Context::get::<K>() or Get<K> bounds.
fn get_user(id: u64) -> Effect<User, DbError, ServiceEnv<UserRepositoryKey, UserRepository>> {
Effect::new(move |env| {
let repo = env.get::<UserRepositoryKey>();
repo.get_user_blocking(id)
})
}
Inside effect!, there is no bind* UserRepositoryKey shorthand in the current API.
Derive-Service Alternative
For most service code, prefer the #[derive(Service)] / ServiceContext API.
#[derive(Clone, Service)]
struct UserRepository { /* ... */ }
fn get_user(id: u64) -> Effect<User, AppError, ServiceContext> {
UserRepository::use_(move |repo| repo.get_user(id))
}
let env = UserRepository::new().to_context();
let user = run_blocking(get_user(42), env)?;
This avoids exposing HList paths in most application signatures.
Interop: compile-time Context → ServiceContext
When you have a statically-built Context and need to feed it into an effect that requires ServiceContext, convert at the edge:
use effectful::{ctx, Effect, IntoServiceContext, MissingService, Service, ServiceContext,
run_blocking};
#[derive(Clone, Hash, Service)]
struct Config { port: u16 }
let static_ctx = ctx!(Config => Config { port: 8080 });
let env: ServiceContext = static_ctx.into_service_context();
let program: Effect<u16, MissingService, ServiceContext> =
Config::use_sync(|config| config.port);
assert_eq!(run_blocking(program, env), Ok(8080));
This is the recommended path: build with ctx!, convert with .into_service_context(), then look up services with Service::use_sync or Effect::service.
Providing Services via Layers
The derive-service API makes a service type both key and value. Provide concrete services with Layer::succeed or construct them with Layer::effect.
Minimal Service Layer
use effectful::{Layer, Service};
#[derive(Clone, Service)]
struct UserRepository {
pool: Pool,
}
let repo_layer = Layer::succeed(UserRepository { pool });
An effect can read the service from ServiceContext.
fn get_user(id: u64) -> Effect<User, AppError, ServiceContext> {
UserRepository::use_(move |repo| repo.get_user(id))
}
Run by providing the layer at the edge.
let user = run_blocking(get_user(42).provide(repo_layer), ())?;
Layer with Dependencies
Layer::effect builds a service by running an effect in ServiceContext.
#[derive(Clone, Service)]
struct Config { database_url: String }
#[derive(Clone, Service)]
struct Database { /* ... */ }
let config_layer = Layer::succeed(Config::from_env());
let db_layer = Layer::effect("Database", || {
Config::use_(|config| Database::connect(config.database_url))
});
let app_layer = db_layer.provide_merge(config_layer);
provide hides provider services in the final output. provide_merge keeps them.
Test Layer with Mock
Tests provide the same service type with a fake implementation.
#[derive(Clone, Service)]
struct UserRepository {
users: Arc<HashMap<u64, User>>,
}
let test_layer = Layer::succeed(UserRepository::from_users([alice(), bob()]));
let exit = run_test(get_user(1).provide(test_layer), ());
Application code does not change; only the layer at the edge changes.
A Complete DI Example — Putting It Together
This example uses the current derive-service / ServiceContext layer API.
Domain
struct User { id: u64, name: String, email: String }
struct Post { id: u64, author_id: u64, title: String, body: String }
enum AppError { Db(DbError), Notify(NotifyError), Missing(MissingService) }
Services
#[derive(Clone, Service)]
struct UserRepository {
inner: Arc<dyn UserRepositoryImpl>,
}
#[derive(Clone, Service)]
struct PostRepository {
inner: Arc<dyn PostRepositoryImpl>,
}
#[derive(Clone, Service)]
struct Notifier {
inner: Arc<dyn NotificationImpl>,
}
Each service struct is cloneable and acts as its own lookup key.
Business Logic
fn get_author_feed(author_id: u64) -> Effect<(User, Vec<Post>), AppError, ServiceContext> {
UserRepository::use_(move |users| {
PostRepository::use_(move |posts| {
effect! {
let user = bind* users.get_user(author_id).map_error(AppError::Db);
let posts = bind* posts.get_posts_by_author(author_id).map_error(AppError::Db);
(user, posts)
}
})
})
}
The function depends on service types, not concrete infrastructure.
Production Layers
let config_layer = Layer::succeed(Config::from_env());
let db_layer = Layer::effect("Database", || {
Config::use_(|config| Database::connect(config.database_url))
});
let user_repo_layer = Layer::effect("UserRepository", || {
Database::use_(|db| succeed(UserRepository::postgres(db)))
});
let post_repo_layer = Layer::effect("PostRepository", || {
Database::use_(|db| succeed(PostRepository::postgres(db)))
});
let app_layer = user_repo_layer
.merge(post_repo_layer)
.provide_merge(db_layer.provide_merge(config_layer));
Run at the edge:
fn main() {
let result = run_blocking(get_author_feed(1).provide(app_layer), ());
println!("{result:?}");
}
Test Wiring
fn test_layer() -> Layer<(UserRepository, PostRepository), AppError, ()> {
Layer::succeed(UserRepository::in_memory([alice(), bob()]))
.merge(Layer::succeed(PostRepository::in_memory([alice_post()])))
}
#[test]
fn feed_includes_author_posts() {
let exit = run_test(get_author_feed(1).provide(test_layer()), ());
assert!(matches!(exit, Exit::Success((_, posts)) if posts.len() == 1));
}
What This Demonstrates
Business logic is decoupled from infrastructure:
- It reads services from ServiceContext.
- Production and tests provide different layers.
- Layer composition happens at the edge.
- Missing services fail through MissingService if requested at runtime.
Part III shifts to operational concerns: errors, fibers, resources, and scheduling.
Error Handling — Cause, Exit, and Recovery
Part II gave you the full dependency injection story. Part III is about what happens when things go wrong — and in production, things always go wrong.
Rust’s Result<T, E> is excellent for expected errors: outcomes you anticipated and typed. But real programs also encounter unexpected failures: panics, OOM conditions, and cancelled fibers. effectful models all of these with a richer type hierarchy.
This chapter introduces Cause (the full error taxonomy), Exit (the terminal outcome of any effect), and the combinators for recovering from both.
Beyond Result — Why Cause Exists
Result<T, E> handles the errors you expect. But what about the errors that aren’t your E?
Expected vs. Unexpected
// Expected: you planned for this
Effect<User, UserNotFound, Db>
// But what if the database panics?
// What if the fiber is cancelled?
// What if the process runs out of memory?
// None of these are UserNotFound.
Traditional Rust handles unexpected failures through panics, which unwind (or abort) and bypass your typed error handling. In async code, a panic inside a spawned task can go unnoticed unless its join handle is checked, and it may leave resources unreleased.
The Cause Type
Cause<E> is effectful’s complete taxonomy of failure:
use effectful::Cause;
enum Cause<E> {
Fail(E), // Your typed, expected error
Die(Box<dyn Any>), // A panic or defect — something that shouldn't happen
Interrupt, // The fiber was cancelled
}
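Every failure in the effect runtime is built from these three cases. Together they cover the full space of “things that can go wrong.” The listing is simplified: the Exit examples later in this chapter match on Cause::Die(message) and Cause::Interrupt(id), plus the composite cases Cause::Both and Cause::Then that combine several causes.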
- Cause::Fail(e): an error you declared in E, handled with catch or map_error
- Cause::Die(payload): a panic, logic bug, or fatal error; should be logged and treated as a defect
- Cause::Interrupt: clean cancellation; the fiber was asked to stop and cooperated
Why This Matters
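Without Cause, you can only handle Cause::Fail. The other two propagate up the fiber tree unseen, so defects can go unlogged and resources can be left unreleased.
With Cause, you can handle all failure modes in a structured way. Effect::catch_all only sees the typed error channel, so match on the Cause where it surfaces, for example in the Err side of FiberHandle::join():
match handle.join().await {
Ok(value) => Ok(value),
Err(Cause::Fail(e)) => recover_from_expected(e),
Err(Cause::Die(message)) => log_defect_and_fail(message),
Err(Cause::Interrupt(_)) => Ok(default_value()),
Err(composite) => log_defect_and_fail(composite.pretty()),
}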
Resource finalizers (Chapter 10) use this same model — they run on any Cause, ensuring cleanup regardless of how the fiber ends.
Day-to-Day Usage
In normal application code you rarely inspect Cause directly. You use:
- .catch(f) for handling Cause::Fail
- .catch_all(f) for replacing any typed failure with a fallback value
- Exit (next section) when you need to inspect the terminal outcome, including panics and interruption
The Cause type is mostly visible at infrastructure boundaries — resource finalizers, fiber supervisors, and top-level error handlers.
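To make the Fail / Die distinction concrete at such a boundary, the sketch below converts a panic into data using `std::panic::catch_unwind`. The two-variant `Cause` enum, `run_capturing`, and `parse_port` are hypothetical and exist only for this illustration; interruption is left out, and the sketch assumes the default unwinding panic strategy (it cannot observe panics under panic = "abort").

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};

// Reduced, hypothetical model of a failure taxonomy (no interruption case).
#[derive(Debug, PartialEq)]
enum Cause<E> {
    Fail(E),     // typed, expected error
    Die(String), // defect: a panic captured as a message
}

// Runs `work`, turning a typed error into Fail and a panic into Die.
fn run_capturing<A, E>(work: impl FnOnce() -> Result<A, E>) -> Result<A, Cause<E>> {
    match catch_unwind(AssertUnwindSafe(work)) {
        Ok(Ok(value)) => Ok(value),
        Ok(Err(error)) => Err(Cause::Fail(error)),
        Err(payload) => {
            // Panic payloads are usually &str or String; anything else gets a placeholder.
            let message = payload
                .downcast_ref::<&str>()
                .map(|s| s.to_string())
                .or_else(|| payload.downcast_ref::<String>().cloned())
                .unwrap_or_else(|| String::from("non-string panic payload"));
            Err(Cause::Die(message))
        }
    }
}

// Illustrative workload: typed error for bad input, panic for a broken invariant.
fn parse_port(input: &str) -> Result<u16, String> {
    if input == "boom" {
        panic!("invariant violated");
    }
    input.parse::<u16>().map_err(|e| e.to_string())
}

fn main() {
    println!("{:?}", run_capturing(|| parse_port("8080")));
    println!("{:?}", run_capturing(|| parse_port("not a port")));
    println!("{:?}", run_capturing(|| parse_port("boom")));
}
```

The default panic hook still prints the panic message to stderr; the point of the sketch is only that the caller receives a value it can match on.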
Exit — Terminal Outcomes
Exit<A, E> records whether an effect-like computation succeeded or failed with a structured Cause<E>.
The Exit Type
use effectful::{Cause, Exit};
enum Exit<A, E> {
Success(A),
Failure(Cause<E>),
}
The public constructors are Exit::succeed(value), Exit::fail(error), Exit::die(message), and Exit::interrupt(fiber_id).
Getting an Exit
run_blocking(effect, env) and run_async(effect, env) return Result<A, E>. The test harness returns Exit<A, E>.
use effectful::{Exit, run_test};
let exit: Exit<User, DbError> = run_test(get_user(1), env);
match exit {
Exit::Success(user) => println!("Got user: {}", user.name),
Exit::Failure(Cause::Fail(DbError::NotFound)) => println!("User not found"),
Exit::Failure(Cause::Die(message)) => eprintln!("Defect: {message}"),
Exit::Failure(Cause::Interrupt(id)) => println!("Interrupted fiber {id:?}"),
Exit::Failure(Cause::Both(_, _) | Cause::Then(_, _)) => println!("Composite failure"),
}
There is no run_to_exit helper in the current API.
Converting Exit to Result
Use into_result() to get Result<A, Cause<E>>.
let result: Result<User, Cause<DbError>> = exit.into_result();
If you only want typed failures, pattern match on the cause.
let result: Result<User, AppError> = match exit.into_result() {
Ok(user) => Ok(user),
Err(Cause::Fail(e)) => Err(AppError::Expected(e)),
Err(Cause::Die(message)) => Err(AppError::Defect(message)),
Err(Cause::Interrupt(id)) => Err(AppError::Interrupted(id)),
Err(cause) => Err(AppError::Composite(cause.pretty())),
};
There is no into_result_or_panic helper in the current API.
Exit in Fibers
FiberHandle::await_exit() returns Effect<Exit<A, E>, Never, ()>. FiberHandle::join().await returns Result<A, Cause<E>>.
let exit: Exit<A, E> = run_async(handle.await_exit(), ()).await?;
let result: Result<A, Cause<E>> = handle.join().await;
Use Exit when you need to preserve all failure detail. Use Result when typed failures are enough.
Recovery Combinators — catch and Friends
The current Effect recovery surface works with typed errors E. Cause<E> appears in Exit and fibers, not in ordinary catch handlers.
catch
catch handles a typed failure by returning another effect.
let resilient = risky_db_call().catch(|error: DbError| {
match error {
DbError::NotFound => succeed(User::anonymous()),
other => fail(other),
}
});
Use catch when recovery itself may still fail.
catch_all
catch_all maps any typed error into a fallback success value. The returned effect is infallible through E.
let user = fetch_user(id).catch_all(|_error| User::anonymous());
Despite the name, this does not handle Cause::Die or Cause::Interrupt; it handles the effect’s typed error channel.
tap_error
tap_error runs an effect when the source fails, then re-emits the original error if the tap succeeds.
let observed = risky_call().tap_error(|message| log_error(message));
The handler receives a debug-formatted string because the original error is re-emitted without requiring Clone.
map_error
Use map_error to translate errors into your application error type.
let effect = db_call().map_error(AppError::Database);
Not Present Yet
The current API does not include fold, or_else, or ignore_error methods on Effect. Use catch, catch_all, map, and map_error, or pattern match on run_blocking(effect, env) at a boundary.
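If the combinators are new to you, it may help to compare them with familiar `Result` methods. The sketch below is an analogy on plain `Result` values, not effectful code: `or_else` plays the role of catch, `unwrap_or_else` the role of catch_all, and `map_err` the role of map_error. The error enums and function names are made up for the example.

```rust
#[derive(Debug, PartialEq)]
enum DbError {
    NotFound,
    Timeout,
}

#[derive(Debug, PartialEq)]
enum AppError {
    Database(DbError),
}

// Illustrative lookup with two distinct failure modes.
fn find_user(id: u64) -> Result<String, DbError> {
    match id {
        1 => Ok(String::from("alice")),
        2 => Err(DbError::Timeout),
        _ => Err(DbError::NotFound),
    }
}

// catch analogue: recover from one error, re-raise the rest.
fn find_or_anonymous(id: u64) -> Result<String, DbError> {
    find_user(id).or_else(|error| match error {
        DbError::NotFound => Ok(String::from("anonymous")),
        other => Err(other),
    })
}

// catch_all analogue: every typed error becomes a fallback value.
fn find_or_default(id: u64) -> String {
    find_user(id).unwrap_or_else(|_error| String::from("anonymous"))
}

// map_error analogue: translate into the application error type.
fn find_for_app(id: u64) -> Result<String, AppError> {
    find_user(id).map_err(AppError::Database)
}

fn main() {
    println!("{:?}", find_or_anonymous(3));
    println!("{:?}", find_or_anonymous(2));
    println!("{}", find_or_default(2));
    println!("{:?}", find_for_app(2));
}
```

The difference in the effect version is timing: these `Result` calls run immediately, whereas the effect combinators describe recovery that happens when the effect is run.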
Error Accumulation — Collecting All Failures
effect! and flat_map are fail-fast. That is correct for dependent steps, but independent validation sometimes needs to collect multiple failures.
Fail-Fast Behavior
effect! {
let name = bind* validate_name(&input.name); // if this fails, stop
let email = bind* validate_email(&input.email); // not run after name failure
let age = bind* validate_age(input.age);
User { name, email, age }
}
Use fail-fast composition when each step depends on earlier successful values.
Manual Accumulation
The current root API does not include validate_all or partition helpers. For independent validations, run them as plain Result-returning checks or explicitly run each effect and collect errors.
let mut errors = Vec::new();
if let Err(error) = run_blocking(validate_name(&input.name), ()) {
errors.push(error);
}
if let Err(error) = run_blocking(validate_email(&input.email), ()) {
errors.push(error);
}
if let Err(error) = run_blocking(validate_age(input.age), ()) {
errors.push(error);
}
if errors.is_empty() {
Ok(())
} else {
Err(errors)
}
Keep this pattern at validation boundaries. Inside business workflows, prefer typed fail-fast effects.
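When the checks are plain `Result`-returning functions, accumulation can be written without running any effects. The following is a minimal sketch of that variant; the validators, their messages, and the `User` struct are illustrative assumptions rather than part of the library.

```rust
#[derive(Debug, PartialEq)]
struct User {
    name: String,
    email: String,
    age: u8,
}

fn validate_name(name: &str) -> Result<String, String> {
    if name.trim().is_empty() {
        Err(String::from("name must not be empty"))
    } else {
        Ok(name.to_string())
    }
}

fn validate_email(email: &str) -> Result<String, String> {
    if email.contains('@') {
        Ok(email.to_string())
    } else {
        Err(String::from("email must contain '@'"))
    }
}

fn validate_age(age: i64) -> Result<u8, String> {
    if (0..=150).contains(&age) {
        Ok(age as u8)
    } else {
        Err(String::from("age must be between 0 and 150"))
    }
}

// Runs every check, then either builds the user or reports all failures in order.
fn validate_user(name: &str, email: &str, age: i64) -> Result<User, Vec<String>> {
    let name = validate_name(name);
    let email = validate_email(email);
    let age = validate_age(age);

    match (name, email, age) {
        (Ok(name), Ok(email), Ok(age)) => Ok(User { name, email, age }),
        (name, email, age) => {
            let mut errors = Vec::new();
            errors.extend(name.err());
            errors.extend(email.err());
            errors.extend(age.err());
            Err(errors)
        }
    }
}

fn main() {
    println!("{:?}", validate_user("Ada", "ada@example.com", 36));
    println!("{:?}", validate_user("", "nope", 200));
}
```

All three validators run before any result is inspected, which is the difference from the fail-fast effect! block shown earlier.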
Combining Error Types
When composing effects with different error types, use map_error or flat_map_union with Or.
use effectful::Or;
type BothErrors = Or<DbError, NetworkError>;
let combined: Effect<Data, BothErrors, ()> = db_fetch()
.union_error::<NetworkError>()
.flat_map(|db| network_fetch().map_error(Or::Right).map(move |net| merge(db, net)));
Or<A, B> lets you defer flattening into an application error until a boundary.
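The idea of deferring the flattening can be shown with a locally defined two-case sum type. In this sketch the `Or` enum, the error structs, and `flatten` are hypothetical and use plain `Result` instead of effects; the library's Or may expose different variants or helpers.

```rust
// Hypothetical two-case error union, defined locally for illustration.
#[derive(Debug, PartialEq)]
enum Or<A, B> {
    Left(A),
    Right(B),
}

#[derive(Debug, PartialEq)]
struct DbError(&'static str);

#[derive(Debug, PartialEq)]
struct NetworkError(&'static str);

fn db_fetch(ok: bool) -> Result<u32, DbError> {
    if ok { Ok(40) } else { Err(DbError("connection refused")) }
}

fn network_fetch(ok: bool) -> Result<u32, NetworkError> {
    if ok { Ok(2) } else { Err(NetworkError("timeout")) }
}

// Each step keeps its own error type; the union records which side failed.
fn combined(db_ok: bool, net_ok: bool) -> Result<u32, Or<DbError, NetworkError>> {
    let db = db_fetch(db_ok).map_err(Or::Left)?;
    let net = network_fetch(net_ok).map_err(Or::Right)?;
    Ok(db + net)
}

#[derive(Debug, PartialEq)]
enum AppError {
    Storage(&'static str),
    Upstream(&'static str),
}

// Flattening into the application error happens once, at the boundary.
fn flatten(error: Or<DbError, NetworkError>) -> AppError {
    match error {
        Or::Left(DbError(message)) => AppError::Storage(message),
        Or::Right(NetworkError(message)) => AppError::Upstream(message),
    }
}

fn main() {
    println!("{:?}", combined(true, true));
    println!("{:?}", combined(true, false).map_err(flatten));
}
```

Inner code never needs to know about `AppError`; only the boundary that calls `flatten` does.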
ParseErrors
ParseErrors is an aggregate container for schema-style validation issues.
let errors = ParseErrors::new(vec![
ParseError::new("name", "must not be empty"),
ParseError::new("email", "invalid email"),
]);
for issue in errors.issues {
eprintln!("{}: {}", issue.path, issue.message);
}
Current schema combinators usually return the first ParseError; build ParseErrors yourself when your boundary wants to report multiple issues.
When to Accumulate vs. Short-Circuit
| Situation | Use |
|---|---|
| Dependent steps | effect! / flat_map |
| Independent form validation | Manual accumulation |
| Batch import with partial success | Explicit loop collecting successes/failures |
| Schema boundary with many field issues | ParseErrors::new(collected) |
Concurrency & Fibers — Structured Async
Async Rust gives you the ability to do many things concurrently. The challenge is doing it safely — without fire-and-forget tasks that outlive their parent, without silent failures when a task panics, and without resource leaks when tasks are cancelled.
effectful uses Fibers for structured concurrency. A Fiber is a lightweight, interruptible async task with a typed result, an explicit lifecycle, and guaranteed cleanup.
This chapter covers spawning fibers, joining them, cancelling them gracefully, and using FiberRef for fiber-local state.
What Are Fibers? — Lightweight Structured Tasks
A fiber is an effectful concurrent task represented by FiberHandle<A, E>. It gives you a stable id, status inspection, interruption, and typed completion.
Fibers vs. Raw Tasks
// Raw tokio::spawn: lifecycle is owned by Tokio's JoinHandle.
tokio::spawn(async {
do_something().await;
});
// effectful fiber: lifecycle is explicit through FiberHandle<A, E>.
let runtime = ThreadSleepRuntime;
let handle: FiberHandle<User, DbError> = run_fork(&runtime, || (get_user(1), env));
let result: Result<User, Cause<DbError>> = handle.join().await;
join() returns Result<A, Cause<E>>. Use await_exit() when you need Exit<A, E>.
FiberId
Each handle has a FiberId.
let id = handle.id();
log::debug!("spawned fiber {id:?}");
For code that needs to run under a specific fiber id, use with_fiber_id(id, || ...).
FiberHandle and FiberStatus
let status: FiberStatus = handle.status();
match status {
FiberStatus::Running => {}
FiberStatus::Succeeded => {}
FiberStatus::Failed => {}
FiberStatus::Interrupted => {}
}
handle.interrupt();
FiberHandle<A, E> is cloneable. Status inspection does not consume the handle.
Structured Cleanup
Fibers can be attached to a Scope with handle.scoped(). Closing the scope interrupts the fiber through a finalizer.
let scope = Scope::make();
let scoped_effect = handle.scoped(); // Effect<A, Cause<E>, Scope>
// Run scoped_effect with `scope`; when another owner closes `scope`,
// the registered finalizer interrupts the handle.
Use scopes when a parent computation must own child-fiber cleanup.
Spawning and Joining — run_fork and fiber_all
The current API uses run_fork to spawn effects and FiberHandle methods to join, inspect, or interrupt them.
run_fork: Spawn One Fiber
use effectful::{ThreadSleepRuntime, run_fork};
let runtime = ThreadSleepRuntime;
let handle = run_fork(&runtime, || (compute_expensive_result(), env));
let local_result = local_computation();
let remote_result = handle.join().await?;
(local_result, remote_result)
The factory returns (effect, env) so the worker owns both the one-shot effect and its environment.
await_exit vs join
let exit: Exit<A, E> = run_async(handle.await_exit(), ()).await?;
let result: Result<A, Cause<E>> = handle.join().await;
Use await_exit when you want the exact Exit; use join for a Result with Cause<E> on failure.
fiber_all
fiber_all joins already-created handles.
use effectful::{fiber_all, run_async};
let handles = user_ids
.into_iter()
.map(|id| run_fork(&runtime, move || (fetch_user(id), ())))
.collect::<Vec<_>>();
let users: Vec<User> = run_async(fiber_all(handles), ()).await?;
If any handle fails, fiber_all returns the first Cause<E> it observes.
Racing
There is no fiber_race or fiber_any free function in the current API. FiberHandle::or_else races two handles and completes with whichever handle resolves first.
let primary = run_fork(&runtime, || (fetch_from_primary(), ()));
let backup = run_fork(&runtime, || (fetch_from_backup(), ()));
let raced = primary.or_else(backup);
let data = raced.join().await?;
The slower fiber is not automatically cancelled by or_else; keep its handle if you need to interrupt it.
Error Behavior
| Operation | Failure shape |
|---|---|
| handle.join().await | Result<A, Cause<E>> |
| handle.await_exit() | Exit<A, E> inside an infallible effect |
| fiber_all(handles) | Effect<Vec<A>, Cause<E>, ()> |
| handle.or_else(other) | First handle completion wins |
Cancellation — Interrupting Gracefully
Cancellation is explicit and cooperative. CancellationToken is a shared flag; FiberHandle::interrupt() marks a fiber handle as interrupted.
CancellationToken
use effectful::{CancellationToken, check_interrupt};
let token = CancellationToken::new();
let child = token.child_token();
assert!(!token.is_cancelled());
token.cancel();
assert!(child.is_cancelled());
Cancelling a parent token cancels child tokens. Cancelling a child does not cancel its parent.
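The parent/child rule can be illustrated with a small standard-library model. This is only a sketch: the `Token` type below is hypothetical and is not how effectful implements CancellationToken. It shows one way a child can observe its ancestors without the reverse being true.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a hierarchical cancellation flag.
#[derive(Clone)]
pub struct Token {
    cancelled: Arc<AtomicBool>,
    parent: Option<Box<Token>>,
}

impl Token {
    pub fn new() -> Self {
        Token { cancelled: Arc::new(AtomicBool::new(false)), parent: None }
    }

    /// A child owns its own flag and also observes every ancestor's flag.
    pub fn child_token(&self) -> Token {
        Token {
            cancelled: Arc::new(AtomicBool::new(false)),
            parent: Some(Box::new(self.clone())),
        }
    }

    /// Sets only this token's flag; ancestors are untouched.
    pub fn cancel(&self) {
        self.cancelled.store(true, Ordering::SeqCst);
    }

    pub fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::SeqCst)
            || self.parent.as_ref().map_or(false, |p| p.is_cancelled())
    }
}
```

Because a child only reads upward, cancelling one child leaves its parent and siblings running.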
Checking for Cancellation
check_interrupt(&token) snapshots whether the token is cancelled.
fn process_large_dataset(token: CancellationToken) -> Effect<(), Never, ()> {
effect! {
for chunk in large_dataset.chunks(1000) {
let cancelled = bind* check_interrupt(&token);
if cancelled {
break;
}
process_chunk(chunk);
}
}
}
Use token.cancelled() when an effect should wait until cancellation happens.
let wait_for_shutdown = token.cancelled(); // Effect<(), Never, ()>
Interrupting a FiberHandle
let handle = run_fork(&runtime, || (background_work(), ()));
handle.interrupt();
let result = handle.join().await;
assert!(matches!(result, Err(Cause::Interrupt(_))));
interrupt() completes the handle with Cause::Interrupt(id) if it was still pending. It returns false if the handle had already completed.
Graceful Shutdown
The basic shutdown pattern is:
- Signal shared cancellation tokens.
- Interrupt top-level handles that should stop.
- Await handles with whatever timeout policy your runtime uses.
token.cancel();
for handle in &handles {
handle.interrupt();
}
for handle in handles {
let _ = handle.join().await;
}
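The same signal-then-await shape can be sketched with plain threads, assuming an AtomicBool stands in for the shared token and a JoinHandle for the fiber handle. The helper names `spawn_worker` and `shutdown` are hypothetical and exist only for this illustration.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Spawns a worker that checks the shared flag between units of work.
pub fn spawn_worker(stop: Arc<AtomicBool>, done: Arc<AtomicUsize>) -> JoinHandle<()> {
    thread::spawn(move || {
        while !stop.load(Ordering::SeqCst) {
            // One unit of work; the flag is only checked at this safe point.
            thread::sleep(Duration::from_millis(1));
        }
        // Cleanup runs after the loop observes the signal.
        done.fetch_add(1, Ordering::SeqCst);
    })
}

/// Signals every worker first, then waits for each one to finish.
/// Returns how many workers exited without panicking.
pub fn shutdown(stop: &AtomicBool, handles: Vec<JoinHandle<()>>) -> usize {
    stop.store(true, Ordering::SeqCst);
    handles
        .into_iter()
        .map(|handle| handle.join())
        .filter(Result::is_ok)
        .count()
}
```

Signalling before joining matters: joining first would wait on workers that have not yet been told to stop.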
Not Present Yet
The current API does not include method-style with_cancellation or an uninterruptible helper. Keep cancellation explicit by passing CancellationToken to long-running effects and checking it at safe points.
FiberRef — Fiber-Local State
FiberRef<A> stores values keyed by (FiberRef id, FiberId). It is useful for trace ids, request context, and other fiber-local data.
Creating a FiberRef
use effectful::{FiberRef, run_blocking};
let trace_id: FiberRef<String> = run_blocking(
FiberRef::make(|| "none".to_string()),
(),
)?;
FiberRef::make returns an effect because allocation is part of the effect runtime model.
Reading and Writing
let program = effect! {
bind* trace_id.set("req-abc-123".to_string());
let id = bind* trace_id.get();
bind* log(&format!("[{id}] processing request"));
bind* process_request()
};
Available operations include get, set, update, modify, reset, locally, and locally_with.
Fork and Join Hooks
When you manage logical child fibers yourself, use on_fork and on_join to seed and merge fiber-local values.
let parent = FiberId::ROOT;
let child = FiberId::fresh();
run_blocking(trace_id.on_fork(parent, child), ())?;
with_fiber_id(child, || {
run_blocking(trace_id.set("child-trace".to_string()), ())
})?;
run_blocking(trace_id.on_join(parent, child), ())?;
The default fork behavior clones the parent value into the child. The default join behavior keeps the child value.
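To make the default fork and join rules concrete, here is a minimal model using a HashMap keyed by a numeric fiber id. `LocalStore` is a hypothetical type for illustration, not the FiberRef implementation; removing the child entry on join is an assumption of this sketch.

```rust
use std::collections::HashMap;

/// Hypothetical model of fiber-local storage keyed by a numeric fiber id.
pub struct LocalStore<A: Clone> {
    initial: A,
    values: HashMap<u64, A>,
}

impl<A: Clone> LocalStore<A> {
    pub fn new(initial: A) -> Self {
        LocalStore { initial, values: HashMap::new() }
    }

    /// Fibers that never wrote a value see the initial one.
    pub fn get(&self, fiber: u64) -> A {
        self.values
            .get(&fiber)
            .cloned()
            .unwrap_or_else(|| self.initial.clone())
    }

    pub fn set(&mut self, fiber: u64, value: A) {
        self.values.insert(fiber, value);
    }

    /// Fork: seed the child with a clone of the parent's current value.
    pub fn on_fork(&mut self, parent: u64, child: u64) {
        let seed = self.get(parent);
        self.values.insert(child, seed);
    }

    /// Join: the child's value replaces the parent's.
    /// (Assumption: the child entry is removed afterwards.)
    pub fn on_join(&mut self, parent: u64, child: u64) {
        if let Some(value) = self.values.remove(&child) {
            self.values.insert(parent, value);
        }
    }
}
```

Writes in the child stay invisible to the parent until the join merges them back.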
Local Overrides
locally(value, effect) overrides the current fiber’s value while the inner effect runs, then restores the previous value.
let inner = trace_id.locally(
"override".to_string(),
trace_id.get(),
);
let value = run_blocking(inner, ())?;
assert_eq!(value, "override");
Current Limitations
Fiber identity is stored in a thread-local cell. This matches single-threaded run_blocking and current-thread Tokio runtimes. Multi-threaded task migration is not tracked yet.
Resources & Scopes — Deterministic Cleanup
RAII works beautifully in synchronous Rust: resources are released when they fall out of scope, and Drop runs deterministically. In async code, the picture gets complicated.
This chapter shows why RAII breaks down in async contexts, introduces Scope and finalizers as the solution, covers the acquire_release pattern for RAII-style resource management, and concludes with Pool for reusing expensive connections.
The Resource Problem — Cleanup in Async
RAII in synchronous code:
{
let file = File::open("data.txt")?;
process(&file)?;
} // `file` is dropped here; its Drop impl runs when the block ends
Reliable. Simple. The drop happens when the scope ends — no exceptions (unless you have exceptions).
The Async Complication
async fn process_data() -> Result<(), Error> {
let conn = open_connection().await?;
let data = fetch_data(&conn).await?; // What if this is cancelled?
transform_and_save(data).await?; // Or this?
conn.close().await?; // May never reach here
Ok(())
}
Three problems:
- Cancellation: If this async function is cancelled mid-execution, conn.close() never runs.
- Panic: If transform_and_save panics, the async task is dropped. conn.close() is skipped.
- Async Drop: impl Drop for Connection can only do synchronous cleanup. If closing a connection requires .await, you can’t do it in Drop.
conn.close() must be an async call, but Drop can’t be async. This is a fundamental mismatch.
The Root Cause
RAII relies on Drop running synchronously when a value goes out of scope. In async code, “going out of scope” and “running cleanup” can be decoupled: by cancellation, by executor scheduling, or by the fact that async functions compile to state machines that might never reach certain states.
The Solution Preview
effectful solves this with:
- Scope: a region where finalizers are registered and guaranteed to run (even on cancellation or panic)
- acquire_release: a combinator that pairs acquisition with its cleanup
- Pool: for long-lived resources that need controlled reuse
All three run cleanup effects (not just synchronous Drop), and all three run them unconditionally — success, failure, or interruption.
Scopes and Finalizers — Guaranteed Cleanup
Scope is a finalizer registry. Finalizers are plain boxed closures that receive an Exit<(), Never> and return Effect<(), Never, ()>.
Creating a Scope
use effectful::{Effect, Exit, Never, Scope, scope_with};
let result = scope_with(|scope| {
effect! {
let conn = bind* open_connection();
let conn_for_close = conn.clone();
let added = scope.add_finalizer(Box::new(move |_exit: Exit<(), Never>| {
conn_for_close.close()
}));
if !added {
return Err(AppError::ScopeClosed);
}
let data = bind* fetch_data(&conn);
process(data)
}
});
scope_with creates a fresh scope, runs the returned effect, then closes the scope. Closing runs registered finalizers.
Finalizers Always Run on Close
let scope = Scope::make();
let added = scope.add_finalizer(Box::new(|_exit| cleanup_temp_file()));
assert!(added);
scope.close();
Finalizers run when close / close_with_exit is called. close is idempotent and returns true only for the first close.
Multiple Finalizers
Finalizers run in reverse registration order.
scope.add_finalizer(Box::new(|_| close_connection(conn)));
scope.add_finalizer(Box::new(|_| rollback_transaction(txn)));
scope.add_finalizer(Box::new(|_| close_cursor(cursor)));
scope.close();
// Runs: close_cursor, rollback_transaction, close_connection
Register parent resources first and child resources last.
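The reverse-order and idempotent-close rules can be sketched as a small registry of boxed closures. `MiniScope` is a hypothetical type for illustration; unlike the real Scope, its finalizers are plain synchronous closures that receive no Exit value.

```rust
/// Hypothetical minimal finalizer registry; not the effectful Scope type.
pub struct MiniScope {
    finalizers: Vec<Box<dyn FnOnce()>>,
    closed: bool,
}

impl MiniScope {
    pub fn new() -> Self {
        MiniScope { finalizers: Vec::new(), closed: false }
    }

    /// Registers a finalizer. Returns false when the scope is already closed.
    pub fn add_finalizer(&mut self, finalizer: Box<dyn FnOnce()>) -> bool {
        if self.closed {
            return false;
        }
        self.finalizers.push(finalizer);
        true
    }

    /// Runs finalizers last-in, first-out. Returns true only on the first close.
    pub fn close(&mut self) -> bool {
        if self.closed {
            return false;
        }
        self.closed = true;
        while let Some(finalizer) = self.finalizers.pop() {
            finalizer();
        }
        true
    }
}
```

Popping from the end of the Vec is what gives the last-registered (child) resource its cleanup before the first-registered (parent) resource.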
Scope Inheritance
Scopes can be nested manually.
let outer = Scope::make();
let inner = outer.fork();
inner.add_finalizer(Box::new(|_| cleanup_inner()));
outer.add_finalizer(Box::new(|_| cleanup_outer()));
outer.close(); // closes children before outer finalizers
Use Scope::fork for child scopes and Scope::extend to reparent an existing open scope.
acquire_release — Acquire Then Release
acquire_release(acquire, release) runs an acquire effect, then runs the release effect, then returns the acquired value.
use effectful::acquire_release;
let effect = acquire_release(
open_connection(),
|conn| conn.close(),
);
Current behavior is simple and immediate:
- Run open_connection().
- If acquisition succeeds, clone the acquired value.
- Run conn.close() in a default release environment.
- Return the cloned acquired value.
This is not a scoped bracket that keeps the resource open for a user block. For block-scoped resource use, use scope_with and Scope::add_finalizer, or a Pool checkout that returns resources when the caller’s scope closes.
When to Use It
Use acquire_release for acquire/release pairs where returning the acquired value after release is still meaningful, or as a low-level primitive while building stronger resource helpers.
let effect = effect! {
bind* acquire_release(load_temp_value(), |value| cleanup_temp_value(value))
};
Scoped Resource Pattern
For resources that must remain open while work runs, register a finalizer in a scope.
let program = scope_with(|scope| {
effect! {
let conn = bind* open_connection();
let conn_for_close = conn.clone();
scope.add_finalizer(Box::new(move |_| conn_for_close.close()));
bind* run_query(&conn, "SELECT 1")
}
});
That pattern keeps acquisition, use, and cleanup in the same visible block.
Pools — Reusing Expensive Resources
Pool<A, E> and KeyedPool<K, A, E> manage reusable values with a capacity gate. A checkout is tied to a Scope: when the scope closes, the value is returned to the idle list unless invalidated.
Pool
use effectful::Pool;
let pool_effect = Pool::make(10, || open_connection("postgres://localhost/app"));
let pool: Pool<Connection, DbError> = run_blocking(pool_effect, ())?;
Pool::make(capacity, factory) returns Effect<Pool<A, E>, Never, ()>. The factory is a closure that returns an effect; the pool runs it to create a fresh value when no reusable idle value is available.
Checking Out
use effectful::{Scope, run_blocking};
let scope = Scope::make();
let conn = run_blocking(pool.get(), scope.clone())?;
// Use conn while scope is open.
scope.close(); // returns conn to the pool's idle list
pool.get() returns Effect<A, E, Scope>. It acquires capacity, reuses an idle value or runs the factory, and registers a finalizer in the provided scope.
Invalidating Values
run_blocking(pool.invalidate(conn.clone()), ())?;
Invalidated values are not reused when their checkout scope closes.
KeyedPool
Use KeyedPool when resource creation depends on a key.
use effectful::KeyedPool;
let pools_effect = KeyedPool::make(20, |key: DbRole| open_connection_for(key));
let pools: KeyedPool<DbRole, Connection, DbError> = run_blocking(pools_effect, ())?;
let scope = Scope::make();
let primary = run_blocking(pools.get(DbRole::Primary), scope.clone())?;
let replica = run_blocking(pools.get(DbRole::Replica), scope.clone())?;
scope.close();
Capacity is global across all keys. Idle values are stored per key.
TTL
Both pool types have make_with_ttl. Idle values older than the TTL are discarded on checkout.
let pool = run_blocking(
Pool::make_with_ttl(10, Duration::from_secs(300), || open_connection(url.clone())),
(),
)?;
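The interaction between the capacity gate, the idle list, and the TTL can be modelled in a few lines. This is a sketch under stated assumptions: `MiniPool` is hypothetical, checkout and return are explicit calls rather than scope finalizers, time is passed in by the caller, and a full pool returns None instead of waiting.

```rust
use std::time::{Duration, Instant};

/// Hypothetical capacity-gated pool with a TTL on idle values.
pub struct MiniPool<A> {
    capacity: usize,
    in_use: usize,
    ttl: Duration,
    idle: Vec<(A, Instant)>,
}

impl<A> MiniPool<A> {
    pub fn new(capacity: usize, ttl: Duration) -> Self {
        MiniPool { capacity, in_use: 0, ttl, idle: Vec::new() }
    }

    /// Reuses a fresh-enough idle value, else runs the factory.
    /// Returns None when every capacity slot is checked out.
    pub fn checkout(&mut self, now: Instant, factory: impl FnOnce() -> A) -> Option<A> {
        if self.in_use >= self.capacity {
            return None;
        }
        // Discard idle values older than the TTL before reusing anything.
        let ttl = self.ttl;
        self.idle
            .retain(|(_, since)| now.saturating_duration_since(*since) <= ttl);
        let value = match self.idle.pop() {
            Some((value, _)) => value,
            None => factory(),
        };
        self.in_use += 1;
        Some(value)
    }

    /// Returns a value to the idle list and frees one capacity slot.
    pub fn give_back(&mut self, value: A, now: Instant) {
        self.in_use = self.in_use.saturating_sub(1);
        self.idle.push((value, now));
    }
}
```

Expiry is checked lazily at checkout, which matches the "discarded on checkout" wording above and avoids a background sweeper.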
Pool as a Service
In applications, build the pool at startup and provide it as a service or context value. Business code should depend on the pool abstraction and checkout inside an explicit Scope.
Scheduling — Retry, Repeat, and Time
Production services fail. Networks are unreliable. Downstream APIs go down. The database gets overwhelmed. Defensive engineering means anticipating failure and building policies for what to do when it happens.
effectful models these policies with Schedule — a type that describes when to retry, how long to wait between attempts, and when to give up. Combined with Clock injection, scheduling logic becomes testable without real-time delays.
Schedule — The Retry/Repeat Policy Type
A Schedule is a policy used by the free retry and repeat functions. It receives ScheduleInput { attempt } and either returns a ScheduleDecision { delay } or stops.
The Core Concept
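use std::thread::sleep;
use effectful::{Schedule, ScheduleInput};
let mut schedule = Schedule::recurs(3);
let mut attempt = 0; // attempt counter fed to the schedule
while let Some(decision) = schedule.next(ScheduleInput { attempt }) {
sleep(decision.delay); // wait for the delay the schedule chose
attempt += 1;
}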
The current schedule model tracks the attempt number. It does not inspect elapsed time, the last success value, or the last error.
Creating Schedules
use std::time::Duration;
use effectful::{Schedule, ScheduleInput};
let max_three = Schedule::recurs(3);
let fixed = Schedule::spaced(Duration::from_secs(1));
let exponential = Schedule::exponential(Duration::from_millis(100));
let until_attempt_ten = Schedule::recurs_until(Box::new(|input: &ScheduleInput| input.attempt >= 10));
let while_first_ten = Schedule::recurs_while(Box::new(|input: &ScheduleInput| input.attempt < 10));
Combining Schedules
Use compose to require both schedules to continue. The produced delay is the maximum of the two decisions.
use std::time::Duration;
use effectful::Schedule;
let retry_policy = Schedule::exponential(Duration::from_millis(100))
.compose(Schedule::recurs(5));
Use jittered to apply deterministic jitter to delays.
let jittered = Schedule::spaced(Duration::from_secs(1)).jittered();
Schedule as a Value
Schedules are values you pass to retry or repeat. Effects are one-shot, so these functions take factories.
use std::time::Duration;
use effectful::{Effect, Schedule, retry};
fn call_external_api() -> Effect<Response, ApiError, HttpClient> {
retry(
|| make_request(),
Schedule::exponential(Duration::from_millis(100)).compose(Schedule::recurs(5)),
)
}
Built-in Schedules
effectful’s current Schedule API is intentionally small. It covers fixed delays, exponential backoff, attempt limits, predicates, composition, and deterministic jitter.
recurs
Schedule::recurs(5)
Allows five additional schedule steps with zero delay. Compose it with a delay schedule to cap retries or repeats.
spaced
Schedule::spaced(Duration::from_secs(5))
Produces the same delay for every step. Use it for polling, heartbeats, and fixed-interval repeat loops.
exponential
Schedule::exponential(Duration::from_millis(100))
// Delays: 100ms, 200ms, 400ms, 800ms, ...
The standard retry backoff. Compose with Schedule::recurs(n) to cap attempts.
let bounded = Schedule::exponential(Duration::from_millis(100))
.compose(Schedule::recurs(5));
recurs_while / recurs_until
use effectful::{Schedule, ScheduleInput};
let first_ten = Schedule::recurs_while(Box::new(|input: &ScheduleInput| input.attempt < 10));
let until_ten = Schedule::recurs_until(Box::new(|input: &ScheduleInput| input.attempt >= 10));
These schedules decide from the attempt counter.
compose
let policy = Schedule::spaced(Duration::from_secs(1))
.compose(Schedule::recurs(10));
compose continues only while both schedules continue. When both produce a delay, the larger delay wins.
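The "both must continue, larger delay wins" rule is easy to see in a toy model where a schedule is just a function from attempt number to an optional delay. The `Policy` alias and the free functions below are hypothetical and only mirror the behavior described in this section; zero-based attempt numbering is an assumption of the sketch.

```rust
use std::time::Duration;

/// Hypothetical schedule model: attempt number in, optional delay out.
pub type Policy = Box<dyn Fn(u32) -> Option<Duration>>;

/// Allows `n` steps with zero delay, then stops.
pub fn recurs(n: u32) -> Policy {
    Box::new(move |attempt: u32| if attempt < n { Some(Duration::ZERO) } else { None })
}

/// Never stops on its own; the delay doubles with each attempt.
pub fn exponential(base: Duration) -> Policy {
    Box::new(move |attempt: u32| Some(base.saturating_mul(2u32.saturating_pow(attempt))))
}

/// Continues only while both continue; the larger delay wins.
pub fn compose(a: Policy, b: Policy) -> Policy {
    Box::new(move |attempt: u32| match (a(attempt), b(attempt)) {
        (Some(x), Some(y)) => Some(x.max(y)),
        _ => None,
    })
}

/// Collects the delays a policy produces, looking at most `max_steps` ahead.
pub fn delays(policy: &Policy, max_steps: u32) -> Vec<Duration> {
    (0..max_steps).map_while(|attempt| policy(attempt)).collect()
}
```

Composing an unbounded backoff with `recurs(3)` yields exactly three delays of 100 ms, 200 ms, and 400 ms: the zero delays from `recurs` lose to the backoff, and its stop signal ends the sequence.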
jittered
let policy = Schedule::exponential(Duration::from_millis(100))
.jittered()
.compose(Schedule::recurs(5));
jittered currently applies deterministic jitter. It is useful for exercising jitter paths without adding random behavior to tests.
Not Present Yet
The current API does not include Fibonacci schedules, max-delay caps, total-duration caps, method-style .retry(), or method-style .repeat(). Use the free retry / repeat functions with factories.
retry and repeat — Applying Policies
retry and repeat apply a Schedule to effect factories. They are free functions because Effect values are one-shot.
retry
use std::time::Duration;
use effectful::{Schedule, retry};
let result = retry(
|| flaky_api_call(),
Schedule::exponential(Duration::from_millis(100)).compose(Schedule::recurs(3)),
);
retry runs the effect returned by the factory. If it fails and the schedule continues, it waits and calls the factory again. It returns the first success, or the last error when the schedule stops.
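The control flow described above can be sketched over plain Result values. `retry_with` is a hypothetical helper, not the library's retry; it assumes the schedule has been flattened into a sequence of delays and that sleeping is supplied by the caller.

```rust
use std::time::Duration;

/// Hypothetical retry driver: `delays` yields one wait per allowed retry.
pub fn retry_with<T, E>(
    mut factory: impl FnMut() -> Result<T, E>,
    delays: impl IntoIterator<Item = Duration>,
    mut sleep: impl FnMut(Duration),
) -> Result<T, E> {
    let mut delays = delays.into_iter();
    loop {
        match factory() {
            // The first success ends the loop.
            Ok(value) => return Ok(value),
            Err(error) => match delays.next() {
                // The schedule continues: wait, then build a fresh attempt.
                Some(delay) => sleep(delay),
                // The schedule stopped: surface the last error.
                None => return Err(error),
            },
        }
    }
}
```

With three delays available, a permanently failing factory is called four times: the initial attempt plus three retries.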
repeat
use std::time::Duration;
use effectful::{Schedule, repeat};
let polling = repeat(
|| check_job_status(),
Schedule::spaced(Duration::from_secs(5)).compose(Schedule::recurs(12)),
);
repeat runs once, then continues while the schedule continues. It returns the last success value.
Use cases:
- Poll for job completion every few seconds
- Send a bounded number of heartbeats
- Refresh a cache on a fixed interval
Explicit Clock Variants
Use retry_with_clock and repeat_with_clock when tests need a deterministic clock.
use effectful::{Schedule, TestClock, retry_with_clock};
use std::time::{Duration, Instant};
let clock = TestClock::new(Instant::now());
let effect = retry_with_clock(
|| flaky_api_call(),
Schedule::exponential(Duration::from_millis(100)).compose(Schedule::recurs(3)),
clock,
None,
);
The _and_interrupt variants also accept a CancellationToken.
Conditional Retry
The current API does not include retry_while or an error predicate parameter. retry retries every failure until the schedule stops. If you need error-sensitive retry, write a small custom loop around Effect::run or add that policy at the call site before using retry.
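One possible shape for such a custom loop, shown over plain Result values rather than effects. Everything here is hypothetical: `retry_if`, the `ApiError` variants, and the fixed retry budget are illustration only, and delays are left out to keep the predicate logic visible.

```rust
/// Hypothetical error type with one transient and one permanent variant.
#[derive(Debug, PartialEq)]
pub enum ApiError {
    Timeout,
    BadRequest,
}

/// Hypothetical error-sensitive retry loop.
pub fn retry_if<T, E>(
    max_retries: u32,
    mut attempt: impl FnMut() -> Result<T, E>,
    should_retry: impl Fn(&E) -> bool,
) -> Result<T, E> {
    let mut retries = 0;
    loop {
        match attempt() {
            Ok(value) => return Ok(value),
            // Retry only errors the predicate accepts, and only while budget remains.
            Err(error) if retries < max_retries && should_retry(&error) => retries += 1,
            // Anything else is returned to the caller immediately.
            Err(error) => return Err(error),
        }
    }
}
```

A predicate such as `|e| *e == ApiError::Timeout` retries timeouts but lets a BadRequest fail on the first attempt, which avoids hammering a service with a request that can never succeed.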
Composition
retry and repeat return ordinary effects.
// Each call builds a fresh retrying effect, because effects are one-shot.
let batch_factory = move || {
let item = item.clone();
retry(
move || process_single_item(item.clone()),
Schedule::exponential(Duration::from_millis(100)).compose(Schedule::recurs(3)),
)
};
let continuous = repeat(
batch_factory,
Schedule::spaced(Duration::from_secs(60)).compose(Schedule::recurs(10)),
);
Clock Injection — Testable Time
retry and repeat use a live clock by default. For deterministic tests, use the explicit-clock variants: retry_with_clock, repeat_with_clock, and their interruption-aware forms.
The Clock Trait
use std::time::{Duration, Instant};
use effectful::{Effect, Never};
trait Clock {
fn now(&self) -> Instant;
fn sleep(&self, duration: Duration) -> Effect<(), Never, ()>;
fn sleep_until(&self, deadline: Instant) -> Effect<(), Never, ()>;
}
Clock is monotonic-time oriented. Calendar time for logging is exposed separately through LiveClock::now_utc().
Production: LiveClock
use effectful::{LiveClock, ThreadSleepRuntime};
let live_clock = LiveClock::new(ThreadSleepRuntime);
LiveClock delegates sleeping and now() to a runtime.
Testing: TestClock
use std::time::{Duration, Instant};
use effectful::{Clock, TestClock};
let start = Instant::now();
let clock = TestClock::new(start);
assert_eq!(clock.now(), start);
clock.advance(Duration::from_secs(60));
assert_eq!(clock.now(), start + Duration::from_secs(60));
TestClock records pending sleeps. Advancing or setting time drops pending sleeps whose deadlines have elapsed.
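A manual clock with this behavior can be approximated in a few lines. `ManualClock` is a hypothetical single-threaded model, not the TestClock implementation: `sleep` returns immediately after recording a deadline, and `advance` drops every deadline that has been reached.

```rust
use std::cell::RefCell;
use std::time::{Duration, Instant};

/// Hypothetical manual clock; `sleep` records a deadline instead of blocking.
pub struct ManualClock {
    now: RefCell<Instant>,
    pending: RefCell<Vec<Instant>>,
}

impl ManualClock {
    pub fn new(start: Instant) -> Self {
        ManualClock { now: RefCell::new(start), pending: RefCell::new(Vec::new()) }
    }

    pub fn now(&self) -> Instant {
        *self.now.borrow()
    }

    /// Records when the sleep would end; no real time passes.
    pub fn sleep(&self, duration: Duration) {
        let deadline = self.now() + duration;
        self.pending.borrow_mut().push(deadline);
    }

    /// Moves time forward and drops sleeps whose deadlines have elapsed.
    pub fn advance(&self, by: Duration) {
        let now = self.now() + by;
        *self.now.borrow_mut() = now;
        self.pending.borrow_mut().retain(|deadline| *deadline > now);
    }

    pub fn pending_sleeps(&self) -> usize {
        self.pending.borrow().len()
    }
}
```

Because time only moves when the test says so, assertions about "what is still waiting" are exact rather than timing-dependent.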
Test Example
use std::sync::{Arc, atomic::{AtomicU32, Ordering}};
use std::time::{Duration, Instant};
use effectful::{Schedule, TestClock, retry_with_clock, run_blocking};
let clock = TestClock::new(Instant::now());
let attempts = Arc::new(AtomicU32::new(0));
let effect = retry_with_clock(
{
let attempts = attempts.clone();
move || failing_operation(attempts.clone())
},
Schedule::exponential(Duration::from_secs(1)).compose(Schedule::recurs(3)),
clock.clone(),
None,
);
let result = run_blocking(effect, ());
assert!(result.is_err());
assert_eq!(attempts.load(Ordering::Relaxed), 4); // initial + 3 retries
This test runs without sleeping in real time because TestClock::sleep only records deadlines.
Clock as a Service
For application logic that needs time directly, model the clock as a service in your environment. The scheduling helpers accept a clock value explicitly; your own services can do the same.
Software Transactional Memory — Optimistic Concurrency
Shared mutable state is hard. Mutexes work but compose poorly: lock two mutexes in the wrong order and you deadlock. Lock them separately and you get torn reads. Lock the whole world and you serialise unnecessarily.
Software Transactional Memory (STM) takes a different approach: every operation on shared state runs inside a transaction. Transactions commit atomically or roll back and retry. No explicit locks. No deadlocks. No torn reads.
This chapter covers effectful’s STM implementation: Stm, TRef, commit, and the transactional collection types.
Why STM? — The Shared State Problem
Consider transferring money between two accounts. With mutexes:
fn transfer(from: &Mutex<Account>, to: &Mutex<Account>, amount: u64) {
let mut from_guard = from.lock().unwrap();
// Thread B might be doing transfer(to, from, ...) right here
let mut to_guard = to.lock().unwrap();
// DEADLOCK: Thread A holds from, waiting for to.
// Thread B holds to, waiting for from.
from_guard.balance -= amount;
to_guard.balance += amount;
}
The standard fix (always lock in a consistent order) requires global coordination across your codebase. Add a third account and you need to sort three locks. It doesn’t compose.
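For comparison, here is roughly what the consistent-order fix looks like for two accounts, assuming locks are ordered by memory address. The `Account` struct and `transfer_ordered` are hypothetical; note how much bookkeeping is needed just to keep two guards straight.

```rust
use std::sync::Mutex;

/// Hypothetical account type for this sketch.
pub struct Account {
    pub balance: u64,
}

/// Locks both accounts in address order so concurrent transfers cannot deadlock.
/// Returns false when `from` has insufficient funds.
pub fn transfer_ordered(from: &Mutex<Account>, to: &Mutex<Account>, amount: u64) -> bool {
    // Transferring to the same account would lock one mutex twice.
    if std::ptr::eq(from, to) {
        return true;
    }
    let from_first = (from as *const Mutex<Account>) < (to as *const Mutex<Account>);
    let (first, second) = if from_first { (from, to) } else { (to, from) };
    let mut first_guard = first.lock().unwrap();
    let mut second_guard = second.lock().unwrap();
    // Map the ordered guards back to their logical roles.
    let (from_acct, to_acct) = if from_first {
        (&mut *first_guard, &mut *second_guard)
    } else {
        (&mut *second_guard, &mut *first_guard)
    };
    if from_acct.balance < amount {
        return false;
    }
    from_acct.balance -= amount;
    to_acct.balance += amount;
    true
}
```

Every function that touches more than one account has to repeat this ordering discipline, which is the coordination cost the paragraph above refers to.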
STM: Optimistic Concurrency
STM operates on the assumption that conflicts are rare. Instead of locking, it:
1. Reads current values into a local transaction log
2. Computes new values based on those reads
3. Attempts to commit: checks that nothing changed since the reads, then atomically writes
If anything changed between step 1 and step 3, the transaction retries automatically from step 1.
use effectful::{Effect, Stm, TRef, commit};
fn transfer(from: TRef<Account>, to: TRef<Account>, amount: u64) -> Effect<(), TransferError, ()> {
let transaction: Stm<(), TransferError> = from.read_stm().flat_map(move |from_acct| {
to.read_stm().flat_map(move |to_acct| {
if from_acct.balance < amount {
Stm::fail(TransferError::InsufficientFunds)
} else {
from.write_stm(Account { balance: from_acct.balance - amount, ..from_acct })
.flat_map(move |_| to.write_stm(Account { balance: to_acct.balance + amount, ..to_acct }))
}
})
});
commit(transaction)
}
No locks. No deadlock risk. The transaction retries automatically if another transaction modified either account between our read and our write.
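The read, compute, validate, and retry cycle can be demonstrated on a single versioned cell. This is a simplified sketch, not effectful's STM: `Versioned` is hypothetical, it covers one cell rather than a multi-cell transaction log, and the commit step takes a brief internal lock to compare the version.

```rust
use std::sync::Mutex;

/// Hypothetical versioned cell: the version increments on every commit.
pub struct Versioned<T> {
    inner: Mutex<(u64, T)>,
}

impl<T: Clone> Versioned<T> {
    pub fn new(value: T) -> Self {
        Versioned { inner: Mutex::new((0, value)) }
    }

    /// Step 1: snapshot the version and value.
    pub fn read(&self) -> (u64, T) {
        self.inner.lock().unwrap().clone()
    }

    /// Step 3: write only if nobody committed since `seen`.
    pub fn try_commit(&self, seen: u64, value: T) -> bool {
        let mut guard = self.inner.lock().unwrap();
        if guard.0 != seen {
            return false;
        }
        *guard = (seen + 1, value);
        true
    }

    /// Read, compute, validate; start over when validation fails.
    /// Returns how many times the update had to retry.
    pub fn update(&self, f: impl Fn(&T) -> T) -> u32 {
        let mut retries = 0;
        loop {
            let (seen, current) = self.read();
            // Step 2: compute without holding any lock.
            let next = f(&current);
            if self.try_commit(seen, next) {
                return retries;
            }
            retries += 1;
        }
    }
}
```

The update closure may run more than once, which is why transactional code should be free of side effects.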
When STM Wins
| Situation | Mutex | STM |
|---|---|---|
| Single shared value | ✓ simple | ✓ fine |
| Multiple related values | ✗ deadlock risk | ✓ composable |
| Read-heavy workloads | ✗ blocks writers | ✓ reads never block |
| Composing two existing operations | ✗ requires coordination | ✓ compose Stm values |
| Long operations with I/O | ✓ (STM would retry too much) | ✗ wrong tool |
STM shines when:
- You need to update multiple values atomically
- You’re composing smaller transactional operations into larger ones
- Contention is low (retries are cheap)
Avoid STM for long-running operations that do I/O. Transactions should be short and pure; use Effect for I/O and Stm for transactional state changes.
TRef — Transactional References
TRef<T> is the fundamental mutable cell in effectful’s STM system. A TRef is read and written through Stm operations, then committed atomically with commit / atomically.
Creating a TRef
use effectful::{TRef, commit, run_blocking};
let counter: TRef<i32> = run_blocking(commit(TRef::make(0)), ())?;
let balance: TRef<f64> = run_blocking(commit(TRef::make(1000.0)), ())?;
TRef::make(value) returns Stm<TRef<T>, ()>, not a TRef directly. That keeps allocation inside the same transactional model as reads and writes.
Transactional Operations
All operations return Stm<_, E> descriptions. Nothing changes until the transaction is committed.
use effectful::{Stm, TRef};
let counter: TRef<i32> = /* built with TRef::make */;
let read_op: Stm<i32, ()> = counter.read_stm();
let write_op: Stm<(), ()> = counter.write_stm(42);
let update_op: Stm<(), ()> = counter.update_stm(|n| n + 1);
let modify_op: Stm<i32, ()> = counter.modify_stm(|n| (n, n + 1));
Use update_stm when you only need to write a new value. Use modify_stm when the transaction should return one value and store another.
Composing Transactions
There is currently no stm! macro. Compose Stm values with flat_map and map.
use effectful::{Stm, TRef};
let counter: TRef<i32> = /* ... */;
let total: TRef<i32> = /* ... */;
let transaction: Stm<(), ()> = counter.read_stm().flat_map(move |count| {
let total = total.clone();
counter
.write_stm(count + 1)
.flat_map(move |_| total.update_stm(move |sum| sum + count))
});
Sharing TRefs
TRef is cloneable and internally shared. Wrap it in Arc only when your surrounding ownership model needs an Arc.
use std::sync::Arc;
use effectful::{TRef, commit, run_blocking};
let shared: Arc<TRef<i32>> = Arc::new(run_blocking(commit(TRef::make(0)), ())?);
TRef vs. Mutex<T>
| Property | TRef | Mutex |
|---|---|---|
| Composable across multiple cells | Yes | Manual lock ordering |
| Commit is atomic | Yes | Only while locks are held |
| Transaction body can do I/O | No | Technically yes, but risky |
| Retry on conflict | Yes | No |
Use TRef for short, composable state mutations. Use Mutex for non-transactional shared state, especially when you cannot model the operation as a short pure transaction.
Stm and commit — Building Transactions
Stm<A, E> describes a transactional computation. It is lazy and pure with respect to the outside world: it can read/write transactional cells, fail with E, or retry.
commit: Lift Stm into Effect
use effectful::{Effect, Stm, TRef, commit, run_blocking};
let ref_a: TRef<i32> = run_blocking(commit(TRef::make(1)), ())?;
let ref_b: TRef<i32> = run_blocking(commit(TRef::make(2)), ())?;
let transaction: Stm<i32, ()> = ref_a.read_stm().flat_map(move |a| {
ref_b.read_stm().map(move |b| a + b)
});
let effect: Effect<i32, (), ()> = commit(transaction);
let result = run_blocking(effect, ())?;
commit(stm) returns Effect<A, E, R>. The transaction is executed when the effect runs. On commit conflicts or Stm::retry(), it retries.
atomically
atomically(stm) is an alias for commit(stm). It still returns an Effect; run it with run_blocking, run_async, or compose it with other effects.
use effectful::{atomically, run_blocking};
let effect = atomically(counter.modify_stm(|n| (n + 1, n + 1)));
let value = run_blocking(effect, ())?;
Stm::fail
Transactions can fail with typed errors:
use effectful::{Stm, TRef};
fn withdraw(account: TRef<u64>, amount: u64) -> Stm<u64, InsufficientFunds> {
account.read_stm().flat_map(move |balance| {
if balance < amount {
Stm::fail(InsufficientFunds)
} else {
account
.write_stm(balance - amount)
.map(move |_| balance - amount)
}
})
}
Stm::fail(e) aborts the current transaction immediately. commit propagates that E through the returned effect.
Stm::retry
Sometimes a transaction should wait until a condition is true rather than fail:
use effectful::{Stm, TRef};
fn dequeue(queue: TRef<Vec<Item>>) -> Stm<Item, ()> {
queue.read_stm().flat_map(move |items| {
if items.is_empty() {
Stm::retry()
} else {
let item = items[0].clone();
queue.write_stm(items[1..].to_vec()).map(move |_| item)
}
})
}
Stm::retry() asks the commit loop to restart later. TQueue::take uses this behavior to wait while empty.
Composing Transactions
Transactions compose with flat_map and map.
// TRef is cloneable; clone the handles so both steps can use them.
let big_transaction: Stm<(), AppError> = transfer_funds(from.clone(), to.clone(), amount)
.flat_map(move |_| record_audit_log(from, to, amount));
let effect = commit(big_transaction);
The composed transaction commits as a unit. If any part fails, retries, or observes a conflict, the whole transaction does not partially commit.
TQueue, TMap, TSemaphore — Transactional Collections
effectful provides STM-aware collection types. Their constructors and operations return Stm values, so they compose with TRef reads/writes and commit atomically.
TQueue
use effectful::{Stm, TQueue};
let queue_stm: Stm<TQueue<Job>, ()> = TQueue::bounded(100);
let unbounded_stm: Stm<TQueue<Job>, ()> = TQueue::unbounded();
let offer: Stm<bool, ()> = queue.offer(job); // false when bounded and full
let take: Stm<Job, ()> = queue.take(); // retries while empty
TQueue::offer does not block when a bounded queue is full; it returns false. TQueue::take retries when the queue is empty.
Producer-Consumer Pattern
fn producer(queue: TQueue<Job>, jobs: Vec<Job>) -> Effect<(), (), ()> {
effect! {
for job in jobs {
let accepted = bind* commit(queue.offer(job));
if !accepted {
return Err(());
}
}
}
}
fn consumer(queue: TQueue<Job>) -> Effect<(), JobError, ()> {
effect! {
loop {
let job = bind* commit(queue.take());
bind* process_job(job);
}
}
}
TMap
use effectful::{Stm, TMap};
let map_stm: Stm<TMap<String, User>, ()> = TMap::make();
let get: Stm<Option<User>, ()> = map.get(&"alice".to_string());
let set: Stm<(), ()> = map.set("alice".to_string(), alice_user);
let delete: Stm<(), ()> = map.delete(&"alice".to_string());
TMap is a transactional hash map. Reading from a TMap and updating a TRef in the same transaction is atomic.
let transaction = user_map.get(&"alice".to_string()).flat_map(move |user| {
access_counter
.update_stm(|count| count + 1)
.map(move |_| user)
});
let effect = commit(transaction);
TSemaphore
use effectful::{Stm, TSemaphore};
let sem_stm: Stm<TSemaphore, ()> = TSemaphore::make(10);
let acquire: Stm<(), ()> = sem.acquire(); // retries while zero
let release: Stm<(), ()> = sem.release();
TSemaphore is a transactional permit counter. acquire decrements by one or retries when no permits are available; release increments by one.
let guarded = sem.acquire().flat_map(move |_| {
update_shared_state().flat_map(move |result| {
sem.release().map(move |_| result)
})
});
let effect = commit(guarded);
Summary
| Type | Purpose |
|---|---|
| TRef<T> | Single transactional cell |
| TQueue<T> | Transactional FIFO queue |
| TMap<K, V> | Transactional hash map |
| TSemaphore | Transactional permit counter |
All compose as Stm values and commit atomically with other STM operations.
Streams — Backpressure and Chunked Processing
An Effect produces one value. A Stream produces many values over time. When you need to process a potentially infinite or very large sequence — database result sets, event logs, file lines, sensor readings — Stream is the right abstraction.
This chapter covers when to use Stream vs Effect, how streams process data in Chunks for efficiency, how to control flow with backpressure policies, and how to consume streams with Sink.
Stream vs Effect — When to Use Each
The choice is about cardinality.
Effect<A, E, R> -> produces exactly one A or fails
Stream<A, E, R> -> produces zero or more A values or fails
Concrete Examples
fn get_user(id: u64) -> Effect<User, DbError, Db>
fn all_users() -> Stream<User, DbError, Db>
fn count_orders() -> Effect<u64, DbError, Db>
fn export_orders() -> Stream<Order, DbError, Db>
If you fetch 10 million rows into a Vec and return it as an Effect, you can run out of memory. A Stream lets consumers process chunks incrementally.
Stream Transformations
all_users()
.filter(Box::new(|u: &User| u.is_active()))
.map(UserSummary::from)
.take(100)
Current element operators include map, filter, take, take_while, drop_while, map_effect, and map_par_n.
Collecting a Stream into an Effect
When all results fit in memory, use run_collect.
let users: Effect<Vec<User>, DbError, Db> = all_users().run_collect();
For large results, prefer a fold or a sink.
let count: Effect<usize, DbError, Db> = all_users().run_fold(0, |acc, _| acc + 1);
Converting Effect to Stream
Stream::from_effect expects an effect that produces a Vec<A>.
use effectful::Stream;
let one_user: Effect<Vec<User>, DbError, Db> = get_user(1).map(|user| vec![user]);
let stream: Stream<User, DbError, Db> = Stream::from_effect(one_user);
For pure finite streams, use Stream::from_iterable.
let numbers = Stream::from_iterable([1, 2, 3]);
The Rule
| Need | Use |
|---|---|
| One result | Effect |
| Many results | Stream |
| All stream results in memory | stream.run_collect() |
| Aggregated result | stream.run_fold(init, f) or Sink::fold_left |
| Custom consumer | sink.run(stream) |
Chunks — Batched Stream Data
Chunk<A> is the batch container used by stream internals. It wraps a Vec<A> and makes chunk-level stream semantics explicit.
What Is a Chunk
use effectful::Chunk;
let chunk = Chunk::from_vec(vec![1, 2, 3, 4, 5]);
assert_eq!(chunk.len(), 5);
assert!(!chunk.is_empty());
for item in chunk.iter() {
println!("{item}");
}
Why Chunks Exist
Streams pull values in batches. Operators can transform a whole Chunk internally instead of paying per-element overhead at every boundary.
Single-element model:
elem1 -> map -> filter -> emit -> elem2 -> map -> filter -> emit -> ...
Chunk model:
chunk[1..64] -> map chunk -> filter chunk -> emit chunk -> ...
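The two models can be compared with a small std-only sketch. It assumes nothing about effectful's internals: `per_element` and `per_chunk` are hypothetical functions over slices, and the chunk size is chosen by the caller (it must be non-zero, because `slice::chunks` panics on zero).

```rust
// Per-element pipeline: every element crosses each stage boundary on its own.
fn per_element(input: &[i32]) -> Vec<i32> {
    let mut out = Vec::new();
    for &x in input {
        let y = x * 2; // map
        if y % 3 == 0 {
            out.push(y); // filter, then emit
        }
    }
    out
}

// Chunked pipeline: each stage handles a whole batch before handing it on.
// `chunk_size` must be greater than zero.
fn per_chunk(input: &[i32], chunk_size: usize) -> Vec<i32> {
    let mut out = Vec::new();
    for chunk in input.chunks(chunk_size) {
        let mapped: Vec<i32> = chunk.iter().map(|x| x * 2).collect();
        let kept: Vec<i32> = mapped.into_iter().filter(|y| y % 3 == 0).collect();
        out.extend(kept);
    }
    out
}
```

Both functions produce the same elements in the same order; only the granularity at which work moves between stages differs.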
Working with Chunks
let doubled = chunk.map(|x| x * 2);
let values = doubled.into_vec();
The current public Chunk API is intentionally small:
| Method | Purpose |
|---|---|
| Chunk::empty() | Empty chunk |
| Chunk::singleton(value) | One-element chunk |
| Chunk::from_vec(values) | Build from a vector |
| .len() / .is_empty() | Inspect size |
| .iter() | Iterate by reference |
| .into_vec() | Consume into Vec<A> |
| .map(f) | Transform elements |
| .sort_with(order) / .compare_by(other, order) | Runtime ordering helpers |
In Streams and Sinks
Most application code does not construct chunks directly. Stream::poll_next_chunk and sink drivers use chunks internally; user-facing APIs usually expose element-wise transformations or final Effect consumers.
Backpressure Policies — Controlling Flow
Channel-backed streams use BackpressurePolicy to decide what happens when the internal queue is full.
The Problem
Producer: emits 10,000 events/sec
Consumer: processes 1,000 events/sec
What happens to the 9,000 surplus events per second?
The answer is domain-specific: block, drop, or fail.
BackpressurePolicy
use effectful::BackpressurePolicy;
BackpressurePolicy::BoundedBlock // wait until space is available
BackpressurePolicy::DropNewest // discard the newly offered item
BackpressurePolicy::DropOldest // evict the oldest queued item
BackpressurePolicy::Fail // fail the producer enqueue effect
backpressure_decision(policy, queue_len, capacity) exposes the decision logic directly for tests or diagnostics.
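To make the four outcomes concrete, here is a std-only model of a bounded queue. It is a sketch under stated assumptions: `Policy`, `Offer`, and `offer` are hypothetical stand-ins, not effectful's `BackpressurePolicy` or `backpressure_decision`, and the capacity is assumed to be at least 1.

```rust
use std::collections::VecDeque;

// Hypothetical stand-in for the four policies; not effectful's own type.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Policy {
    BoundedBlock,
    DropNewest,
    DropOldest,
    Fail,
}

// What happened to an offered item.
#[derive(Debug, PartialEq)]
enum Offer {
    Enqueued,
    WouldBlock,
    DroppedNewest,
    EvictedOldest,
    Failed,
}

// Offer one item to a queue with the given capacity (assumed >= 1).
fn offer<T>(queue: &mut VecDeque<T>, capacity: usize, policy: Policy, item: T) -> Offer {
    if queue.len() < capacity {
        queue.push_back(item);
        return Offer::Enqueued;
    }
    match policy {
        // A real producer would wait for space; the model only reports it.
        Policy::BoundedBlock => Offer::WouldBlock,
        // The new item is discarded and the queue is left untouched.
        Policy::DropNewest => Offer::DroppedNewest,
        // The oldest queued item makes room for the new one.
        Policy::DropOldest => {
            queue.pop_front();
            queue.push_back(item);
            Offer::EvictedOldest
        }
        // The producer sees an error.
        Policy::Fail => Offer::Failed,
    }
}
```

A model like this is handy in unit tests for deciding which policy a queue should use before wiring up a real channel.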
Channel-Backed Streams
use effectful::{BackpressurePolicy, Chunk, stream_from_channel_with_policy, send_chunk, end_stream};
let (stream, sender) = stream_from_channel_with_policy::<Event, AppError, ()>(
1024,
BackpressurePolicy::DropOldest,
);
run_blocking(send_chunk(&sender, Chunk::singleton(event)), ())?;
run_blocking(end_stream(sender), ())?;
Use stream_from_channel(capacity) for the default BoundedBlock policy.
Choosing a Policy
| Scenario | Policy |
|---|---|
| No data loss acceptable | BoundedBlock |
| Latest value matters most | DropOldest |
| New overload data is expendable | DropNewest |
| Caller must know about overflow | Fail |
Monitoring Drops
There is no built-in dropped-counter helper. If drops matter operationally, wrap send_chunk or your producer logic and record metrics at that boundary.
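One way to record drops at the producer boundary is a thin wrapper that counts rejected sends. The sketch below uses `std::sync::mpsc::sync_channel` as a stand-in for a channel-backed stream; `CountingSender`, `send_or_count`, and `counting_channel` are hypothetical names, and a real wrapper would call `send_chunk` and report to your metrics system instead of an `AtomicU64`.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};
use std::sync::Arc;

// Hypothetical wrapper: counts items that could not be enqueued.
struct CountingSender<T> {
    inner: SyncSender<T>,
    dropped: Arc<AtomicU64>,
}

impl<T> CountingSender<T> {
    // Returns true if the item was enqueued. A full queue or a disconnected
    // receiver drops the item and increments the counter.
    fn send_or_count(&self, item: T) -> bool {
        match self.inner.try_send(item) {
            Ok(()) => true,
            Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => {
                self.dropped.fetch_add(1, Ordering::Relaxed);
                false
            }
        }
    }
}

// Build a bounded channel plus a shared drop counter.
fn counting_channel<T>(capacity: usize) -> (CountingSender<T>, Receiver<T>, Arc<AtomicU64>) {
    let (tx, rx) = sync_channel(capacity);
    let dropped = Arc::new(AtomicU64::new(0));
    let sender = CountingSender { inner: tx, dropped: dropped.clone() };
    (sender, rx, dropped)
}
```

The counter lives outside the channel, so dashboards or tests can read it without touching the consumer side.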
Summary
Choose a policy explicitly. BoundedBlock is safest for correctness but can stall producers. DropOldest and DropNewest trade completeness for bounded memory. Fail surfaces overload as an error.
Sinks — Consuming Streams
A Sink<Out, In, E, R> reduces a Stream<In, E, R> into an Out. It is a struct with a driver, not a trait you implement directly.
Built-in Sinks
use effectful::Sink;
let collect = Sink::<Vec<User>, User, DbError, Db>::collect();
let total = Sink::fold_left(0u64, |acc, order: Order| acc + order.amount);
let drain = Sink::<(), Event, EventError, EventEnv>::drain();
Run a sink with sink.run(stream).
let users: Effect<Vec<User>, DbError, Db> = Sink::collect().run(all_users());
let total: Effect<u64, DbError, Db> = total.run(orders());
Stream Consumer Methods
For common cases, stream methods are often simpler.
let users: Effect<Vec<User>, DbError, Db> = all_users().run_collect();
let total: Effect<u64, DbError, Db> = orders()
.run_fold(0u64, |acc, order| acc + order.amount);
let logged: Effect<(), EventError, EventEnv> = events()
.run_for_each_effect(|event| log_event(event));
Sink Composition
Sink::zip combines two fold-based sinks into one pass.
let count = Sink::fold_left(0usize, |n, _order: Order| n + 1);
let total = Sink::fold_left(0u64, |sum, order: Order| sum + order.amount);
let both = count.zip(total);
let effect: Effect<(usize, u64), DbError, Db> = both.run(orders());
zip panics if either sink was not created with fold_left / from_fold.
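Why zipping works for folds can be shown with a small std-only model: two accumulators are paired and advanced together in a single traversal. `Fold` here is a hypothetical stand-in, not effectful's `Sink`, and it runs over an ordinary iterator instead of a stream.

```rust
// Hypothetical stand-in for a fold-based sink: an initial state plus a step.
struct Fold<S, In> {
    init: S,
    step: Box<dyn Fn(S, &In) -> S>,
}

impl<S: 'static, In: 'static> Fold<S, In> {
    fn new(init: S, step: impl Fn(S, &In) -> S + 'static) -> Self {
        Fold { init, step: Box::new(step) }
    }

    // Pair two folds so a single traversal feeds both accumulators.
    fn zip<T: 'static>(self, other: Fold<T, In>) -> Fold<(S, T), In> {
        let (f, g) = (self.step, other.step);
        Fold {
            init: (self.init, other.init),
            step: Box::new(move |(s, t): (S, T), x: &In| (f(s, x), g(t, x))),
        }
    }

    // Drive the fold over any iterable source in one pass.
    fn run(self, items: impl IntoIterator<Item = In>) -> S {
        let step = self.step;
        items.into_iter().fold(self.init, |acc, x| step(acc, &x))
    }
}
```

Because each fold only needs a borrowed view of the element, both steps can observe the same item without cloning it.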
Other Built-ins
| Sink | Purpose |
|---|---|
| Sink::collect() | Collect elements into Vec<In> |
| Sink::collect_all_while(pred) | Collect until predicate first fails |
| Sink::collect_all_until(pred) | Collect until predicate first succeeds |
| Sink::fold_left(init, f) | Left fold |
| Sink::drain() | Discard all elements |
| Sink::to_queue(queue) | Offer each element to a queue |
| Sink::collect_to_map() | Collect (K, V) pairs into EffectHashMap |
Custom Sinks
The current API does not expose a public trait for custom chunk callbacks. Build custom consumers with Stream::run_fold, run_fold_effect, run_for_each_effect, or compose existing Sink constructors.
Schema — Parse, Don’t Validate
Data enters your program from the outside world: HTTP request bodies, database rows, configuration files, message queue payloads. All of it is untrusted. All of it needs to be checked.
The naive approach is to deserialise first and validate later — accept a User struct via serde, then check that email is non-empty and age is positive in a separate step. The problem: your type says User but your program has a User that might have an empty email. The type lies.
The better approach is parse, don’t validate: transform untrusted input into trusted types in one step. If the parse succeeds, you have a valid User. If it fails, you have a structured ParseError that tells you exactly what was wrong.
effectful’s schema module is built on this principle.
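The principle itself needs no library. As a minimal std-only sketch, a type whose only constructor is a parser cannot hold an invalid value. `Email`, `ParseFailure`, and `greeting` below are hypothetical illustration types, not part of effectful's schema module.

```rust
// Hypothetical domain type. The field is private, so code outside this
// module can only obtain an Email through `parse`.
#[derive(Debug, Clone, PartialEq)]
pub struct Email(String);

#[derive(Debug, PartialEq)]
pub struct ParseFailure {
    pub message: String,
}

impl Email {
    // Parse untrusted text into a trusted Email in one step.
    pub fn parse(raw: &str) -> Result<Email, ParseFailure> {
        let trimmed = raw.trim();
        if trimmed.is_empty() {
            return Err(ParseFailure { message: "must not be empty".into() });
        }
        if !trimmed.contains('@') {
            return Err(ParseFailure { message: "must contain '@'".into() });
        }
        Ok(Email(trimmed.to_string()))
    }

    pub fn as_str(&self) -> &str {
        &self.0
    }
}

// Domain code accepts only the trusted type and never re-checks it.
pub fn greeting(to: &Email) -> String {
    format!("Welcome, {}", to.as_str())
}
```

The checks here are deliberately simplistic; the point is that validation happens once, at construction, and the type records that it happened.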
What This Chapter Covers
- Unknown: the type for unvalidated wire data (next section)
- Schema combinators: the building blocks for describing data shapes (ch14-02)
- Validation and refinement: refine, filter, and Brand for domain constraints (ch14-03)
- ParseErrors: structured, accumulating error reports (ch14-04)
The Unknown Type — Unvalidated Wire Data
Unknown is effectful’s dynamic input tree for schema decoding. It represents JSON-like data without trusting its shape.
Creating Unknown Values
use std::collections::BTreeMap;
use effectful::schema::Unknown;
let s = Unknown::String("hello".to_string());
let n = Unknown::I64(42);
let b = Unknown::Bool(true);
let null = Unknown::Null;
let arr = Unknown::Array(vec![Unknown::I64(1), Unknown::I64(2)]);
let mut fields = BTreeMap::new();
fields.insert("name".to_string(), Unknown::String("Alice".to_string()));
fields.insert("age".to_string(), Unknown::I64(30));
let obj = Unknown::Object(fields);
With the schema-serde feature, convert from serde_json::Value using unknown_from_serde_json.
use effectful::schema::unknown_from_serde_json;
let value: serde_json::Value = serde_json::from_str(input)?;
let unknown = unknown_from_serde_json(value);
Why Not serde_json::Value Directly?
serde_json::Value is useful at the edge, but schemas need a stable internal representation with effectful’s parse errors and combinators. Unknown gives schema decoders one input model independent of where the data came from.
Inspecting Unknown Values
Most code should not inspect Unknown directly. Decode it with a Schema. For debugging or custom decoders, match on the enum variants.
match &unknown {
Unknown::Object(fields) => { /* inspect fields */ }
Unknown::Array(items) => { /* inspect items */ }
Unknown::String(value) => { /* inspect string */ }
Unknown::I64(value) => { /* inspect integer */ }
Unknown::F64(value) => { /* inspect float */ }
Unknown::Bool(value) => { /* inspect bool */ }
Unknown::Null => { /* inspect null */ }
}
The Parse Boundary
Use Unknown at I/O boundaries, then decode once into trusted domain types.
use effectful::schema::{Unknown, string};
let raw = Unknown::String("alice@example.com".to_string());
let email = string::<()>().decode_unknown(&raw)?;
Nothing beyond the parse boundary should accept Unknown; domain functions should accept validated types.
Schema Combinators — Describing Data Shapes
A schema describes how to decode wire data into a typed value and encode it back. In the current API the full shape is Schema<A, I, E>: semantic value A, wire/intermediate value I, and schema marker E.
Primitive Schemas
use effectful::schema::{bool_, f64, i64, string};
let name = string::<()>(); // Schema<String, String, ()>
let age = i64::<()>(); // Schema<i64, i64, ()>
let price = f64::<()>(); // Schema<f64, f64, ()>
let active = bool_::<()>(); // Schema<bool, bool, ()>
Each primitive can decode its typed wire value with decode, and can decode an Unknown with decode_unknown.
Struct-Like Schemas
struct_, struct3, and struct4 decode named object fields into tuples. Use transform to map the tuple into a domain struct.
use effectful::schema::{ParseError, Schema, i64, string, struct_, transform};
#[derive(Clone)]
struct User {
name: String,
age: i64,
}
let tuple_schema = struct_("name", string::<()>(), "age", i64::<()>());
let user_schema = transform(
tuple_schema,
|(name, age)| Ok(User { name, age }),
|user: User| (user.name, user.age),
);
If a field is missing or has the wrong type, decode_unknown returns a ParseError with the field path.
Optional and Array Schemas
use effectful::schema::{array, optional, string};
let maybe_name = optional(string::<()>());
let tags = array(string::<()>());
optional(schema) accepts Unknown::Null as None. array(schema) decodes each element and prefixes parse errors with the failing index.
Validation and Transformation
Use free combinators, not schema methods.
use effectful::schema::{ParseError, filter, string, transform};
let non_empty = filter(string::<()>(), |s| !s.is_empty(), "must not be empty");
let email_schema = transform(
non_empty,
|s| Email::parse(s).map_err(|e| ParseError::new("", format!("invalid email: {e}"))),
|email: Email| email.into_string(),
);
filter keeps values satisfying a predicate. transform performs bidirectional conversion and may fail while decoding.
Unions
union_ tries a primary schema, then a fallback schema. For more than two branches, use union_chain from schema::extra.
use effectful::schema::{Schema, Unknown, union_};
let primary: Schema<UserId, Unknown, ()> = user_id_from_number();
let fallback: Schema<UserId, Unknown, ()> = user_id_from_string();
let user_id_schema = union_(primary, fallback);
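The try-primary-then-fallback behaviour can be modelled with plain closures. This is a sketch, not effectful's `union_`: `Decoder`, `union2`, `id_from_number`, and `id_from_tag` are hypothetical, the wire type is simplified to `&str`, and the choice to report the fallback's error when both branches fail is an assumption of the model.

```rust
// Hypothetical decoder type: a function from raw text to a parsed value.
type Decoder<T> = Box<dyn Fn(&str) -> Result<T, String>>;

// Try `primary`; if it fails, try `fallback` on the same input.
// When both fail, this model reports the fallback's error.
fn union2<T: 'static>(primary: Decoder<T>, fallback: Decoder<T>) -> Decoder<T> {
    Box::new(move |raw: &str| primary(raw).or_else(|_| fallback(raw)))
}

// Accepts ids written as bare numbers, for example "42".
fn id_from_number() -> Decoder<u64> {
    Box::new(|raw: &str| raw.parse::<u64>().map_err(|e| e.to_string()))
}

// Accepts ids written with a "user-" prefix, for example "user-7".
fn id_from_tag() -> Decoder<u64> {
    Box::new(|raw: &str| match raw.strip_prefix("user-") {
        Some(rest) => rest.parse::<u64>().map_err(|e| e.to_string()),
        None => Err("expected a \"user-\" prefix".to_string()),
    })
}
```

Order matters: the primary branch wins whenever it succeeds, so put the most specific or most common representation first.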
Running a Schema
Run schemas directly through their methods.
use effectful::schema::{Unknown, i64};
let raw = Unknown::I64(30);
let age = i64::<()>().decode_unknown(&raw)?;
decode works on the schema’s typed wire value I. decode_unknown works on Unknown trees at I/O boundaries.
Validation and Refinement — Constrained Types
Schemas parse structure. Validation adds constraints: an age must be positive, an email must contain @, a price must have at most two decimal places.
refine / filter
refine and filter attach a predicate to an existing schema. Parsing succeeds only when the base schema succeeds and the predicate returns true.
use effectful::schema::{i64, refine, string, filter};
let age_schema = refine(
i64::<()>(),
|n| (0..=150).contains(n),
"age must be between 0 and 150",
);
let non_empty = filter(
string::<()>(),
|s: &String| !s.is_empty(),
"must not be empty",
);
If the predicate fails, decoding returns ParseError::new("", message).
Fallible Transformation
Use transform when conversion can fail or when the semantic type differs from the wire type.
use effectful::schema::{ParseError, string, transform};
let url_schema = transform(
string::<()>(),
|s| url::Url::parse(&s).map_err(|e| ParseError::new("", format!("invalid URL: {e}"))),
|url: url::Url| url.to_string(),
);
The decode closure returns Result<B, ParseError>. The encode closure maps the semantic value back to the base schema’s semantic type.
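The pairing of a fallible decode with an infallible encode can be sketched without the library. `Codec`, `port_codec`, and `round_trips` below are hypothetical stand-ins with a `String` wire type; they illustrate the shape of a bidirectional conversion, not effectful's `transform` signature.

```rust
// Hypothetical stand-in for a bidirectional schema over a String wire type.
struct Codec<A> {
    decode: Box<dyn Fn(String) -> Result<A, String>>,
    encode: Box<dyn Fn(A) -> String>,
}

// Decoding may fail; encoding a valid value always succeeds.
fn port_codec() -> Codec<u16> {
    Codec {
        decode: Box::new(|s: String| {
            s.parse::<u16>().map_err(|e| format!("invalid port: {e}"))
        }),
        encode: Box::new(|port: u16| port.to_string()),
    }
}

// A round trip should give back the value we started with.
fn round_trips(codec: &Codec<u16>, value: u16) -> bool {
    let wire = (codec.encode)(value);
    (codec.decode)(wire) == Ok(value)
}
```

Keeping both directions in one value makes asymmetries easy to test: encode a value, decode the result, and compare.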
Brand
Brand<A, B> is a zero-cost nominal wrapper. Use Brand::nominal when the value was already validated, or RefinedBrand when construction should validate.
use effectful::schema::{Brand, RefinedBrand};
struct EmailMarker;
type Email = Brand<String, EmailMarker>;
let email: Email = Brand::nominal("alice@example.com".to_string());
let make_email = RefinedBrand::<String, EmailMarker>::new(|s| {
if s.contains('@') {
Ok(())
} else {
Err("invalid email".to_string())
}
});
let checked: Email = make_email.try_make("alice@example.com".to_string())?;
Now APIs can demand Email instead of a raw String.
fn send_welcome(to: Email) -> Effect<(), MailError, Mailer> { /* ... */ }
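How a marker type yields a distinct nominal type at no runtime cost can be sketched with `PhantomData`. `Branded`, `refined`, `parse_email`, and `UserNameMarker` are hypothetical illustration names; effectful's `Brand` and `RefinedBrand` have their own definitions, and this model's `send_welcome` returns a plain `String` instead of an effect.

```rust
use std::marker::PhantomData;

// Stand-in for a branded wrapper: stores only `A`; `B` exists at type level.
struct Branded<A, B> {
    value: A,
    _marker: PhantomData<B>,
}

impl<A, B> Branded<A, B> {
    // Wrap without checking: the caller vouches for the value.
    fn nominal(value: A) -> Self {
        Branded { value, _marker: PhantomData }
    }

    // Wrap only if `check` accepts the value.
    fn refined(value: A, check: impl Fn(&A) -> Result<(), String>) -> Result<Self, String> {
        check(&value)?;
        Ok(Self::nominal(value))
    }

    fn get(&self) -> &A {
        &self.value
    }
}

struct EmailMarker;
struct UserNameMarker;
type Email = Branded<String, EmailMarker>;
type UserName = Branded<String, UserNameMarker>;

fn parse_email(raw: &str) -> Result<Email, String> {
    Email::refined(raw.to_string(), |s| {
        if s.contains('@') { Ok(()) } else { Err("invalid email".to_string()) }
    })
}

// Accepts only Email; passing a UserName here would not compile.
fn send_welcome(to: &Email) -> String {
    format!("welcome sent to {}", to.get())
}
```

`Email` and `UserName` both wrap a `String`, yet the compiler treats them as unrelated types, which is the property the brand provides.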
HasSchema
HasSchema attaches a canonical schema to a type family. The trait exposes associated types for semantic value, wire value, and schema marker.
use effectful::schema::{HasSchema, Schema, i64};
struct UserIdSchema;
impl HasSchema for UserIdSchema {
type A = i64;
type I = i64;
type E = ();
fn schema() -> Schema<Self::A, Self::I, Self::E> {
i64::<()>()
}
}
Use HasSchema for generic tooling that needs to ask for a canonical schema without knowing how it is built.
Summary
| Tool | When to use |
|---|---|
| refine / filter | Predicate on a parsed value |
| transform | Fallible conversion or semantic/wire conversion |
| Brand::nominal | Nominal wrapper after validation elsewhere |
| RefinedBrand | Validating branded constructor |
| HasSchema | Attach a canonical schema to a type-level provider |
ParseErrors — Structured Parse Failures
ParseError represents one schema decoding failure. ParseErrors is a small aggregate wrapper used when APIs want to return more than one issue.
ParseError vs ParseErrors
use effectful::schema::{ParseError, ParseErrors};
let e = ParseError::new("age", "age must be positive");
let one = ParseErrors::one(e.clone());
let many = ParseErrors::new(vec![e]);
ParseError has two public fields: path and message.
let err = ParseError::new("users.0.age", "expected i64");
assert_eq!(err.path, "users.0.age");
assert_eq!(err.message, "expected i64");
Path Tracking
Schema combinators prefix paths as they decode nested data.
use effectful::schema::{Unknown, array, i64};
let schema = array(i64::<()>());
let raw = Unknown::Array(vec![Unknown::I64(1), Unknown::String("oops".to_string())]);
let err = schema.decode_unknown(&raw).unwrap_err();
assert_eq!(err.path, "1");
struct_, struct3, and struct4 prefix field names. array prefixes element indexes.
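The prefixing mechanism can be modelled in a few lines: each combinator runs its inner decoder and, on failure, prepends its own path segment. The types and functions below (`Value`, `PathError`, `prefix`, `decode_array`, `decode_field`) are hypothetical stand-ins, and joining segments with `.` is an assumption based on the `users.0.age` example above.

```rust
use std::collections::BTreeMap;

// Stand-in types for illustration; effectful's Unknown and ParseError differ.
#[derive(Debug, Clone)]
enum Value {
    I64(i64),
    Str(String),
    Array(Vec<Value>),
    Object(BTreeMap<String, Value>),
}

#[derive(Debug, PartialEq)]
struct PathError {
    path: String,
    message: String,
}

fn err(path: &str, message: &str) -> PathError {
    PathError { path: path.to_string(), message: message.to_string() }
}

// Prepend one path segment, joining with '.' when a path already exists.
fn prefix(segment: &str, mut e: PathError) -> PathError {
    e.path = if e.path.is_empty() {
        segment.to_string()
    } else {
        format!("{segment}.{}", e.path)
    };
    e
}

fn decode_i64(v: &Value) -> Result<i64, PathError> {
    match v {
        Value::I64(n) => Ok(*n),
        _ => Err(err("", "expected i64")),
    }
}

// Decode every element; a failure is prefixed with the element index.
fn decode_array<T>(
    v: &Value,
    item: impl Fn(&Value) -> Result<T, PathError>,
) -> Result<Vec<T>, PathError> {
    let Value::Array(items) = v else {
        return Err(err("", "expected array"));
    };
    items
        .iter()
        .enumerate()
        .map(|(i, x)| item(x).map_err(|e| prefix(&i.to_string(), e)))
        .collect()
}

// Decode one named field; a failure is prefixed with the field name.
fn decode_field<T>(
    v: &Value,
    name: &str,
    inner: impl Fn(&Value) -> Result<T, PathError>,
) -> Result<T, PathError> {
    let Value::Object(fields) = v else {
        return Err(err("", "expected object"));
    };
    match fields.get(name) {
        Some(x) => inner(x).map_err(|e| prefix(name, e)),
        None => Err(err(name, "missing field")),
    }
}
```

Because each layer only knows its own segment, nesting decoders composes paths automatically from the inside out.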
Accumulation Status
With decode_unknown, the current schema decoders generally short-circuit on the first failure. decode_unknown_all accumulates instead: it collects nested field errors for object and tuple schemas (including struct_/struct3/struct4 and tuple/tuple3/tuple4), array element errors, and union arm diagnostics. ParseErrors is the aggregate type for boundaries or custom validators that collect multiple ParseError values themselves.
fn validate_user(raw: &Unknown) -> Result<User, ParseErrors> {
let mut issues = Vec::new();
if let Err(err) = name_schema().decode_unknown(raw) {
issues.push(err);
}
if let Err(err) = age_schema().decode_unknown(raw) {
issues.push(err);
}
if issues.is_empty() {
build_user(raw).map_err(ParseErrors::one)
} else {
Err(ParseErrors::new(issues))
}
}
API Boundary Conversion
Convert parse issues into your API error type at the boundary.
#[derive(Debug)]
enum ApiError {
Validation(Vec<FieldError>),
}
#[derive(Debug)]
struct FieldError {
field: String,
message: String,
}
fn to_api_errors(errs: ParseErrors) -> ApiError {
ApiError::Validation(
errs.issues
.into_iter()
.map(|e| FieldError { field: e.path, message: e.message })
.collect(),
)
}
Parse Errors in Effects
Schema decoding returns Result. Lift it into an Effect by mapping the error into your effect error channel.
effect! {
let req = create_user_schema()
.decode_unknown(&raw)
.map_err(|err| to_api_errors(ParseErrors::one(err)))?;
bind* create_user(req)
}
Display
ParseErrors implements Display by printing each issue on its own line. Empty paths omit the path: prefix.
name: must not be empty
age: age must be positive
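A Display implementation with this behaviour might look like the following sketch. `Issue` and `Issues` are hypothetical stand-ins that mirror the path, message, and issues fields described in this section; whether the library's own output ends with a trailing newline is not confirmed here, and this model emits none.

```rust
use std::fmt;

// Stand-ins mirroring the shapes described above.
struct Issue {
    path: String,
    message: String,
}

struct Issues {
    issues: Vec<Issue>,
}

impl fmt::Display for Issue {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        if self.path.is_empty() {
            // No path: print the message alone.
            write!(f, "{}", self.message)
        } else {
            write!(f, "{}: {}", self.path, self.message)
        }
    }
}

impl fmt::Display for Issues {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // One issue per line, with newlines only between issues.
        for (i, issue) in self.issues.iter().enumerate() {
            if i > 0 {
                writeln!(f)?;
            }
            write!(f, "{issue}")?;
        }
        Ok(())
    }
}
```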
Summary
| Type | Meaning |
|---|---|
| ParseError | One failure with path and message |
| ParseErrors | Aggregate { issues: Vec<ParseError> } |
| ParseErrors::one(err) | Build a single-issue aggregate |
| ParseErrors::new(vec) | Build an aggregate from collected issues |
Testing — Effects Are Easy to Test
Testing async code often means standing up infrastructure, working around timing, and maintaining broad mocks, then chasing occasional flakes in CI.
Effect programs can be tested differently. Because an Effect is a description of what to do — not the doing itself — you control everything about how it runs. Swap in a test clock. Provide fake services through the Layer system. Detect fiber leaks automatically. Run in microseconds instead of seconds.
This chapter covers the testing tools effectful provides.
What Makes Effects Testable
Three properties make effect programs easy to test:
1. Services are injected, not ambient.
Your code doesn’t call DatabaseClient::global(). It declares R: NeedsDb and gets its database from the environment. In tests, you provide a different environment — one with a fake database.
2. Time is injectable.
Code that uses Clock instead of std::time::SystemTime::now() can be tested with TestClock, which advances only when you tell it to.
3. Effects don’t run until the harness runs them.
An Effect is inert. You can inspect, compose, and modify it before running. In tests, prefer returning Effect from the test body and letting #[effect_test] run it at the edge.
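These three properties can be seen in a miniature model: an effect is a boxed function from an environment to a result, so nothing happens until a runner supplies the environment. `MiniEffect`, `Env`, and `banner` are hypothetical and far simpler than effectful's `Effect<A, E, R>`; the sketch only illustrates injection and deferred execution.

```rust
// Hypothetical miniature of an effect: a boxed function from an environment
// to a result. Building one runs nothing.
struct MiniEffect<A, E, R> {
    run: Box<dyn FnOnce(&R) -> Result<A, E>>,
}

impl<A: 'static, E: 'static, R: 'static> MiniEffect<A, E, R> {
    fn new(f: impl FnOnce(&R) -> Result<A, E> + 'static) -> Self {
        MiniEffect { run: Box::new(f) }
    }

    // Compose before running: still only a description.
    fn map<B: 'static>(self, f: impl FnOnce(A) -> B + 'static) -> MiniEffect<B, E, R> {
        let run = self.run;
        MiniEffect::new(move |env: &R| run(env).map(f))
    }

    // The runner decides when to execute and with which environment.
    fn run_with(self, env: &R) -> Result<A, E> {
        (self.run)(env)
    }
}

// The environment carries dependencies, including the current time.
struct Env {
    greeting: &'static str,
    now_secs: u64,
}

fn banner(name: &'static str) -> MiniEffect<String, String, Env> {
    MiniEffect::new(move |env: &Env| {
        Ok(format!("{} {name} at t={}", env.greeting, env.now_secs))
    })
}
```

A test simply builds a different `Env`, with a fixed time and a fake greeting, and passes it to `run_with`; the business function `banner` does not change.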
What This Chapter Covers
- #[effect_test]: effect-returning tests where the harness owns execution (next section)
- run_test / run_effect_test*: lower-level harness helpers for explicit assertions (next section)
- TestClock: deterministic time control in tests (ch15-02)
- Mocking services: injecting test doubles via layers (ch15-03)
- Property testing: generating inputs and checking invariants (ch15-04)
effect_test — The Test Harness Boundary
Effectful tests should compose effects in the test body and let the harness execute them. This keeps lazy execution semantics intact and keeps direct Effect::run(&mut ...) calls inside runtime/test harness internals.
Preferred Usage
use effectful::{Effect, effect_test};
#[effect_test]
fn simple_effect_succeeds() -> Effect<(), &'static str, ()> {
Effect::new(|_| Ok(()))
}
#[effect_test] creates an async Tokio test and runs the returned effect. Ok(_) passes. Err(E) panics with Debug output, so the error type must implement Debug.
The macro uses effectful’s internal current-thread Tokio test re-export; downstream crates do not need to call Effect::run(&mut ...) in the test body.
Provided ServiceContext
use effectful::{Effect, MissingService, Service, ServiceContext, effect_test};
#[derive(Clone, Service)]
struct Config {
port: u16,
}
fn test_env() -> ServiceContext {
Config { port: 8080 }.to_context()
}
#[effect_test(env = "test_env")]
fn reads_config() -> Effect<(), MissingService, ServiceContext> {
Effect::<Config, MissingService, ServiceContext>::service::<Config>()
.map(|config| assert_eq!(config.port, 8080))
}
The fixture runs once per test and returns the environment consumed by the effect.
Layer-Based Setup
use effectful::{Effect, Layer, MissingService, Service, ServiceContext, effect_test};
#[derive(Clone, Service)]
struct Config {
port: u16,
}
fn test_layer() -> Layer<Config, MissingService, ()> {
Layer::succeed(Config { port: 8080 })
}
#[effect_test(layer = "test_layer")]
fn reads_layer_config() -> Effect<(), MissingService, ServiceContext> {
Effect::<Config, MissingService, ServiceContext>::service::<Config>()
.map(|config| assert_eq!(config.port, 8080))
}
This mirrors the it.layer(...) boundary from the TypeScript Effect library's test tooling: the test body remains an effect, and the adapter builds and provides the services.
Helper API
Use helper functions when an attribute macro is not appropriate.
use effectful::testing::expect_effect_test;
#[tokio::test]
async fn create_user_inserts_into_db() {
expect_effect_test(create_user(NewUser { name: "Alice".into(), age: 30 })).await;
}
Available helpers:
| Function | Notes |
|---|---|
| expect_effect_test(effect).await | Run with R: Default, panic on Err(E: Debug) |
| expect_effect_test_with_env(effect, env).await | Run with explicit environment, panic on failure |
| expect_effect_test_with_layer(effect, layer).await | Build a layer for ServiceContext, panic on failure |
| run_effect_test(effect).await | Run with R: Default, return Result<A, E> |
| run_effect_test_with_env(effect, env).await | Run with explicit environment, return Result<A, E> |
| TestRuntime::with_env(fixture) | Reusable adapter for explicit fixture functions |
Asserting on Exit
run_test remains available when you need the older synchronous Exit<A, E> shape.
use effectful::{Cause, Exit, run_test};
#[test]
fn division_by_zero_fails() {
let exit = run_test(divide(10, 0), ());
assert!(matches!(exit, Exit::Failure(Cause::Fail(DivError::DivisionByZero))));
}
Pass () for effects with no environment. run_test(effect, env) resets the test leak counters, runs the effect with run_blocking, then checks the leak counters.
Common Exit shapes:
| Exit | Meaning |
|---|---|
| Exit::Success(a) | Effect succeeded |
| Exit::Failure(Cause::Fail(e)) | Typed failure |
| Exit::Failure(Cause::Die(message)) | Defect message |
| Exit::Failure(Cause::Interrupt(id)) | Fiber interrupt |
run_test_with_clock
use std::time::Instant;
use effectful::{TestClock, run_test_with_clock};
let clock = TestClock::new(Instant::now());
let exit = run_test_with_clock(effect, env, clock);
run_test_with_clock currently delegates to run_test after accepting the explicit clock argument. Use explicit-clock scheduling helpers (retry_with_clock, repeat_with_clock) when the effect itself must use that clock.
Leak Assertions
The testing module exposes assertion effects and test hooks:
use effectful::{assert_no_leaked_fibers, assert_no_unclosed_scopes, run_blocking};
run_blocking(assert_no_leaked_fibers(), ())?;
run_blocking(assert_no_unclosed_scopes(), ())?;
The effect-test helpers and run_test call both assertions after the effect run. If a hook recorded a leak, the assertion panics.
TestClock — Deterministic Time in Tests
TestClock is a manual Clock implementation. It is useful with retry_with_clock, repeat_with_clock, or code that accepts a Clock explicitly.
The Problem with Real Time in Tests
let eff = retry(
|| failing_call(),
Schedule::exponential(Duration::from_secs(1)).compose(Schedule::recurs(3)),
);
With the default live clock, this waits in real time. Use TestClock to avoid real sleeps.
TestClock API
use std::time::{Duration, Instant};
use effectful::{Clock, TestClock};
let start = Instant::now();
let clock = TestClock::new(start);
let now: Instant = clock.now();
clock.advance(Duration::from_millis(500));
clock.set_time(start + Duration::from_secs(60));
let pending: Vec<Instant> = clock.pending_sleeps();
pending_sleeps() returns registered sleep deadlines. This is useful for asserting that code scheduled a delay.
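The idea behind a manual clock can be sketched with the standard library alone. `ManualClock` below is a hypothetical stand-in, not effectful's `TestClock`; in particular, dropping deadlines once time has advanced past them is an assumption of this model.

```rust
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

struct State {
    now: Instant,
    pending: Vec<Instant>,
}

// Hypothetical manual clock: time moves only when the test says so.
#[derive(Clone)]
struct ManualClock {
    state: Arc<Mutex<State>>,
}

impl ManualClock {
    fn new(start: Instant) -> Self {
        let state = State { now: start, pending: Vec::new() };
        ManualClock { state: Arc::new(Mutex::new(state)) }
    }

    fn now(&self) -> Instant {
        self.state.lock().expect("clock lock").now
    }

    // Record a sleep deadline instead of blocking the thread.
    fn sleep(&self, duration: Duration) {
        let mut state = self.state.lock().expect("clock lock");
        let deadline = state.now + duration;
        state.pending.push(deadline);
    }

    // Move time forward and forget deadlines that have been reached.
    fn advance(&self, duration: Duration) {
        let mut state = self.state.lock().expect("clock lock");
        state.now += duration;
        let now = state.now;
        state.pending.retain(|deadline| *deadline > now);
    }

    fn pending_sleeps(&self) -> Vec<Instant> {
        self.state.lock().expect("clock lock").pending.clone()
    }
}
```

A test can call `sleep`, assert on `pending_sleeps`, then `advance` past the deadline, all without waiting in real time.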
run_test_with_clock
run_test_with_clock(effect, env, clock) runs an already-built effect and returns Exit<A, E>. It does not create a closure-based test harness.
use std::time::Instant;
use effectful::{Exit, TestClock, run_test_with_clock, succeed};
let clock = TestClock::new(Instant::now());
let exit = run_test_with_clock(succeed::<_, (), ()>(42), (), clock);
assert_eq!(exit, Exit::succeed(42));
Testing Scheduled Work
use std::sync::{Arc, atomic::{AtomicU32, Ordering}};
use std::time::{Duration, Instant};
use effectful::{Schedule, TestClock, repeat_with_clock, run_blocking, succeed};
let clock = TestClock::new(Instant::now());
let counter = Arc::new(AtomicU32::new(0));
let effect = repeat_with_clock(
{
let counter = counter.clone();
move || {
counter.fetch_add(1, Ordering::Relaxed);
succeed::<u32, (), ()>(counter.load(Ordering::Relaxed))
}
},
Schedule::spaced(Duration::from_secs(60)).compose(Schedule::recurs(3)),
clock.clone(),
None,
);
let value = run_blocking(effect, ())?;
assert_eq!(value, 4); // initial run + 3 repeats
TestClock::sleep records sleeps instead of blocking, so scheduled tests complete quickly.
Fake Services
When business logic needs time, pass a Clock as part of your service/environment design. In production provide LiveClock; in tests provide TestClock.
Mocking Services — Test Doubles via Layers
In effectful, a test double is just a different service value or layer. Production code gets a real service; tests get an in-memory or spy service with the same service type.
The Pattern
Wrap your interface in a cloneable service struct.
trait DbImpl: Send + Sync {
fn get_user(&self, id: UserId) -> Effect<User, DbError, ()>;
fn save_user(&self, user: User) -> Effect<(), DbError, ()>;
}
#[derive(Clone, Service)]
struct Db {
inner: Arc<dyn DbImpl>,
}
impl Db {
fn get_user(&self, id: UserId) -> Effect<User, DbError, ()> {
self.inner.get_user(id)
}
fn save_user(&self, user: User) -> Effect<(), DbError, ()> {
self.inner.save_user(user)
}
}
Test Double
struct InMemoryDb {
users: Mutex<HashMap<UserId, User>>,
}
impl DbImpl for InMemoryDb {
fn get_user(&self, id: UserId) -> Effect<User, DbError, ()> {
match self.users.lock().expect("users lock").get(&id).cloned() {
Some(user) => succeed(user),
None => fail(DbError::NotFound(id)),
}
}
fn save_user(&self, user: User) -> Effect<(), DbError, ()> {
self.users.lock().expect("users lock").insert(user.id, user);
succeed(())
}
}
Injecting the Test Double
#[effect_test(env = "test_env")]
fn get_user_returns_saved_user() -> Effect<(), DbError, ServiceContext> {
let effect = Db::use_(|db| {
effect! {
bind* db.save_user(User { id: UserId::new(1), name: "Alice".into() });
let user = bind* db.get_user(UserId::new(1));
assert_eq!(user.name, "Alice");
}
});
effect
}
fn test_env() -> ServiceContext {
let db = Db { inner: Arc::new(InMemoryDb::new()) };
db.to_context()
}
Business logic is unchanged. Only the service value changes.
Spies
When you need to assert calls, add tracking to the test double.
#[derive(Clone, Service)]
struct Mailer {
sent: Arc<Mutex<Vec<Email>>>,
}
impl Mailer {
fn send(&self, email: Email) -> Effect<(), MailError, ()> {
self.sent.lock().expect("sent lock").push(email);
succeed(())
}
}
#[test]
fn registration_sends_welcome_email() {
let mailer = Mailer { sent: Arc::new(Mutex::new(Vec::new())) };
let env = mailer.clone().to_context();
let exit = run_test(register_user("alice@example.com"), env);
assert!(matches!(exit, Exit::Success(_)));
let sent = mailer.sent.lock().expect("sent lock");
assert_eq!(sent.len(), 1);
}
Failing Services
Test failure handling by providing a service whose methods fail.
struct FailingDb;
impl DbImpl for FailingDb {
fn get_user(&self, _id: UserId) -> Effect<User, DbError, ()> {
fail(DbError::ConnectionLost)
}
fn save_user(&self, _user: User) -> Effect<(), DbError, ()> {
fail(DbError::ConnectionLost)
}
}
#[test]
fn get_user_propagates_db_errors() {
let env = Db { inner: Arc::new(FailingDb) }.to_context();
let exit = run_test(get_user(UserId::new(1)), env);
assert!(matches!(exit, Exit::Failure(Cause::Fail(DbError::ConnectionLost))));
}
Layer-Based Setup
For larger tests, package doubles in layers.
fn test_layer() -> Layer<(Db, Mailer), AppError, ()> {
Layer::succeed(Db { inner: Arc::new(InMemoryDb::new()) })
.merge(Layer::succeed(Mailer::spy()))
}
#[effect_test(layer = "test_layer")]
fn full_registration_flow_works() -> Effect<(), AppError, ServiceContext> {
full_registration_flow().void()
}
What You Don’t Need
- No mock framework.
- No #[cfg(test)] in business logic.
- No global service registry reset between tests.
- No special mocking API beyond ordinary services and layers.
Property Testing — Invariants over Inputs
Property tests check invariants over many generated inputs. Effect programs are good targets because the runner boundary is explicit and test environments are ordinary values.
Setup
[dev-dependencies]
proptest = "1"
Testing Pure Effects
use effectful::{Exit, run_test};
use proptest::prelude::*;
proptest! {
#[test]
fn addition_is_commutative(a: i64, b: i64) {
let r_ab = run_test(add(a, b), ());
let r_ba = run_test(add(b, a), ());
prop_assert_eq!(r_ab, r_ba);
}
}
run_test(effect, env) returns Exit<A, E>. For properties, either compare exits directly or match Exit::Success(value).
Testing Schema Round-Trips
Schemas expose encode, decode, and decode_unknown.
use effectful::schema::{i64, string, struct_, transform};
use proptest::prelude::*;
proptest! {
#[test]
fn user_schema_round_trips(name in "[a-zA-Z]{1,50}", age in 0i64..=120) {
let schema = transform(
struct_("name", string::<()>(), "age", i64::<()>()),
|(name, age)| Ok(User { name, age }),
|user: User| (user.name, user.age),
);
let original = User { name, age };
let wire = schema.encode(original.clone());
let parsed = schema.decode(wire);
prop_assert_eq!(parsed.ok(), Some(original));
}
}
Round-trip tests catch asymmetries between encode and decode logic.
Testing Error Invariants
Use TRef::make and commit to construct transactional state.
use effectful::{Cause, Exit, TRef, commit, run_blocking, run_test};
use proptest::prelude::*;
proptest! {
#[test]
fn withdraw_never_goes_negative(balance in 0u64..=1_000_000, amount in 0u64..=1_000_000) {
let account = run_blocking(commit(TRef::make(balance)), ()).expect("make account");
let exit = run_test(withdraw(account.clone(), amount), ());
let new_balance = run_blocking(commit(account.read_stm::<InsufficientFunds>()), ())
.expect("read balance");
if amount <= balance {
prop_assert!(matches!(exit, Exit::Success(_)));
prop_assert_eq!(new_balance, balance - amount);
} else {
prop_assert!(matches!(exit, Exit::Failure(Cause::Fail(InsufficientFunds))));
prop_assert_eq!(new_balance, balance);
}
}
}
Generating Service Environments
For integration-style properties, generate random state and place it in a test service.
proptest! {
#[test]
fn get_user_returns_what_was_saved(user in arbitrary_user()) {
let db = Db::in_memory();
let env = db.clone().to_context();
let save_exit = run_test(save_user(user.clone()), env.clone());
prop_assert!(matches!(save_exit, Exit::Success(_)));
let get_exit = run_test(get_user(user.id), env);
prop_assert!(matches!(get_exit, Exit::Success(retrieved) if retrieved == user));
}
}
Define generators with normal proptest strategies.
fn arbitrary_user() -> impl Strategy<Value = User> {
(
any::<u64>().prop_map(UserId::new),
"[a-zA-Z ]{1,50}",
0i64..=120,
).prop_map(|(id, name, age)| User { id, name, age })
}
Schema-Driven Generation
There is no built-in generate_valid::<T>() helper. Keep generators beside schemas and use round-trip properties to ensure they stay aligned.
proptest! {
#[test]
fn generated_users_are_accepted(user in arbitrary_user()) {
let schema = user_schema();
let wire = schema.encode(user.clone());
prop_assert_eq!(schema.decode(wire).ok(), Some(user));
}
}
Shrinking
proptest automatically shrinks failing inputs to the smallest example that still fails. Because effects are run explicitly, shrinking remains easy to reason about.
Test failed. Minimal failing input:
name = ""
age = -1
Reason: must not be empty (path: name)
Observability
Effectful includes lazy span instrumentation for production tracing without forcing an OpenTelemetry SDK or exporter dependency into core.
Use spans when you want to know when an Effect starts, succeeds, fails, how long it ran, what trace/span identity it had, and how it relates to parent spans.
Manual Spans
Wrap any existing effect with with_span or the method form.
use effectful::{TracingConfig, install_tracing_layer, run_blocking, succeed};
let _ = run_blocking(install_tracing_layer(TracingConfig::enabled()), ());
let effect = succeed::<u32, (), ()>(42).with_span("manual.demo");
assert_eq!(run_blocking(effect, ()), Ok(42));
Use SpanOptions for levels and typed startup attributes.
use effectful::{SpanLevel, SpanOptions, succeed};
let effect = succeed::<(), (), ()>(()).with_span_options(
SpanOptions::new("manual.options")
.with_level(SpanLevel::Debug)
.with_attribute("cached", true)
.with_attribute("attempt", 2_i32),
);
Function Spans
Use #[effectful::span] on functions returning Effect.
use effectful::{Effect, Never, span, succeed};
#[span(name = "user.load", level = debug)]
fn load_user(id: u32) -> Effect<u32, Never, ()> {
succeed::<u32, Never, ()>(id + 1)
}
Calling load_user only constructs an effect. The span starts when that returned effect is executed.
By default, span names are module_path::function_name, and function arguments are captured with Debug formatting as string attributes.
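To make the laziness concrete, here is a minimal std-only sketch of the same idea. It is a toy model, not effectful's implementation: Log, with_span, and this load_user are hypothetical names, and a plain closure stands in for an Effect.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Shared in-memory log standing in for a tracing backend (hypothetical).
type Log = Rc<RefCell<Vec<String>>>;

/// Wrap deferred work so span events are recorded only when the work runs.
fn with_span<A>(
    log: Log,
    name: &'static str,
    work: impl FnOnce() -> A,
) -> impl FnOnce() -> A {
    move || {
        log.borrow_mut().push(format!("start {name}"));
        let out = work();
        log.borrow_mut().push(format!("end {name}"));
        out
    }
}

/// Builds the deferred computation; nothing is logged at this point.
fn load_user(log: Log, id: u32) -> impl FnOnce() -> u32 {
    with_span(log, "user.load", move || id + 1)
}
```

Calling load_user(log, 41) leaves the log empty. Only invoking the returned closure pushes the start and end entries, in that order.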
Privacy Controls
Skip sensitive, large, or non-Debug arguments with skip(...).
#![allow(unused)]
fn main() {
use effectful::{Effect, Never, span, succeed};
struct Secret(String);
#[span(name = "auth.login", skip(password))]
fn login(user_id: u64, password: Secret) -> Effect<(), Never, ()> {
let _ = password.0.len();
succeed::<(), Never, ()>(())
}
}
Use skip_all to capture only explicit fields.
#![allow(unused)]
fn main() {
#[effectful::span(skip_all, fields(route = "/users/:id", cache_hit = true))]
fn route() -> effectful::Effect<(), effectful::Never, ()> {
effectful::succeed::<(), effectful::Never, ()>(())
}
}
Field values can be typed, display-formatted with %, or debug-formatted with ?.
#![allow(unused)]
fn main() {
#[effectful::span(fields(count = 3_i32, user = %user_id, payload = ?payload))]
fn work(user_id: u64, payload: Vec<u8>) -> effectful::Effect<(), effectful::Never, ()> {
effectful::succeed::<(), effectful::Never, ()>(())
}
}
Typed fields preserve strings, booleans, signed integers, and floats. % and ? fields are stored as strings.
Disabled Tracing
When tracing is disabled or not installed, span hooks are no-ops. For non-async #[span] functions, the span macro also avoids formatting captured arguments while tracing is disabled or the span is sampled out.
#![allow(unused)]
fn main() {
use effectful::{TracingConfig, install_tracing_layer, run_blocking};
let _ = run_blocking(install_tracing_layer(TracingConfig::default()), ());
}
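The benefit of skipping argument formatting can be shown with a small std-only sketch. Counted and capture_arg are hypothetical names for this illustration; the real macro's expansion is not shown here and may differ.

```rust
use std::cell::Cell;
use std::fmt;

/// Argument whose Debug impl counts how often it is formatted.
struct Counted<'a> {
    hits: &'a Cell<u32>,
}

impl fmt::Debug for Counted<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.hits.set(self.hits.get() + 1);
        write!(f, "Counted")
    }
}

/// Capture an argument as a string attribute only when tracing is enabled.
fn capture_arg(enabled: bool, arg: &dyn fmt::Debug) -> Option<String> {
    if enabled {
        Some(format!("{arg:?}"))
    } else {
        // Disabled: the Debug impl is never invoked, so no formatting cost is paid.
        None
    }
}
```

With enabled set to false, the counter stays at zero. That is the property that matters when arguments are large or expensive to format.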
Trace Context
Every span record has a local, OpenTelemetry-shaped identity: TraceId, SpanId, TraceFlags, and parent span id.
Use W3C traceparent helpers at integration boundaries.
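#![allow(unused)]
use effectful::{SpanContext, TraceParentParseError};
// main returns Result so that `?` can propagate a parse failure.
fn main() -> Result<(), TraceParentParseError> {
    // Parse an incoming W3C traceparent header.
    let context = SpanContext::from_traceparent(
        "00-01010101010101010101010101010101-0202020202020202-01",
    )?;
    // Render the context back into header form for an outgoing request.
    let header = context.to_traceparent();
    Ok(())
}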
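For readers unfamiliar with the header format, the sketch below parses and renders a version-00 traceparent value using only the standard library. It follows the W3C Trace Context layout (version, 32-hex trace id, 16-hex parent id, 2-hex flags). TraceParent, parse_traceparent, and to_traceparent are hypothetical names for this illustration and are not effectful's SpanContext implementation.

```rust
/// Fields of a version-00 W3C `traceparent` header.
#[derive(Debug, PartialEq)]
struct TraceParent {
    trace_id: u128,
    parent_id: u64,
    sampled: bool,
}

/// True when `s` is exactly `len` lowercase hex digits, as the spec requires.
fn is_lower_hex(s: &str, len: usize) -> bool {
    s.len() == len && s.bytes().all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
}

fn parse_traceparent(header: &str) -> Option<TraceParent> {
    let mut parts = header.split('-');
    let version = parts.next()?;
    let trace = parts.next()?;
    let parent = parts.next()?;
    let flags = parts.next()?;
    // This sketch accepts only version 00, which has exactly four fields.
    if version != "00" || parts.next().is_some() {
        return None;
    }
    if !(is_lower_hex(trace, 32) && is_lower_hex(parent, 16) && is_lower_hex(flags, 2)) {
        return None;
    }
    let trace_id = u128::from_str_radix(trace, 16).ok()?;
    let parent_id = u64::from_str_radix(parent, 16).ok()?;
    let flags = u8::from_str_radix(flags, 16).ok()?;
    // All-zero trace and parent ids are invalid.
    if trace_id == 0 || parent_id == 0 {
        return None;
    }
    // Bit 0 of the flags byte is the "sampled" flag.
    Some(TraceParent { trace_id, parent_id, sampled: (flags & 0x01) == 0x01 })
}

/// Render the header again; only the sampled bit of the flags is preserved.
fn to_traceparent(tp: &TraceParent) -> String {
    format!("00-{:032x}-{:016x}-{:02x}", tp.trace_id, tp.parent_id, u8::from(tp.sampled))
}
```

Parsing the header from the example above and rendering it again yields the same string, while an all-zero trace id or a malformed value is rejected.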
Rust tracing Bridge
Enable the Rust tracing bridge when you want existing subscribers, Loki pipelines, or tracing-opentelemetry stacks to consume effectful spans. Use TracingConfig::tracing_bridge_only() for production forwarding without retaining snapshot_tracing() buffers; use enabled_with_tracing_bridge() when you also need in-memory snapshots.
#![allow(unused)]
fn main() {
use effectful::{TracingConfig, install_tracing_layer, run_blocking};
let _ = tracing_subscriber::fmt().try_init();
let _ = run_blocking(
install_tracing_layer(TracingConfig::tracing_bridge_only()),
(),
);
}
Full snapshot+bridge mode emits spans named effectful.span with otel_name, otel_trace_id, otel_span_id, otel_parent_span_id, otel_trace_flags, status, duration, and attribute fields. Span events raised with emit_current_span_event appear as Rust tracing events inside the current span.
Bridge-only mode emits name-only Rust tracing spans with otel_name. It intentionally skips trace id generation, parent lookup, attribute recording, and snapshot storage so it can stay production-oriented.
For high-throughput services, use an async/batched subscriber/exporter. Subscriber formatting, locks, and exporter backpressure usually dominate span overhead.
Sampling
Use sampling on hot paths. default_sample_rate applies globally; SpanOptions::sample_rate and #[effectful::span(sample = ...)] override it per span. Rates are rounded down to a power-of-two cadence, so 0.3 records at most one in four spans.
#![allow(unused)]
fn main() {
use effectful::{TracingConfig, install_tracing_layer, run_blocking};
let mut config = TracingConfig::tracing_bridge_only();
config.default_sample_rate = 0.0625;
let _ = run_blocking(install_tracing_layer(config), ());
}
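The power-of-two rounding is easy to check by hand. The helper below is a hypothetical sketch of that arithmetic, not the library's code: it returns N such that one in N spans is recorded. How effectful treats zero, negative, or extremely small rates is an assumption here.

```rust
/// Returns N such that one in N spans is recorded, or None when nothing is sampled.
/// Assumption: non-positive, NaN, and vanishingly small rates record nothing.
fn sample_cadence(rate: f64) -> Option<u64> {
    if rate.is_nan() || rate <= 0.0 {
        return None;
    }
    if rate >= 1.0 {
        return Some(1); // record every span
    }
    let mut n: u64 = 1;
    // Double the cadence until the effective rate 1/n no longer exceeds `rate`.
    while 1.0 / (n as f64) > rate {
        n = n.checked_mul(2)?; // overflow means the rate is effectively zero
    }
    Some(n)
}
```

Under these assumptions, a rate of 0.3 gives a cadence of 4 (an effective rate of 0.25), and the 0.0625 used above gives a cadence of 16 with no rounding loss.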
Snapshot mode is best for tests, local debugging, and diagnostics. Bridge-only plus sampling is the production-oriented path for very small effects.
See examples 106_span_macro.rs and 107_tracing_bridge.rs for executable coverage.
API Quick Reference
A condensed reference for commonly used items in effectful 0.2.2. For complete signatures, run cargo doc --open -p effectful.
Core Types
| Type | Description |
|---|---|
| Effect<A, E, R> | Lazy computation that succeeds with A, fails with E, and requires environment R |
| Exit<A, E> | Terminal outcome: Success(A) or Failure(Cause<E>) |
| Cause<E> | Failure algebra: Fail(E), Die(String), Interrupt(FiberId), Both, Then |
| Context<Cons<..., Nil>> | Typed heterogeneous environment for tagged services |
| ServiceContext | Runtime service table used by the derive-based service/layer API |
| Layer<ROut, E, RIn> | Lazy service layer for ServiceContext |
| Stm<A, E> | Transactional computation |
| Stream<A, E, R> | Pull stream of A chunks |
| Sink<Out, In, E, R> | Consumer of stream elements |
| Schema<A, I, E> | Bidirectional decoder/encoder with Unknown support |
Effect Constructors
| Function | Notes |
|---|---|
| succeed(a) | Always succeeds with a |
| fail(e) | Always fails with e |
| pure(a) | Infallible Effect<A, (), ()> convenience |
| from_async(f) | Lift an async closure returning Result<A, E> |
| Effect::new(f) | Lift a synchronous closure over &mut R |
| Effect::new_async(f) | Lift an async closure that may borrow &mut R |
| effect! { ... } | Do-notation; use bind* effect to bind |
Effect Combinators
| Method | Notes |
|---|---|
| .map(f) | Transform success value |
| .flat_map(f) | Sequentially compose effects |
| .and_then(other) | Sequence and keep right result |
| .and_then_discard(other) | Sequence and keep left result |
| .map_error(f) | Transform typed error |
| .catch(f) | Recover by returning another effect |
| .catch_all(f) | Recover with a fallback value and Never error |
| .tap_error(f) | Run an effect on failure and re-emit the error |
| .provide_env(env) | Capture a full environment and return Effect<_, _, ()> |
| .zoom_env(f) | Run an effect requiring R inside a larger environment S |
| .local(f) | Run with a cloned and modified local environment |
| .ensuring(finalizer) | Run finalizer after success or failure |
| .on_exit(f) | Observe Exit without changing result |
Running Effects
| Function | Notes |
|---|---|
| run_blocking(effect, env) | Blocking synchronous runner |
| run_async(effect, env).await | Async runner |
| #[effect_test] | Attribute for tests that return Effect; harness executes and panics on Err(E: Debug) |
| expect_effect_test(effect).await | Async test helper for R: Default, panics on failure |
| expect_effect_test_with_env(effect, env).await | Async test helper with explicit environment |
| expect_effect_test_with_layer(effect, layer).await | Async test helper for ServiceContext plus Layer |
| run_effect_test(effect).await | Async test helper returning Result<A, E> |
| run_effect_test_with_env(effect, env).await | Async test helper returning Result<A, E> with explicit environment |
| TestRuntime::with_env(fixture) | Reusable async test adapter with environment fixture |
| run_test(effect, env) | Test runner returning Exit<A, E> |
| run_test_with_clock(effect, env, clock) | Test runner with explicit TestClock argument |
Services and Layers
| Item | Notes |
|---|---|
| #[derive(Service)] | Make a cloneable struct available through Effect::service::<S>() |
| Effect::service::<S>() | Read service S from R: ServiceLookup<S> |
| ServiceContext::empty().add(service) | Build a derive-service environment |
| Layer::succeed(service) | Infallible service layer |
| Layer::effect(name, make) | Effectful service layer |
| layer.merge(other) | Build independent services into one context |
| layer.provide(provider) | Use provider services to build the layer, then hide provider output |
| layer.provide_merge(provider) | Provide dependencies and keep both outputs |
| layer.memoized() | Cache the first layer build result |
Tagged Context Helpers
| Item | Notes |
|---|---|
| service_key!(pub struct Key); | Declare a nominal key type |
| Tagged<Key, V> / Service<Key, V> | Keyed service cell |
| service_env::<Key, _>(value) | Build a one-cell Context |
| ctx.get::<Key>() | Read a tagged value from a typed context |
| effect.provide_head(value) | Provide the head tagged dependency |
Concurrency
| Function/Method | Notes |
|---|---|
| run_fork(&runtime, …) | Spawn a fiber (see Fiber in the Glossary) |
| handle.join() | Await result as Result<A, Cause<E>> |
| handle.await_exit() | Await result as Exit<A, E> |
| handle.interrupt() | Interrupt the fiber |
| handle.status() | Inspect Running, Succeeded, Failed, or Interrupted |
| fiber_all(handles) | Join many FiberHandles into Effect<Vec<A>, Cause<E>, ()> |
| with_fiber_id(id, f) | Run f under a fiber id |
STM
| Function | Notes |
|---|---|
| TRef::make(v) | Transactionally allocate a cell: Stm<TRef<T>, ()> |
| tref.read_stm() | Read inside Stm |
| tref.write_stm(v) | Write inside Stm |
| tref.update_stm(f) | Update value, returning () |
| tref.modify_stm(f) | Compute (output, next) and write next |
| Stm::succeed(a) / Stm::fail(e) | Transactional success/failure |
| Stm::retry() | Retry the current transaction |
| commit(stm) / atomically(stm) | Lift Stm<A, E> into Effect<A, E, R> |
| TQueue::bounded(n) / TQueue::unbounded() | Transactional FIFO queues |
| queue.offer(v) / queue.take() | Enqueue or dequeue transactionally |
| TMap::make() / map.get, set, delete | Transactional map |
| TSemaphore::make(n) / acquire, release | Transactional permit counter |
Scheduling
| Function | Notes |
|---|---|
| Schedule::recurs(n) | Allow n additional iterations |
| Schedule::spaced(d) | Fixed delay |
| Schedule::exponential(base) | Exponential backoff |
| schedule.compose(other) | Continue while both schedules continue; use max delay |
| schedule.jittered() | Apply deterministic jitter |
| Schedule::recurs_while(pred) | Continue while predicate holds |
| Schedule::recurs_until(pred) | Continue until predicate holds |
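| retry(\|\| effect, schedule) | Free function; the schedule sets delays and repeat limits |
| repeat(\|\| effect, schedule) | Free function; the schedule sets delays and repeat limits |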
Streams and Sinks
| Function | Notes |
|---|---|
| Stream::from_iterable(iter) | Stream from iterable values |
| Stream::from_effect(effect) | Stream from Effect<Vec<A>, E, R> |
| stream.map, filter, take, take_while, drop_while | Stream transformations |
| stream.run_collect() | Collect all elements into Vec<A> |
| stream.run_fold(init, f) | Fold elements |
| stream.run_for_each(f) | Run synchronous consumer |
| stream.run_for_each_effect(f) | Run effectful consumer |
| Sink::collect() | Sink collecting elements into Vec |
| Sink::fold_left(init, f) | Folding sink |
| Sink::drain() | Discarding sink |
| sink.run(stream) | Run a sink against a stream |
Schema
| Function | Notes |
|---|---|
| Unknown::{Null, Bool, I64, F64, String, Array, Object} | Dynamic input values |
| string(), i64(), f64(), bool_() | Primitive schemas |
| optional(s), array(s) | Container schemas |
| tuple, tuple3, tuple4 | Tuple schemas |
| struct_, struct3, struct4 | Struct schemas from field schemas |
| union_, union_chain | Union schemas |
| filter(s, pred, msg) / refine(...) | Validation refinements |
| transform(s, decode, encode) | Bidirectional transformation |
| schema.decode(input) | Decode typed wire input |
| schema.decode_unknown(&unknown) | Decode Unknown input |
| ParseError::new(path, message) | Single parse issue |
| ParseErrors::one(err) / ParseErrors::new(vec) | Aggregate parse issues |
Migrating from async fn to effects
This appendix shows the current migration shape for effectful: return Effect, keep dependencies in R, and run at the boundary with an explicit environment.
Mental Model Shift
In ordinary async Rust, calling an async fn creates a Future; awaiting it runs the work.
async fn get_user(id: u64, db: &DbClient) -> Result<User, DbError> {
db.query_one(id).await
}
In effectful, a function returns an Effect description. The runner receives the environment later.
fn get_user(id: u64) -> Effect<User, DbError, DbClient> {
effect!(|db: &mut DbClient| {
bind* db.query_one(id)
})
}
let user = run_blocking(get_user(42), db_client)?;
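The split between describing work and running it can be modeled in a few lines of plain Rust. The sketch below is a toy, not effectful's Effect type: ToyEffect, run_toy, FakeDb, and get_user_name are hypothetical names, and unlike run_blocking the toy runner borrows the environment so the example can inspect it afterwards.

```rust
/// Toy stand-in for an effect: a boxed closure over the environment.
/// This only illustrates the idea; it is not effectful's `Effect`.
struct ToyEffect<A, E, R> {
    run: Box<dyn FnOnce(&mut R) -> Result<A, E>>,
}

impl<A: 'static, E: 'static, R: 'static> ToyEffect<A, E, R> {
    fn new(f: impl FnOnce(&mut R) -> Result<A, E> + 'static) -> Self {
        ToyEffect { run: Box::new(f) }
    }

    /// Transform the success value without running anything.
    fn map<B: 'static>(self, f: impl FnOnce(A) -> B + 'static) -> ToyEffect<B, E, R> {
        let run = self.run;
        ToyEffect::new(move |env: &mut R| run(env).map(f))
    }
}

/// The runner: the environment is supplied only here.
fn run_toy<A, E, R>(effect: ToyEffect<A, E, R>, env: &mut R) -> Result<A, E> {
    (effect.run)(env)
}

/// Hypothetical environment: a fake database that counts queries.
struct FakeDb {
    queries: u32,
}

/// Returns a description; no query happens until the runner is called.
fn get_user_name(id: u64) -> ToyEffect<String, String, FakeDb> {
    ToyEffect::new(move |db: &mut FakeDb| {
        db.queries += 1;
        if id == 42 {
            Ok("Ada".to_string())
        } else {
            Err(format!("no user {id}"))
        }
    })
}
```

Building get_user_name(42).map(|name| name.len()) touches no database at all. The query counter only moves when run_toy is handed a FakeDb, which is the property that lets tests swap in a different environment.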
Pattern 1: async fn to Effect
Before
pub async fn process_order(
order_id: OrderId,
db: &DbClient,
mailer: &MailClient,
) -> Result<Receipt, AppError> {
let order = db.get_order(order_id).await?;
let receipt = db.complete_order(order).await?;
mailer.send_receipt(&receipt).await?;
Ok(receipt)
}
After
#[derive(Clone)]
struct AppEnv {
db: DbClient,
mailer: MailClient,
}
pub fn process_order(order_id: OrderId) -> Effect<Receipt, AppError, AppEnv> {
effect!(|env: &mut AppEnv| {
let order = bind* env.db.get_order(order_id).map_error(AppError::Db);
let receipt = bind* env.db.complete_order(order).map_error(AppError::Db);
bind* env.mailer.send_receipt(&receipt).map_error(AppError::Mail);
receipt
})
}
Migration steps:
- Change async fn to fn returning Effect<A, E, R>.
- Move dependencies into an environment type or service context.
- Replace .await? on effectful operations with bind*.
- Return the success value as the block tail.
- Call run_blocking(effect, env) or run_async(effect, env) at the boundary.
Pattern 2: Wrapping Third-Party Async
Third-party libraries return futures, not effects. Wrap them with from_async.
fn fetch_price(symbol: String) -> Effect<f64, reqwest::Error, ()> {
from_async(move |_r: &mut ()| async move {
let response = reqwest::get(format!("https://api.example.com/price/{symbol}"))
.await?;
let body = response.json::<PriceResponse>().await?;
Ok(body.price)
})
}
Inside the async closure, use normal .await. Outside, compose the result as an Effect.
Pattern 3: Error Types
Map infrastructure errors into your application error at composition points.
#[derive(Debug)]
enum AppError {
Db(DbError),
Mail(MailError),
}
let effect = db_call().map_error(AppError::Db);
Use catch to recover with another effect, and catch_all to turn typed errors into fallback success values.
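The same mapping shape exists on std's Result, which may help if map_error looks unfamiliar. This sketch uses only the standard library; the Parse and Negative variants and parse_quantity are hypothetical and unrelated to the AppError above.

```rust
use std::num::ParseIntError;

#[derive(Debug, PartialEq)]
enum AppError {
    Parse(ParseIntError),
    Negative(i64),
}

fn parse_quantity(raw: &str) -> Result<i64, AppError> {
    // A tuple-variant name is itself a function, so it can be passed to map_err directly.
    let n = raw.trim().parse::<i64>().map_err(AppError::Parse)?;
    if n < 0 {
        return Err(AppError::Negative(n));
    }
    Ok(n)
}
```

Mapping at the call site keeps the infrastructure error type (ParseIntError here) out of the function's signature, which is the same goal as calling map_error at composition points.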
Pattern 4: Services
For application dependency injection, prefer #[derive(Service)] plus ServiceContext.
#[derive(Clone, Service)]
struct AppState {
request_count: Arc<AtomicU64>,
}
fn handler() -> Effect<Response, AppError, ServiceContext> {
AppState::use_sync(|state| {
state.request_count.fetch_add(1, Ordering::Relaxed);
Response::ok()
})
}
let env = AppState::new().to_context();
let response = run_blocking(handler(), env)?;
For tagged HList contexts, use service_key!(pub struct Key);, service_env::<Key, _>(value), and Context::get::<Key>().
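As a mental model for a runtime service table, the sketch below keys services by their Rust type using std's TypeId. It is an assumption-laden toy: ToyContext, AppName, and MaxRetries are hypothetical, and effectful's ServiceContext may be implemented quite differently.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

/// Toy runtime service table keyed by type (not effectful's ServiceContext).
#[derive(Default)]
struct ToyContext {
    services: HashMap<TypeId, Box<dyn Any>>,
}

impl ToyContext {
    /// Register a service, replacing any previous value of the same type.
    fn add<S: Any>(mut self, service: S) -> Self {
        self.services.insert(TypeId::of::<S>(), Box::new(service));
        self
    }

    /// Look up a service by its type; None when it was never registered.
    fn get<S: Any>(&self) -> Option<&S> {
        self.services.get(&TypeId::of::<S>())?.downcast_ref::<S>()
    }
}

#[derive(Debug, PartialEq)]
struct AppName(&'static str);

#[derive(Debug, PartialEq)]
struct MaxRetries(u32);
```

A lookup for a type that was never added returns None at runtime. The typed HList contexts mentioned above encode requirements in the type instead, so a missing dependency can surface at compile time.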
Pattern 5: Transactional State
Use TRef when state updates must compose transactionally.
let counter = run_blocking(commit(TRef::make(0_u64)), ())?;
fn increment(counter: TRef<u64>) -> Effect<u64, (), ()> {
effect! {
bind* commit(counter.update_stm(|n| n + 1));
bind* commit(counter.read_stm())
}
}
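There is no stm! macro in the current API; compose transactions with flat_map, map, and helpers like update_stm. Note that increment above issues two separate commits, so another fiber can write between the update and the read; when the returned value must correspond exactly to this update, compute both in a single transaction with modify_stm.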
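The Glossary notes that commit retries a transaction on conflicts. A single-cell analogy of that optimistic "read, compute, commit, retry on conflict" loop can be written with std atomics. This is only an analogy: update_with_retry is a hypothetical helper, and real STM coordinates multiple cells, which this sketch does not.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Apply `f` atomically: read, compute, and commit only if nobody wrote in between.
/// Returns the value that was committed.
fn update_with_retry(cell: &AtomicU64, f: impl Fn(u64) -> u64) -> u64 {
    let mut seen = cell.load(Ordering::Acquire);
    loop {
        let next = f(seen);
        match cell.compare_exchange_weak(seen, next, Ordering::AcqRel, Ordering::Acquire) {
            Ok(_) => return next,
            // Conflict (or spurious failure): re-run `f` on the fresh value.
            Err(current) => seen = current,
        }
    }
}
```

Because the function returns the value it committed, the update and the read happen as one step. Issuing the two operations separately would not give that guarantee.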
Pattern 6: Resource Cleanup
Use Scope when cleanup must run at an explicit lifetime boundary.
fn with_connection<A, E, F>(pool: Pool<Connection, DbError>, f: F) -> Effect<A, E, ()>
where
F: FnOnce(Connection) -> Effect<A, E, ()> + 'static,
A: 'static,
E: From<DbError> + 'static,
{
scope_with(move |scope| {
effect! {
let conn = bind* pool.get().provide_env(scope.clone()).map_error(E::from);
let close_conn = conn.clone();
scope.add_finalizer(Box::new(move |_| close_conn.close()));
bind* f(conn)
}
})
}
For pooled resources, Pool::get() registers return-to-pool cleanup on the provided Scope.
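The idea of a scope as a lifetime boundary with registered finalizers can be sketched without the library. ToyScope and scoped are hypothetical names, and running finalizers in reverse registration order is an assumption borrowed from Rust's own drop order rather than a documented property of effectful's Scope.

```rust
/// Toy scope: collects finalizers and runs them when closed.
/// Hypothetical; effectful's Scope API and ordering may differ.
struct ToyScope {
    finalizers: Vec<Box<dyn FnOnce()>>,
}

impl ToyScope {
    fn new() -> Self {
        ToyScope { finalizers: Vec::new() }
    }

    /// Register cleanup to run when the scope closes.
    fn add_finalizer(&mut self, f: impl FnOnce() + 'static) {
        self.finalizers.push(Box::new(f));
    }

    /// Run finalizers in reverse registration order (assumed LIFO).
    fn close(mut self) {
        while let Some(f) = self.finalizers.pop() {
            f();
        }
    }
}

/// Run `body` inside a fresh scope and close the scope once `body` returns.
/// This sketch does not handle panics inside `body`.
fn scoped<A>(body: impl FnOnce(&mut ToyScope) -> A) -> A {
    let mut scope = ToyScope::new();
    let out = body(&mut scope);
    scope.close();
    out
}
```

Registering "close conn" and then "release lock" inside scoped runs them as "release lock" followed by "close conn", after the body has produced its result.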
Migration Strategy
- Convert leaf async wrappers first with from_async.
- Introduce explicit environment structs or ServiceContext.
- Move run_blocking / run_async to program edges.
- Convert tests to pass test environments or test layers.
- Replace stale helper assumptions with current names: run_collect, run_fold, retry(|| ..., schedule), TRef::make, run_test(effect, env).
You can mix old async code with effects during migration. Wrap async futures at the edge and keep new domain workflows as Effect values.
Glossary
Key terms used throughout this book.
bind* (bind operator)
The prefix operator inside effect! that runs an inner effect and yields its success value. let x = bind* eff binds the value; bind* eff; runs and discards it.
Backpressure
The mechanism by which a slow consumer controls a fast producer. In effectful streams this is represented by BackpressurePolicy: BoundedBlock, DropNewest, DropOldest, or Fail.
Brand
A zero-cost nominal wrapper. Brand<String, EmailMarker> and Brand<String, NameMarker> are different types even if both wrap String.
Cause<E>
The structured reason an effect failed: Fail(E), Die(String), Interrupt(FiberId), or composed causes Both / Then.
Chunk<A>
A batch of stream elements. Streams pull and process chunks rather than one element at a time internally.
Clock
A trait abstracting time. LiveClock uses real time; TestClock is controlled by tests.
commit / atomically
Functions that lift Stm<A, E> into Effect<A, E, R>. Running the effect executes the transaction and retries on conflicts or Stm::retry().
Context<...>
Typed heterogeneous environment for tagged services, built from Cons / Nil cells containing Tagged<K, V> values.
Effect<A, E, R>
The central type: a lazy description of work that succeeds with A, fails with E, and requires environment R.
effect! macro
Do-notation for Effect. It rewrites bind* effect into bind/await plumbing and wraps the block tail as success.
Exit<A, E>
Terminal outcome: Exit::Success(A) or Exit::Failure(Cause<E>). Returned by run_test and FiberHandle::await_exit().
Fiber
A lightweight unit of concurrent work. Spawn with run_fork; await with join() or await_exit().
FiberRef
A fiber-scoped variable used for request ids, tracing context, and similar dynamic data.
from_async
Constructor that lifts an async closure returning Result<A, E> into an Effect<A, E, R>.
HList
The compile-time linked-list shape Cons<Head, Tail> / Nil used by the typed Context API.
Layer
A lazy recipe for building services into a ServiceContext. Compose layers with merge, provide, provide_merge, and memoized().
Never
The uninhabited runtime error type (Infallible) used when an effect cannot fail through its typed error channel.
ParseError / ParseErrors
Schema parse failures. Build a single issue with ParseError::new(path, message) and aggregate with ParseErrors::one or ParseErrors::new.
R (environment type parameter)
The third parameter of Effect<A, E, R>. It encodes required dependencies in the type.
run_blocking
Synchronous runner: run_blocking(effect, env). Use at application/test boundaries, not inside reusable library functions.
effect_test
Attribute for effect-returning tests. The test body returns Effect; the harness executes it, provides the environment or layer, checks leak counters, and panics with Debug output on typed failure.
run_test
Lower-level synchronous test runner: run_test(effect, env) -> Exit<A, E>. It resets and checks the test leak counters around the run.
Schedule
A value describing delays and repeat limits for the free retry(|| effect, schedule) and repeat(|| effect, schedule) functions. Current constructors include recurs, spaced, exponential, recurs_while, and recurs_until.
Schema
A Schema<A, I, E> decodes wire value I or dynamic Unknown into A and can encode A back to I.
Scope
A resource lifetime boundary. Register finalizers with Scope::add_finalizer; close the scope to run finalizers.
service_key!
Macro for declaring a nominal key type, e.g. service_key!(pub struct DbKey);. Pair it with Tagged<DbKey, Db> / Service<DbKey, Db>.
Sink
A stream consumer represented by Sink<Out, In, E, R>. Built-ins include Sink::collect, Sink::fold_left, and Sink::drain; run with sink.run(stream).
Stm<A, E>
A composable transactional computation. Use Stm::succeed, Stm::fail, Stm::retry, flat_map, and map; run with commit / atomically.
Stream
A pull stream of values with typed error and environment. Build with from_iterable or from_effect; consume with run_collect, run_fold, or sinks.
Tag / Tagged
The typed key/value mechanism for contexts. Tagged<K, V> stores a value V under nominal key K.
TestClock
Controllable clock for deterministic scheduling tests. Construct with TestClock::new(start_instant).
TRef<T>
A transactional mutable cell. Allocate with TRef::make(value) inside Stm; use read_stm, write_stm, update_stm, and modify_stm.
Unknown
Dynamic input for schemas. Current variants are Null, Bool, I64, F64, String, Array, and Object.