qubit-atomic

User-friendly atomic operations wrapper providing JDK-like atomic API

Rust CI Coverage Crates.io Rust License

为 Rust 提供类似 JDK 的用户友好原子操作封装。

概述

Qubit Atomic 是一个全面的原子操作库,提供易于使用的原子类型和合理的默认内存序,类似于 Java 的 java.util.concurrent.atomic 包。它隐藏了内存序的复杂性,同时保持零成本抽象,并允许高级用户访问底层类型以进行细粒度控制。

设计目标

特性

🔢 泛型原子基础类型

🔢 AtomicCountAtomicSignedCount

🔗 原子引用类型

🤝 共享所有权便利封装

🎯 聚焦的公开 API

安装

Cargo.toml 中添加:

[dependencies]
qubit-atomic = "0.13"

快速开始

指定值类型 T

Atomic<T> 对基础值类型 T 泛型。多数情况下,编译器会根据传给 Atomic::new 的实参推断 T,但像 0 这样的整型字面量可能在不同位宽之间产生歧义。

此时应显式指定 T:在构造函数上使用 turbofish 写成 Atomic::<T>::new(...),或为变量添加类型注解:

use qubit_atomic::Atomic;

let wide: Atomic<u64> = Atomic::new(0);
assert_eq!(wide.load(), 0u64);

let narrow = Atomic::<i16>::new(0);
assert_eq!(narrow.load(), 0i16);

const 初始化

普通代码优先使用 Atomic<T>。它是推荐的泛型入口,可以让一个类型覆盖所有基础类型特例,也能保持公开 API 更集中。

稳定 Rust 目前还不能让 Atomic<T>::newconst fn 中调用隐藏 trait 构造函数。需要 static 或其他 const 初始化原子值时,使用 atomic::primitive 下的具体封装:

use qubit_atomic::atomic::primitive::{
    AtomicBool,
    AtomicU32,
};

static READY: AtomicBool = AtomicBool::new(false);
static NEXT_ID: AtomicU32 = AtomicU32::new(1);

示例:并发 Atomic<i32>

use qubit_atomic::Atomic;
use std::sync::Arc;
use std::thread;

fn main() {
    let counter = Arc::new(Atomic::<i32>::new(0));
    let mut handles = vec![];

    // 启动 10 个线程,每个线程递增计数器 1000 次
    for _ in 0..10 {
        let counter = counter.clone();
        let handle = thread::spawn(move || {
            for _ in 0..1000 {
                counter.fetch_inc();
            }
        });
        handles.push(handle);
    }

    // 等待所有线程完成
    for handle in handles {
        handle.join().unwrap();
    }

    // 验证结果
    assert_eq!(counter.load(), 10000);
    println!("最终计数:{}", counter.load());
}

AtomicCountAtomicSignedCount

纯指标统计使用 Atomic<T>。计数值本身属于并发状态(例如活跃任务数或 终止判断)时,使用 AtomicCount

use qubit_atomic::{
    AtomicCount,
    AtomicSignedCount,
};

fn main() {
    let active_tasks = AtomicCount::zero();

    active_tasks.inc();
    assert!(!active_tasks.is_zero());

    if active_tasks.dec() == 0 {
        println!("所有活跃任务都已完成");
    }

    let backlog_delta = AtomicSignedCount::zero();
    assert_eq!(backlog_delta.add(5), 5);
    assert_eq!(backlog_delta.sub(8), -3);
    assert!(backlog_delta.is_negative());
}

共享所有权封装

当需要在线程或组件之间共享原子容器本身时,使用 ArcAtomic* 封装。 它们的 clone() 会克隆外层 Arc,因此所有 clone 都观察并修改同一个 原子容器。

use qubit_atomic::{
    ArcAtomic,
    ArcAtomicCount,
    ArcAtomicRef,
    ArcAtomicSignedCount,
};
use std::sync::Arc;
use std::thread;

fn main() {
    let requests = ArcAtomic::new(0usize);
    let worker_requests = requests.clone();

    let handle = thread::spawn(move || {
        worker_requests.fetch_inc();
    });
    handle.join().expect("worker should finish");

    assert_eq!(requests.load(), 1);
    assert_eq!(requests.strong_count(), 1);

    let active_tasks = ArcAtomicCount::zero();
    let shared_tasks = active_tasks.clone();
    assert_eq!(shared_tasks.inc(), 1);
    assert_eq!(active_tasks.get(), 1);

    let backlog = ArcAtomicSignedCount::zero();
    let shared_backlog = backlog.clone();
    assert_eq!(shared_backlog.sub(3), -3);
    assert_eq!(backlog.get(), -3);

    let config = ArcAtomicRef::from_value(String::from("v1"));
    let same_config = config.clone();
    same_config.store(Arc::new(String::from("v2")));
    assert_eq!(config.load().as_str(), "v2");
}

