qubit-cas

Typed compare-and-swap executor with retry-aware conflict handling

Rust CI Coverage Crates.io Rust License

概览

面向 Rust 的强类型 compare-and-swap(CAS)执行器。qubit-cas 将常见的 「读取共享快照、根据快照生成新值、通过 compare-and-swap 原子写入新值、遇到竞争后重试」 流程封装为可复用的 CasExecutor

CAS 机制可以理解为“先比较、再交换”:只有当共享状态仍等于你读取到的旧快照时, 新值才会被原子写入并生效;若期间被其他线程改动,本次写入会失败并可按策略重试。 它的优点是无锁路径延迟低、并发冲突时不会产生写丢失;代价是高竞争下可能出现 较多重试,进而带来 CPU 开销增加与尾延迟上升。

本 crate 基于 qubit-atomicqubit-functionqubit-retry。它适合共享状态以 不可变 Arc<T> 快照保存、并且每次更新都希望用显式类型表达结果决策的场景。

特性

安装

[dependencies]
qubit-cas = "0.7"

qubit-cas 使用 qubit_atomic::AtomicRef<T> 保存共享状态。应用代码如果需要构造或 持有该状态,应直接依赖 qubit-atomic

启用异步执行:

[dependencies]
qubit-cas = { version = "0.7", features = ["tokio"] }

可选 feature:

默认 feature 为空,因此同步 CAS 执行不会引入异步运行时。

适用场景

当一次更新可以表达为“从当前不可变快照推导出一个类型化决策”时,适合使用 qubit-cas

如果临界区耗时较长、更新逻辑包含无法安全重放的副作用,或状态无法表示为不可变替换值, 应优先考虑 mutex、数据库事务或领域内专用锁。

快速开始

use qubit_atomic::AtomicRef;
use qubit_cas::{CasDecision, CasExecutor};

#[derive(Debug, PartialEq, Eq)]
struct Inventory {
    stock: u32,
}

#[derive(Debug, Clone, PartialEq, Eq)]
enum OrderError {
    OutOfStock,
}

fn main() {
    let state = AtomicRef::from_value(Inventory { stock: 3 });
    let executor = CasExecutor::<Inventory, OrderError>::latency_first();

    let outcome = executor.execute(&state, |current: &Inventory| {
        if current.stock == 0 {
            return CasDecision::abort(OrderError::OutOfStock);
        }

        CasDecision::update(
            Inventory {
                stock: current.stock - 1,
            },
            current.stock - 1,
        )
    });

    println!(
        "CAS attempts={}, conflicts={}, conflict_ratio={:.2}",
        outcome.report().attempts_total(),
        outcome.report().conflicts(),
        outcome.report().conflict_ratio(),
    );

    match outcome.into_result() {
        Ok(success) => {
            println!("stock updated successfully, remaining: {}", success.output());
            assert!(success.is_updated());
            assert_eq!(*success.output(), 2);
            assert_eq!(state.load().stock, 2);
        }
        Err(error) => {
            // 缺货是业务结果,不应直接 panic。
            eprintln!("order rejected: {error:?}");
        }
    }
}

这段示例展示了一个“下单扣减库存”的 CAS 更新流程:

决策模型

每次业务操作都会收到当前状态快照,并返回 CasDecision<T, R, E>

execute* 返回 CasOutcome<T, R, E>。它包含业务层 Result<CasSuccess<T, R>, CasError<T, E>> 以及本次执行的 CasExecutionReport,调用方可以在不注册 Hook 的情况下读取冲突次数和冲突率。

状态与操作建议

CAS 操作可能被调用多次,因为冲突和可重试业务失败都会让流程基于新快照重新尝试。 尽量保持 operation closure 确定且无副作用。如果必须执行副作用,建议在 execute* 成功返回后再执行,或让副作用具备幂等性并绑定外部 operation id。

共享值应当足够轻量,便于克隆为新的 Arc<T> 替换值。对于较大的状态,可以考虑持久化数据结构、 内部 Arc 字段,或只把指向大型不可变数据的小型状态对象放入 CAS。

