Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

TQueue, TMap, TSemaphore — Transactional Collections

effectful provides STM-aware collection types. Their constructors and operations return Stm values, so they compose with TRef reads/writes and commit atomically.

TQueue

use effectful::{Stm, TQueue};

let queue_stm: Stm<TQueue<Job>, ()> = TQueue::bounded(100);
let unbounded_stm: Stm<TQueue<Job>, ()> = TQueue::unbounded();

let offer: Stm<bool, ()> = queue.offer(job); // false when bounded and full
let take: Stm<Job, ()> = queue.take();       // retries while empty

TQueue::offer does not block when a bounded queue is full; it returns false. TQueue::take retries when the queue is empty.

Producer-Consumer Pattern

fn producer(queue: TQueue<Job>, jobs: Vec<Job>) -> Effect<(), (), ()> {
    effect! {
        for job in jobs {
            let accepted = bind* commit(queue.offer(job));
            if !accepted {
                return Err(());
            }
        }
    }
}

fn consumer(queue: TQueue<Job>) -> Effect<(), JobError, ()> {
    effect! {
        loop {
            let job = bind* commit(queue.take());
            bind* process_job(job);
        }
    }
}

TMap

use effectful::{Stm, TMap};

let map_stm: Stm<TMap<String, User>, ()> = TMap::make();

let get: Stm<Option<User>, ()> = map.get(&"alice".to_string());
let set: Stm<(), ()> = map.set("alice".to_string(), alice_user);
let delete: Stm<(), ()> = map.delete(&"alice".to_string());

TMap is a transactional hash map. Reading from a TMap and updating a TRef in the same transaction is atomic.

let transaction = user_map.get(&"alice".to_string()).flat_map(move |user| {
    access_counter
        .update_stm(|count| count + 1)
        .map(move |_| user)
});

let effect = commit(transaction);

TSemaphore

use effectful::{Stm, TSemaphore};

let sem_stm: Stm<TSemaphore, ()> = TSemaphore::make(10);

let acquire: Stm<(), ()> = sem.acquire(); // retries while zero
let release: Stm<(), ()> = sem.release();

TSemaphore is a transactional permit counter. acquire decrements by one or retries when no permits are available; release increments by one.

let guarded = sem.acquire().flat_map(move |_| {
    update_shared_state().flat_map(move |result| {
        sem.release().map(move |_| result)
    })
});

let effect = commit(guarded);

Summary

TypePurpose
TRef<T>Single transactional cell
TQueue<T>Transactional FIFO queue
TMap<K, V>Transactional hash map
TSemaphoreTransactional permit counter

All compose as Stm values and commit atomically with other STM operations.