CAS 循环

use qubit_atomic::Atomic;

fn increment_even_only(atomic: &Atomic<i32>) -> Result<i32, &'static str> {
    let mut current = atomic.load();
    loop {
        // 只对偶数值进行递增
        if current % 2 != 0 {
            return Err("值为奇数");
        }

        let new = current + 2;
        match atomic.compare_set(current, new) {
            Ok(_) => return Ok(new),
            Err(actual) => current = actual, // 重试
        }
    }
}

fn main() {
    let atomic = Atomic::<i32>::new(10);
    match increment_even_only(&atomic) {
        Ok(new_value) => println!("成功递增到:{}", new_value),
        Err(e) => println!("失败:{}", e),
    }
    assert_eq!(atomic.load(), 12);
}

函数式更新

use qubit_atomic::Atomic;

fn main() {
    let atomic = Atomic::<i32>::new(10);

    // 使用函数更新(返回旧值)
    let old_value = atomic.fetch_update(|x| {
        if x < 100 {
            x * 2
        } else {
            x
        }
    });

    assert_eq!(old_value, 10);
    assert_eq!(atomic.load(), 20);
    println!("更新后的值:{}", atomic.load());

    // 更新并返回提交后的新值
    let new_value = atomic.update_and_get(|x| x + 5);
    assert_eq!(new_value, 25);
    assert_eq!(atomic.load(), 25);

    // 条件更新并返回提交后的新值
    let accepted_new = atomic.try_update_and_get(|x| (x < 100).then_some(x + 5));
    assert_eq!(accepted_new, Some(30));
    assert_eq!(atomic.load(), 30);

    // 累积操作(返回旧值)
    let old_result = atomic.fetch_accumulate(5, |a, b| a + b);
    assert_eq!(old_result, 30);
    assert_eq!(atomic.load(), 35);

    // 累积并返回提交后的新值
    let accumulated = atomic.accumulate_and_get(5, |a, b| a + b);
    assert_eq!(accumulated, 40);
    assert_eq!(atomic.load(), 40);
    println!("累积后的值:{}", atomic.load());
}

原子引用

use qubit_atomic::AtomicRef;
use std::sync::Arc;

#[derive(Debug, Clone)]
struct Config {
    timeout: u64,
    max_retries: u32,
}

fn main() {
    let config = Arc::new(Config {
        timeout: 1000,
        max_retries: 3,
    });

    let atomic_config = AtomicRef::new(config);

    // 更新配置
    let new_config = Arc::new(Config {
        timeout: 2000,
        max_retries: 5,
    });

    let old_config = atomic_config.swap(new_config);
    println!("旧配置:{:?}", old_config);
    println!("新配置:{:?}", atomic_config.load());

    // 使用函数更新(返回旧值)
    let old = atomic_config.fetch_update(|current| {
        Arc::new(Config {
            timeout: current.timeout * 2,
            max_retries: current.max_retries + 1,
        })
    });

    println!("更新前的配置:{:?}", old);
    println!("更新后的配置:{:?}", atomic_config.load());

    // 更新并返回提交后的新引用
    let updated = atomic_config.update_and_get(|current| {
        Arc::new(Config {
            timeout: current.timeout + 500,
            max_retries: current.max_retries + 1,
        })
    });
    println!("提交后的配置:{:?}", updated);

    // 短生命周期读取;快路径不会克隆 Arc
    let snapshot = atomic_config.load_guard();
    println!("快照配置:{:?}", snapshot);

    // 条件更新;返回提交后的新引用,拒绝时返回 None
    let accepted = atomic_config.try_update_and_get(|current| {
        (current.timeout < 10_000).then_some(Arc::new(Config {
            timeout: current.timeout + 1000,
            max_retries: current.max_retries,
        }))
    });
    assert!(accepted.is_some());
}

布尔标志

use qubit_atomic::Atomic;
use std::sync::Arc;

struct Service {
    running: Arc<Atomic<bool>>,
}