错误处理

终止失败会以 CasError<T, E> 返回,并通过 CasErrorKind 分类:

控制分支建议使用 error.kind();需要读取保留下来的业务错误时使用 error.error(); 如果最后一次失败保留了当时观察到的状态快照,可通过 error.current() 读取。

执行策略

qubit-cas 提供三种常见策略,方便按场景直接选用:

通常可以先用 latency_first() 起步;如果报告中 conflict_ratio >= 0.30attempts_total >= 3,说明出现明显热点争用, 可以切到 contention_adaptive();如果业务更看重“尽量成功”而非“尽快返回”, 可选 reliability_first()

面向状态码的 Fast CAS

FastCas 是面向 usize 状态码的低层 CAS 路径,适合状态机、executor、 线程池内部状态和其他高频热路径。它假设共享状态已经被编码成紧凑数字, 状态迁移必须保持无分配。

常规 CasExecutor 基于不可变 Arc<T> 快照工作,提供业务重试、hooks、 执行报告、异步执行、超时处理和争用观测。FastCas 刻意不包含这些能力: 每次尝试只读取当前 usize,让调用方给出状态迁移决策,然后对刚才观测到的值 执行一次原子 compare-and-set。更小的接口让热路径更直接,也更适合紧凑状态机。

需求推荐选择
需要丰富快照、执行报告、hooks、异步支持、超时处理或业务级重试CasExecutor
状态已编码为 usize,需要无分配执行、无报告构造,并且只重试 CAS 冲突FastCas

核心类型包括:

use qubit_cas::{
    FastCas,
    FastCasState,
};

let state = FastCasState::new(0);
let cas = FastCas::spin(8);

let success = cas
    .update_by(&state, |current| {
        let next = current + 1;
        Ok::<_, &'static str>((next, next))
    })
    .expect("state code should update");

assert_eq!(success.previous(), 0);
assert_eq!(success.current(), 1);
assert_eq!(success.into_output(), 1);
assert_eq!(state.load(), 1);

显式状态机可以直接返回 FastCasDecision

use qubit_cas::{
    FastCas,
    FastCasDecision,
    FastCasState,
};

const IDLE: usize = 0;
const RUNNING: usize = 1;
const DONE: usize = 2;

let state = FastCasState::new(IDLE);
let cas = FastCas::spin(8);

cas.compare_update(&state, IDLE, RUNNING)
    .expect("IDLE should transition to RUNNING");

let success = cas
    .execute(&state, |current| match current {
        RUNNING => FastCasDecision::<_, &'static str>::update(DONE, DONE),
        DONE => FastCasDecision::finish(DONE),
        _ => FastCasDecision::abort("invalid state"),
    })
    .expect("transition should be valid");

assert_eq!(success.current(), DONE);
assert_eq!(success.into_output(), DONE);

FastCas 只重试 CAS 冲突,不重试调用方返回的业务错误,不构造执行报告,也不 调用 hooks。传入的操作闭包是 Fn,当其他写入者先完成更新时,同一个闭包可能 被多次调用,因此闭包应保持确定性,并避免不可重复的副作用。调用方已经知道期望 状态码、只需要固定 expected -> next 迁移时,可以使用 compare_updatecompare_update_with,这样不会在观测到其他状态后重新计算迁移。

FastCasPolicy::once() 最多执行一次 compare-and-set。 FastCasPolicy::spin(max_attempts) 在有界循环中紧凑重试冲突。 FastCasPolicy::spin_yield(spin_attempts, max_attempts) 先自旋,超过自旋前缀后 在后续尝试前调用 thread::yield_now()。尝试次数为 0 时会规范化为 1, 因此每种策略都至少能基于一次观测状态尝试推进。

重试配置

预置执行器不满足需求时,可以使用 builder:

use std::time::Duration;

use qubit_cas::CasExecutor;

