qubit-batch

One-shot batch execution and processing with sequential and scoped parallel utilities


A crate of one-shot batch execution and batch processing utilities for the Qubit Rust libraries.

Purpose

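Use qubit-batch when you already have a finite batch and want to run it once, getting consistent statistics, the location of each failure, and partial results.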

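This crate is not a queue, a scheduler, a worker thread pool, or a retry framework. It consumes the iterator supplied by the caller exactly once and returns a structured result.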
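The sketch below makes the "consume once, return a structured result" model concrete. It is a minimal standalone illustration. The `Outcome` type and the `run_once` function are hypothetical and are not qubit-batch APIs. In a single pass, the sketch records the total count, the successes, and the index of each failure.

```rust
/// Hypothetical result type for illustration; qubit-batch's real result types differ.
#[derive(Debug)]
struct Outcome<E> {
    task_count: usize,
    succeeded_count: usize,
    /// (zero-based item index, error returned by the task)
    failures: Vec<(usize, E)>,
}

/// Consumes `items` exactly once, running `task` on each item and
/// remembering where every failure happened.
fn run_once<T, E, I, F>(items: I, mut task: F) -> Outcome<E>
where
    I: IntoIterator<Item = T>,
    F: FnMut(T) -> Result<(), E>,
{
    let mut outcome = Outcome {
        task_count: 0,
        succeeded_count: 0,
        failures: Vec::new(),
    };
    for (index, item) in items.into_iter().enumerate() {
        outcome.task_count += 1;
        match task(item) {
            Ok(()) => outcome.succeeded_count += 1,
            Err(error) => outcome.failures.push((index, error)),
        }
    }
    outcome
}
```

The sketch never retries and never re-reads the input. A failure is recorded and the loop moves on, so the caller always gets a complete picture of the batch.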

Core model

The Rayon-based batch executors live in the companion qubit-rayon-batch crate.

Installation

[dependencies]
qubit-batch = "0.8"

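If you implement Runnable, Callable, or Consumer types directly, you also need a dependency on qubit-function. If you implement a custom progress reporter, you also need a dependency on qubit-progress.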

Examples

Validating each item

use qubit_batch::{
    BatchExecutor,
    BatchTaskError,
    SequentialBatchExecutor,
};

#[derive(Debug, Clone, PartialEq, Eq)]
struct ImportError {
    record_id: u64,
    reason: &'static str,
}

let executor = SequentialBatchExecutor::new();

let records = [
    (101, "alice@example.com"),
    (102, "not-an-email"),
    (103, "carol@example.com"),
];

let result = executor
    .for_each(records, |(record_id, email)| {
        if email.contains('@') {
            Ok(())
        } else {
            Err(ImportError {
                record_id,
                reason: "email address is invalid",
            })
        }
    })
    .expect("array length should be exact");

assert_eq!(result.task_count(), 3);
assert_eq!(result.succeeded_count(), 2);
assert_eq!(result.failed_count(), 1);

let failure = &result.failures()[0];
assert_eq!(failure.index(), 1);
match failure.error() {
    BatchTaskError::Failed(error) => {
        assert_eq!(error.record_id, 102);
        assert_eq!(error.reason, "email address is invalid");
    }
    BatchTaskError::Panicked { .. } => unreachable!("the closure returned an error"),
}

Parallel execution

use qubit_batch::{
    BatchExecutor,
    ParallelBatchExecutor,
};

let executor = ParallelBatchExecutor::builder()
    .thread_count(4)
    .sequential_threshold(0)
    .build()
    .expect("parallel executor configuration should be valid");

let result = executor
    .for_each(0..8, |value| {
        assert!(value < 8);
        Ok::<(), &'static str>(())
    })
    .expect("range length should be exact");

assert!(result.is_success());

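ParallelBatchExecutor::default() hands any batch whose declared task count is 100 or fewer to the sequential executor, which avoids the cost of creating scoped threads. To run every non-empty batch on parallel workers, set sequential_threshold(0).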
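The threshold rule can be illustrated with `std::thread::scope`. `run_with_threshold` is a hypothetical dispatcher and is not ParallelBatchExecutor's code. It assumes the batch has already been collected into a `Vec`. Batches at or below the threshold run on the calling thread. Larger batches are split into roughly equal owned chunks, and each chunk runs on its own scoped thread.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

/// Hypothetical dispatcher (not the crate's implementation). Returns
/// (number of tasks run, whether the parallel path was used).
fn run_with_threshold<T, F>(
    items: Vec<T>,
    thread_count: usize,
    sequential_threshold: usize,
    task: F,
) -> (usize, bool)
where
    T: Send,
    F: Fn(T) + Sync,
{
    let count = items.len();
    // Small batches (including empty ones) never pay for thread creation.
    if count <= sequential_threshold {
        for item in items {
            task(item);
        }
        return (count, false);
    }

    let workers = thread_count.max(1);
    let chunk_size = (count + workers - 1) / workers; // ceiling division, >= 1 here
    let counter = AtomicUsize::new(0);
    let (task_ref, counter_ref) = (&task, &counter);
    let mut iter = items.into_iter();

    thread::scope(|scope| loop {
        // Give each worker its own owned chunk of the batch.
        let chunk: Vec<T> = iter.by_ref().take(chunk_size).collect();
        if chunk.is_empty() {
            break;
        }
        scope.spawn(move || {
            for item in chunk {
                task_ref(item);
                counter_ref.fetch_add(1, Ordering::Relaxed);
            }
        });
    }); // the scope joins every spawned worker before returning

    (counter.into_inner(), true)
}
```

Passing a threshold of 0 mirrors `sequential_threshold(0)` in the example above. In that case, only an empty batch stays on the calling thread.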

Collecting callable return values

use qubit_batch::{
    BatchExecutor,
    SequentialBatchExecutor,
};

fn count_users() -> Result<usize, &'static str> {
    Ok(3)
}
fn count_orders() -> Result<usize, &'static str> {
    Ok(5)
}

let result = SequentialBatchExecutor::new()
    .call([count_users, count_orders])
    .expect("array length should be exact");

assert!(result.outcome().is_success());
assert_eq!(result.values(), &[Some(3), Some(5)]);

Processing items directly

use qubit_batch::{
    BatchProcessor,
    SequentialBatchProcessor,
};

let mut processor = SequentialBatchProcessor::new(|item: &i32| {
    assert!(*item > 0);
});

let result = processor
    .process([1, 2, 3])
    .expect("array length should be exact");

assert_eq!(result.completed_count(), 3);
assert_eq!(result.processed_count(), 3);

Delegating fixed-size chunks

use std::{
    num::NonZeroUsize,
    time::Duration,
};

use qubit_batch::{
    BatchProcessResult,
    BatchProcessResultBuilder,
    BatchProcessor,
    ChunkedBatchProcessor,
};

struct InsertChunk;

impl BatchProcessor<i32> for InsertChunk {
    type Error = &'static str;

    fn process_with_count<I>(
        &mut self,
        rows: I,
        count: usize,
    ) -> Result<BatchProcessResult, Self::Error>
    where
        I: IntoIterator<Item = i32>,
    {
        let processed = rows.into_iter().count();
        BatchProcessResultBuilder::builder(count)
            .completed_count(processed)
            .processed_count(processed)
            .chunk_count(1)
            .elapsed(Duration::ZERO)
            .build()
            .map_err(|_| "invalid process result")
    }
}

let mut processor = ChunkedBatchProcessor::new(
    InsertChunk,
    NonZeroUsize::new(2).expect("chunk size is non-zero"),
);

let result = processor
    .process([1, 2, 3, 4, 5])
    .expect("array length should be exact");

assert_eq!(result.completed_count(), 5);
assert_eq!(result.processed_count(), 5);
assert_eq!(result.chunk_count(), 3);
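The `chunk_count` of 3 above follows from cutting five items into chunks of at most two: [1, 2], [3, 4], [5]. The standalone sketch below reproduces that arithmetic with a hypothetical `process_in_chunks` helper. It is not ChunkedBatchProcessor's implementation. The delegate here simply returns how many items it handled.

```rust
use std::num::NonZeroUsize;

/// Hypothetical helper: feeds `items` to `delegate` in chunks of at most
/// `chunk_size` items and returns (items processed, chunks submitted).
fn process_in_chunks<T, I, F>(items: I, chunk_size: NonZeroUsize, mut delegate: F) -> (usize, usize)
where
    I: IntoIterator<Item = T>,
    F: FnMut(Vec<T>) -> usize,
{
    let mut iter = items.into_iter();
    let (mut processed, mut chunks) = (0, 0);
    loop {
        // Only the final chunk may be shorter than `chunk_size`.
        let chunk: Vec<T> = iter.by_ref().take(chunk_size.get()).collect();
        if chunk.is_empty() {
            break;
        }
        chunks += 1;
        processed += delegate(chunk);
    }
    (processed, chunks)
}
```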

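When ChunkedBatchProcessor delegates a chunk, it treats the delegate's return value as the result for that submitted chunk. Returning Ok means the delegate has brought every item in the chunk to a terminal state. Therefore item_count and completed_count must both equal the length of the submitted chunk. processed_count may be smaller than the chunk length, which expresses that the target system reported fewer successes. For example, an idempotent database insert might accept 3 rows but affect only 2. If the delegate cannot bring the whole chunk to a terminal state, it should return Err. An inconsistent Ok result is reported as ChunkedBatchProcessError::InvalidChunkResult.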
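The consistency rule for an Ok chunk result can be written as a small check. `ChunkReport`, `InvalidChunkResult`, and `check_chunk_report` are hypothetical names used only for illustration. The crate itself reports this failure as `ChunkedBatchProcessError::InvalidChunkResult`.

```rust
/// Hypothetical per-chunk counters returned by a delegate (field names mirror the text).
#[derive(Debug, Clone, Copy)]
struct ChunkReport {
    item_count: usize,
    completed_count: usize,
    processed_count: usize,
}

/// Hypothetical error carrying the length of the chunk that was rejected.
#[derive(Debug, PartialEq, Eq)]
struct InvalidChunkResult {
    chunk_len: usize,
}

/// An Ok report must account for every submitted item (all reached a terminal
/// state). It may still report fewer processed items than were submitted.
fn check_chunk_report(chunk_len: usize, report: &ChunkReport) -> Result<(), InvalidChunkResult> {
    let consistent = report.item_count == chunk_len
        && report.completed_count == chunk_len
        && report.processed_count <= chunk_len;
    if consistent {
        Ok(())
    } else {
        Err(InvalidChunkResult { chunk_len })
    }
}
```

Under this rule, the idempotent-insert case (3 rows accepted, 2 affected) passes. A report that leaves a row without a terminal state is rejected.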

Progress reporting

qubit-batch accepts reporters from qubit-progress but does not re-export any qubit-progress types. Custom reporters should implement the qubit-progress traits directly. You can attach a custom reporter to SequentialBatchExecutor, ParallelBatchExecutor, SequentialBatchProcessor, ParallelBatchProcessor, and ChunkedBatchProcessor.

All configuration goes through the builder API. To customize the reporter, the report interval, the worker count, the threshold, or chunked processor options, use SequentialBatchExecutor::builder(), ParallelBatchExecutor::builder(), SequentialBatchProcessor::builder(...), ParallelBatchProcessor::builder(...), or ChunkedBatchProcessor::builder(...).

use std::time::Duration;

use qubit_batch::{
    BatchExecutor,
    SequentialBatchExecutor,
};
use qubit_progress::{
    ProgressEvent,
    ProgressPhase,
    ProgressReporter,
};

struct ConsoleReporter;

impl ProgressReporter for ConsoleReporter {
    fn report(&self, event: &ProgressEvent) {
        let counters = event.counters();
        let total = counters.total_count().unwrap_or(counters.completed_count());
        match event.phase() {
            ProgressPhase::Started => println!("starting {total} tasks"),
            ProgressPhase::Running => println!(
                "completed {}/{total}, active {}, elapsed {:?}",
                counters.completed_count(),
                counters.active_count(),
                event.elapsed(),
            ),
            ProgressPhase::Finished => println!("finished {total} tasks in {:?}", event.elapsed()),
            ProgressPhase::Failed | ProgressPhase::Canceled => println!(
                "stopped after {}/{total} tasks in {:?}",
                counters.completed_count(),
                event.elapsed(),
            ),
        }
    }
}

let executor = SequentialBatchExecutor::builder()
    .reporter(ConsoleReporter)
    .report_interval(Duration::from_millis(250))
    .build();

let result = executor
    .for_each(["a", "b", "c"], |_item| Ok::<(), &'static str>(()))
    .expect("array length should be exact");

assert!(result.is_success());

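A panic inside a task body is captured as BatchTaskError::Panicked. Panics raised by a processor consumer or by a progress reporter propagate directly to the caller, because they are not part of the task failure model. Sequential execution and sequential processing report progress only between two tasks or items. The parallel variants use Progress::spawn_running_reporter to send running progress periodically from a scoped reporter thread.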
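The panic-capturing behavior described above can be approximated with the standard library's `std::panic::catch_unwind`. This sketch uses a hypothetical `TaskError` enum that only mirrors the Failed/Panicked shape of BatchTaskError. It is not the crate's implementation. It also relies on the default `panic = "unwind"` strategy, because with `panic = "abort"` nothing can be caught.

```rust
use std::panic::{self, AssertUnwindSafe};

/// Hypothetical stand-in for a task error: either the task returned Err,
/// or it panicked and the panic was captured.
#[derive(Debug)]
enum TaskError<E> {
    Failed(E),
    Panicked { message: String },
}

/// Runs one task, turning both returned errors and panics into TaskError.
fn run_task<E, F>(task: F) -> Result<(), TaskError<E>>
where
    F: FnOnce() -> Result<(), E>,
{
    match panic::catch_unwind(AssertUnwindSafe(task)) {
        Ok(Ok(())) => Ok(()),
        Ok(Err(error)) => Err(TaskError::Failed(error)),
        Err(payload) => {
            // panic!("literal") carries a &'static str; formatted panics carry a String.
            let message = payload
                .downcast_ref::<&str>()
                .map(|s| s.to_string())
                .or_else(|| payload.downcast_ref::<String>().cloned())
                .unwrap_or_else(|| "non-string panic payload".to_string());
            Err(TaskError::Panicked { message })
        }
    }
}
```

The default panic hook still prints the panic message to stderr even when the panic is caught. In this sketch, a panic in code outside `run_task` (for example, in a reporter) would not be caught, which matches the propagation rule above.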

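The configured report_interval is a throttling condition. It is checked only when the implementation reaches a running-progress point, so a running event is not guaranteed to be emitted the moment the interval elapses. Sequential variants check it between tasks or items, and chunked processing checks it after each chunk completes. Parallel variants use a scoped reporter thread. When the interval is greater than zero, that thread can also send running events periodically while workers are active. Duration::ZERO disables time throttling: a running event is reported as soon as the implementation reaches one of its own running-progress points, but this does not start a continuous refresh loop.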
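Here is a minimal sketch of the throttling rule. It assumes a hypothetical `RunningThrottle` type that is not part of qubit-batch or qubit-progress. The check runs only when the caller reaches a running-progress point. `Duration::ZERO` makes every such check pass without starting any timer or loop.

```rust
use std::time::{Duration, Instant};

/// Hypothetical throttle mirroring the report_interval semantics described above.
struct RunningThrottle {
    interval: Duration,
    last_report: Instant,
}

impl RunningThrottle {
    fn new(interval: Duration, started_at: Instant) -> Self {
        Self { interval, last_report: started_at }
    }

    /// Called only at a running-progress point; it never fires on its own.
    /// Returns true when a running event should be emitted now.
    fn should_report(&mut self, now: Instant) -> bool {
        let due = self.interval.is_zero()
            || now.saturating_duration_since(self.last_report) >= self.interval;
        if due {
            self.last_report = now;
        }
        due
    }
}
```

Because the method is only invoked between tasks, items, or chunks, a long-running task delays the next check. This is exactly why the interval is a minimum gap rather than a schedule.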

Task count contract

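When the input iterator implements ExactSizeIterator, the execution and processing APIs obtain the declared count automatically. When the count is a separate contract, use execute_with_count, call_with_count, for_each_with_count, or process_with_count. The API can then obtain a stable total before consuming a lazy iterator, and it returns partial results if the producer yields the wrong number of items.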

use qubit_batch::{
    BatchExecutionError,
    BatchExecutor,
    SequentialBatchExecutor,
};

let executor = SequentialBatchExecutor::new();
let error = executor
    .for_each_with_count([10, 20], 3, |_value| Ok::<(), &'static str>(()))
    .expect_err("the iterator yielded fewer items than declared");

match error {
    BatchExecutionError::CountShortfall {
        expected,
        actual,
        outcome,
    } => {
        assert_eq!(expected, 3);
        assert_eq!(actual, 2);
        assert_eq!(outcome.completed_count(), 2);
    }
    BatchExecutionError::CountExceeded { .. } => unreachable!(),
}
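The count contract can also be sketched without the crate. A hypothetical `for_each_with_declared_count` helper consumes a lazy iterator up to the declared count. It reports a shortfall together with the number of items actually run, and it probes for one extra item to detect an overrun. `CountError` is an illustrative stand-in for BatchExecutionError. This README does not specify whether the real API runs or discards the surplus item; this sketch simply does not run it.

```rust
/// Hypothetical error type loosely mirroring CountShortfall / CountExceeded.
#[derive(Debug, PartialEq, Eq)]
enum CountError {
    Shortfall { expected: usize, actual: usize },
    Exceeded { expected: usize },
}

/// Runs `task` on at most `expected` items and checks the producer kept its promise.
fn for_each_with_declared_count<T, I, F>(items: I, expected: usize, mut task: F) -> Result<usize, CountError>
where
    I: IntoIterator<Item = T>,
    F: FnMut(T),
{
    let mut iter = items.into_iter();
    let mut actual = 0;
    while actual < expected {
        match iter.next() {
            Some(item) => {
                task(item);
                actual += 1;
            }
            // The producer ran dry early; `actual` items were still processed.
            None => return Err(CountError::Shortfall { expected, actual }),
        }
    }
    // Probe once for a surplus item without running it.
    if iter.next().is_some() {
        return Err(CountError::Exceeded { expected });
    }
    Ok(actual)
}
```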

Pay particular attention to the result semantics:

API overview

Project structure

Documentation

Testing and CI

For quick local checks, run the following from the crate root:

cargo test
cargo clippy --all-targets -- -D warnings

To match the repository's CI environment, run:

./align-ci.sh
./ci-check.sh
./coverage.sh json

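./align-ci.sh first aligns the local toolchain and CI-related configuration; ./ci-check.sh then reproduces the pipeline checks. If you change runtime behavior and need to track coverage, also run ./coverage.sh.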

Contributing

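Issues and pull requests are welcome. Keep each change focused on a single topic, and add or update tests when you change behavior. If a change affects the public API or user-visible behavior, update this document or the rustdoc as well.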

By contributing to this repository, you agree to license your contribution under the Apache License, Version 2.0 (the same license as this project).

License and copyright

Copyright (c) 2026. Haixing Hu.

This software is licensed under the Apache License, Version 2.0. The full license text is in the LICENSE file at the repository root.

Author and maintainer

Haixing Hu — Qubit Co. Ltd.

Source repository: github.com/qubit-ltd/rs-batch
API documentation: docs.rs/qubit-batch
Crate releases: crates.io/crates/qubit-batch