impl Service {
    fn new() -> Self {
        Self {
            running: Arc::new(Atomic::<bool>::new(false)),
        }
    }

    fn start(&self) {
        // 只有当前未运行时才启动
        if self.running.set_if_false(true).is_ok() {
            println!("服务启动成功");
        } else {
            println!("服务已经在运行");
        }
    }

    fn stop(&self) {
        // 只有当前运行时才停止
        if self.running.set_if_true(false).is_ok() {
            println!("服务停止成功");
        } else {
            println!("服务已经停止");
        }
    }

    fn is_running(&self) -> bool {
        self.running.load()
    }
}

fn main() {
    let service = Service::new();

    service.start();
    assert!(service.is_running());

    service.start(); // 重复启动会失败

    service.stop();
    assert!(!service.is_running());

    service.stop(); // 重复停止会失败
}

浮点数原子操作

use qubit_atomic::Atomic;
use std::sync::Arc;
use std::thread;

fn main() {
    let sum = Arc::new(Atomic::<f32>::new(0.0));
    let mut handles = vec![];

    // 启动 10 个线程,每个线程累加 100 次
    for _ in 0..10 {
        let sum = sum.clone();
        let handle = thread::spawn(move || {
            for _ in 0..100 {
                sum.fetch_add(0.01);
            }
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    // 注意:由于浮点数精度问题,结果可能不是精确的 10.0
    let result = sum.load();
    println!("累加结果:{:.6}", result);
    println!("误差:{:.6}", (result - 10.0).abs());
}

API 参考

通用操作

方法描述内存序
new(value)创建新的原子值-
load()加载当前值Acquire
store(value)存储新值Release
swap(value)交换值,返回旧值AcqRel
compare_set(current, new)CAS 操作,显式返回成功或失败AcqRel/Acquire
compare_and_exchange(current, new)CAS 操作,返回观察值AcqRel/Acquire
fetch_update(f)函数式更新,返回旧值AcqRel/Acquire
update_and_get(f)函数式更新,返回新值AcqRel/Acquire
try_update(f)条件函数式更新,返回 Option<旧值>AcqRel/Acquire
try_update_and_get(f)条件函数式更新,返回 Option<新值>AcqRel/Acquire
inner()访问底层后端类型-

基础值 weak CAS 操作

这些方法适用于基础值类型的 Atomic<T> 特例。AtomicRef<T> 不提供这些方法; 它只暴露强指针 CAS。

方法描述内存序
compare_set_weak(current, new)弱 CAS,显式返回成功或失败AcqRel/Acquire
compare_and_exchange_weak(current, new)弱 CAS,返回 Ok(观察值)Err(实际值)AcqRel/Acquire

整数操作

方法描述内存序
fetch_inc()后增,返回旧值Relaxed
fetch_dec()后减,返回旧值Relaxed
fetch_add(delta)后加,返回旧值Relaxed
fetch_sub(delta)后减,返回旧值Relaxed
fetch_inc_with_ordering(ordering)后增,返回旧值调用方指定
fetch_dec_with_ordering(ordering)后减,返回旧值调用方指定
fetch_add_with_ordering(delta, ordering)后加,返回旧值调用方指定
fetch_sub_with_ordering(delta, ordering)后减,返回旧值调用方指定
fetch_mul(factor)后乘,返回旧值AcqRel(CAS 循环)
fetch_div(divisor)后除,返回旧值AcqRel(CAS 循环)
fetch_and(value)按位与,返回旧值AcqRel
fetch_or(value)按位或,返回旧值AcqRel
fetch_xor(value)按位异或,返回旧值AcqRel
fetch_not()按位取反,返回旧值AcqRel
fetch_max(value)原子取最大值,返回旧值AcqRel
fetch_min(value)原子取最小值,返回旧值AcqRel
fetch_update(f)函数式更新,返回旧值AcqRel/Acquire
update_and_get(f)函数式更新,返回新值AcqRel/Acquire
try_update(f)条件函数式更新,返回 Option<旧值>AcqRel/Acquire
try_update_and_get(f)条件函数式更新,返回 Option<新值>AcqRel/Acquire
fetch_accumulate(x, f)累积,返回旧值AcqRel/Acquire
accumulate_and_get(x, f)累积,返回新值AcqRel/Acquire

基础整数原子操作会在溢出和下溢时按 Rust 原子整数语义环绕。若业务语义要求拒绝溢出或下溢,请使用 AtomicCountAtomicSignedCount_with_ordering 变体仅用于整数读-改-写计数加减操作,适合计数值同时作为同步信号的场景。

AtomicCount / AtomicSignedCount 的方法

方法AtomicCountAtomicSignedCount内存序描述
new(value)usizeisize-创建实例
zero()支持支持-创建零值实例
get()usizeisizeAcquire读取当前值
is_zero()支持支持Acquire判断值是否为零
is_positive()支持支持Acquire判断值是否为正
is_negative()不支持支持Acquire判断值是否为负
inc()支持支持AcqRel/Acquire加一,返回新值
dec()下溢 panic允许负数AcqRel/Acquire减一,返回新值
add(delta)溢出 panic溢出/下溢 panicAcqRel/Acquire加 delta,返回新值
sub(delta)下溢 panic溢出/下溢 panicAcqRel/Acquire减 delta,返回新值
try_add(delta)溢出返回 None溢出/下溢返回 NoneAcqRel/Acquire检查式加法
try_dec()零值返回 None不支持AcqRel/Acquire(仅 AtomicCount检查式减一
try_sub(delta)下溢返回 None溢出/下溢返回 NoneAcqRel/Acquire检查式减法

共享所有权封装的方法

ArcAtomic* 封装会解引用到底层原子容器,因此可以直接在封装值上调用 loadfetch_incstoreincsub 等操作。

方法适用类型描述
new(value)ArcAtomic<T>ArcAtomicCountArcAtomicSignedCount从初始值创建共享封装
new(Arc<T>)ArcAtomicRef<T>从已有 Arc<T> 创建共享原子引用
from_value(value)ArcAtomicRef<T>从自有值创建共享原子引用
from_atomic(...)ArcAtomic<T>封装已有 Atomic<T>
from_atomic_ref(...)ArcAtomicRef<T>封装已有 AtomicRef<T>
from_count(...)ArcAtomicCountArcAtomicSignedCount封装已有计数容器
from_arc(arc)所有 ArcAtomic* 封装封装已有 Arc<...> 容器
as_arc()所有 ArcAtomic* 封装借用底层 Arc<...>
into_arc()所有 ArcAtomic* 封装消耗封装并返回底层 Arc<...>
strong_count()所有 ArcAtomic* 封装返回 Arc 强引用数量

布尔操作

方法描述内存序
fetch_set()设置为 true,返回旧值AcqRel
fetch_clear()设置为 false,返回旧值AcqRel
fetch_not()取反,返回旧值AcqRel
fetch_and(value)逻辑与,返回旧值AcqRel
fetch_or(value)逻辑或,返回旧值AcqRel
fetch_xor(value)逻辑异或,返回旧值AcqRel
set_if_false(new)如果为 false 则 CASAcqRel/Acquire
set_if_true(new)如果为 true 则 CASAcqRel/Acquire

浮点数操作

方法描述内存序
fetch_add(delta)原子加法,返回旧值AcqRel(CAS 循环)
fetch_sub(delta)原子减法,返回旧值AcqRel(CAS 循环)
fetch_mul(factor)原子乘法,返回旧值AcqRel(CAS 循环)
fetch_div(divisor)原子除法,返回旧值AcqRel(CAS 循环)
fetch_update(f)函数式更新,返回旧值AcqRel/Acquire
update_and_get(f)函数式更新,返回新值AcqRel/Acquire
try_update(f)条件函数式更新,返回 Option<旧值>AcqRel/Acquire
try_update_and_get(f)条件函数式更新,返回 Option<新值>AcqRel/Acquire

浮点 CAS 操作(compare_setcompare_and_exchange 及其 weak 版本)比较的是 原始 to_bits() 位模式,而不是 PartialEq。例如 0.0-0.0 虽然相等, 但 CAS 不会匹配;NaN 的 payload 位也必须完全一致。需要明确成功结果时, 优先使用 compare_set,或自行比较 to_bits()

内存序策略

操作类型默认内存序原因
纯读操作 (load())Acquire保证读取最新值
纯写操作 (store())Release保证写入可见
读-改-写操作 (swap()、CAS)AcqRel同时保证读和写的正确性
Atomic<T> 计数加减操作 (fetch_inc()fetch_dec()fetch_add()fetch_sub())Relaxed纯指标场景,无需同步其他数据
显式内存序整数计数加减 (fetch_*_with_ordering)调用方指定计数值作为状态信号时需要显式同步语义
基于 CAS 的算术和更新操作 (fetch_mul()fetch_div()fetch_update()update_and_get()try_update()try_update_and_get()fetch_accumulate()accumulate_and_get())AcqRel / AcquireCAS 循环标准语义
AtomicCount / AtomicSignedCount (inc()dec())CAS 循环值作为并发状态信号
位运算操作 (fetch_and()fetch_or())AcqRel通常用于标志位同步
最大/最小值操作 (fetch_max()fetch_min())AcqRel常与阈值判断配合使用

高级用法:显式内存序与底层访问

当整数计数加减需要同步语义时,优先使用 ordered 变体,再考虑访问 inner()

use std::sync::atomic::Ordering;
use qubit_atomic::Atomic;

let atomic = Atomic::<i32>::new(0);

// 99% 的场景:使用简单 API
let value = atomic.load();

// 状态信号计数:保留 wrapper,同时显式选择内存序
atomic.fetch_add_with_ordering(1, Ordering::AcqRel);

// 最底层逃生口:直接访问后端原子类型
let value = atomic.inner().load(Ordering::Relaxed);
atomic.inner().store(42, Ordering::Release);

与 JDK 对比

特性JDKQubit Atomic说明
基础类型3 种类型Atomic<T> 特例;atomic::primitive::* 用于 const 初始化Rust 覆盖更多整数、浮点、布尔与计数场景
内存序隐式(volatile 语义)默认值 + 显式内存序整数 RMW + inner() 可选Rust 更灵活
弱 CASweakCompareAndSet基础值类型 Atomic<T>compare_set_weak等价
引用类型AtomicReference<V>AtomicRef<T>Rust 使用 Arc<T>
AtomicCount / AtomicSignedCount手动组合AtomicCountAtomicSignedCount状态跟踪用的非负 / 有符号计数
共享所有权通常使用对象引用ArcAtomic<T>ArcAtomicRef<T>ArcAtomicCountArcAtomicSignedCount共享原子容器的便利封装
可空性允许 null使用 Option<Arc<T>>Rust 不允许空指针
位运算部分支持完整支持Rust 更强大
最大/最小值Java 9+ 支持支持等价
API 数量约 20 个方法/类型约 29 个方法/类型Rust 提供更多便利方法

性能考虑

零成本抽象

基础类型封装使用 #[repr(transparent)]#[inline],让泛型 API 编译到底层原子操作:

use qubit_atomic::Atomic;
use std::sync::atomic::Ordering;

// 我们的封装
let atomic = Atomic::<i32>::new(0);
let value = atomic.load();

// 编译后与以下代码生成相同的机器码
let atomic = std::sync::atomic::AtomicI32::new(0);
let value = atomic.load(Ordering::Acquire);

何时使用 inner()

99% 的场景:使用默认 API,已经提供最优性能。

1% 的场景:只有在以下情况才使用 inner()

黄金法则:默认 API 优先,inner() 是最后的手段。

测试与代码覆盖率

本项目保持全面的测试覆盖,对所有功能进行详细验证。

运行测试

# 运行所有测试
cargo test

# 运行基准测试
cargo bench --bench atomic_bench

# 列出基准测试场景
cargo bench --bench atomic_bench -- --list

# 运行覆盖率报告
./coverage.sh

# 生成文本格式报告
./coverage.sh text

# 运行 CI 检查(格式化、clippy、测试、覆盖率)
./ci-check.sh

覆盖率指标

详细的覆盖率统计请参见 COVERAGE.zh_CN.md

依赖项

运行时依赖保持很少:

许可证

Copyright (c) 2025 - 2026. Haixing Hu, Qubit Co. Ltd. All rights reserved.

根据 Apache 许可证 2.0 版("许可证")授权; 除非遵守许可证,否则您不得使用此文件。 您可以在以下位置获取许可证副本:

http://www.apache.org/licenses/LICENSE-2.0

除非适用法律要求或书面同意,否则根据许可证分发的软件 按"原样"分发,不附带任何明示或暗示的担保或条件。 有关许可证下的特定语言管理权限和限制,请参阅许可证。

完整的许可证文本请参阅 LICENSE

贡献

欢迎贡献!请随时提交 Pull Request。

开发指南

作者

胡海星 - Qubit Co. Ltd.

相关项目

Qubit 旗下的更多 Rust 库发布在 GitHub 组织 qubit-ltd

---

仓库地址:https://github.com/qubit-ltd/rs-atomic