let executor = CasExecutor::<usize, &'static str>::builder()
    .max_retries(4)
    .exponential_backoff(Duration::from_millis(2), Duration::from_millis(50))
    .jitter_factor(0.25)
    .max_operation_elapsed(Some(Duration::from_millis(250)))
    .build()
    .expect("valid CAS retry settings");

冲突观测与 Hooks

Hook 绑定到单次执行,因此同一个 executor 可以在不同调用中使用不同的观测逻辑。 默认情况下只返回 CasExecutionReport,如果需要实时事件流,可开启 event_stream()

use qubit_atomic::AtomicRef;
use qubit_cas::{
    CasAttemptFailureKind, CasDecision, CasEvent, CasExecutor, CasHooks, CasObservabilityConfig,
};

let state = AtomicRef::from_value(1usize);
let executor = CasExecutor::<usize, &'static str>::builder()
    .observability(CasObservabilityConfig::event_stream())
    .build_latency_first()
    .expect("valid CAS settings");

let hooks = CasHooks::new().on_event(|event: &CasEvent| {
    if let CasEvent::AttemptFailed { context, kind } = event {
        if *kind == CasAttemptFailureKind::Conflict {
            eprintln!("CAS conflict at attempt {}", context.attempt());
        }
    }
});

let success = executor
    .execute_with_hooks(
        &state,
        |current: &usize| CasDecision::update(*current + 1, *current + 1),
        hooks,
    )
    .expect("CAS should succeed");

assert_eq!(*success.output(), 2);

检测能力与性能权衡

冲突检测本身也会增加热路径成本,因此 qubit-cas 将可观测能力分为三个层级:

建议默认使用 ReportOnly,通过 outcome.report().conflict_ratio() 做周期性指标上报。 只有在需要定位热点或接入 trace 时再开启 EventStream。不要在 Hook 中同步写日志、 同步请求远端 metrics 或执行复杂格式化;高冲突时这些操作会被按 attempt 次数放大。 更稳妥的方式是把事件投递到无阻塞 channel,由后台任务批量消费。

异步用法

启用 tokio feature 后,异步操作会收到一个 Arc<T> 快照。每次尝试可以设置超时, 超时后可继续重试,也可直接中止。

use std::time::Duration;

use qubit_atomic::AtomicRef;
use qubit_cas::{CasDecision, CasExecutor};

#[tokio::main]
async fn main() {
    let state = AtomicRef::from_value(0usize);
    let executor = CasExecutor::<usize, &'static str>::builder()
        .max_attempts(3)
        .attempt_timeout(Some(Duration::from_millis(100)))
        .retry_on_timeout()
        .build()
        .expect("valid CAS settings");

    let success = executor
        .execute_async(&state, |current| async move {
            CasDecision::update(*current + 1, *current + 1)
        })
        .await
        .expect("async CAS should succeed");

    assert_eq!(*success.current().as_ref(), 1);
}

公共 API 速览

项目结构

测试与 CI

在 crate 根目录快速执行本地检查:

cargo test
cargo clippy --all-targets --all-features -- -D warnings

若要与仓库 CI 环境保持一致,请运行:

./align-ci.sh
./ci-check.sh
./coverage.sh json

./align-ci.sh 会先对齐本地工具链和 CI 相关配置;./ci-check.sh 复现流水线检查。 修改运行期行为并需要关注覆盖率时,可配合使用 ./coverage.sh

参与贡献

欢迎通过 Issue 与 Pull Request 参与本仓库。建议单次变更聚焦一个主题;修改行为时补充或更新测试; 影响公开 API 或用户可见行为时,同步更新本文档或 rustdoc。

向本仓库贡献内容即表示您同意以 Apache License, Version 2.0(与本项目相同) 授权您的贡献。

许可证与版权

Copyright (c) 2026. Haixing Hu.

本软件依据 Apache License, Version 2.0 授权;完整许可文本见仓库根目录的 LICENSE 文件。

作者与维护

Haixing Hu — Qubit Co. Ltd.

源码仓库github.com/qubit-ltd/rs-cas
API 文档docs.rs/qubit-cas
Crate 发布crates.io/crates/qubit-cas