qubit-executor

Executor abstractions, task handles, and basic executor implementations for Qubit Rust libraries

Rust CI Coverage Crates.io Rust License

面向 Rust 的 executor 抽象与任务结果原语。

概览

Qubit Executor 提供 Qubit Rust 并发 crate 共享的最小执行 API。它把轻量级执行策略与托管 executor service 分开,并提供可复用的任务 handle,用于发布任务成功、任务失败、panic、取消或完成端点被丢弃等状态。

本 crate 刻意不依赖 Tokio、Rayon 或具体线程池。依赖运行时的实现放在更小的配套 crate 中,便于库作者只依赖自己需要的抽象层。

功能

Executor 与 ExecutorService

Executor 是底层执行策略。它回答的是:“这个单个任务应该如何运行?”已接受任务的结果统一通过 TrackedTask 暴露,即使具体 executor 是同线程内联执行。

ExecutorService 是托管服务。它回答的是:“这个服务是否能接受任务、跟踪任务、关闭并最终终止?”submit 成功只表示服务接受了任务,不表示任务已经开始或成功完成。

ScheduledExecutorServiceExecutorService 的定时提交扩展。它保留普通服务生命周期语义,同时增加 scheduleschedule_callableschedule_atschedule_callable_at。基础实现 SingleThreadScheduledExecutorService 拥有一个调度线程,并在该线程上执行到期任务,因此定时任务体应保持短小,较重工作应转交给其他 executor service。

ExecutorService 生命周期

所有托管服务都遵循相同的高层生命周期:

状态含义
Running服务接受新任务,并且可能已有已接受的任务正在排队、等待调度或运行。
ShuttingDown已调用 shutdown() 请求有序关闭。新提交会被拒绝,已接受的工作会被允许正常完成。
Stopping已调用 stop() 请求立即停止。新提交会被拒绝,服务会尝试取消或 abort 仍可停止的已接受工作。
Terminated已请求 shutdown 或 stop,且没有任何已接受工作仍处于活动状态。

shutdown()stop() 都是终止性的接收开关:调用任一方法后,服务都不再处于 running 状态,也不会再次接受新任务。两者的区别在于如何处理已经接受的工作。

shutdown() 是优雅关闭。它保留已接受的工作,并允许排队中、等待调度中或运行中的任务按照具体服务的正常执行规则完成。

stop() 是立即停止,并且是尽力而为。它会请求取消排队中、等待调度中或尚未开始的工作;如果运行时支持 abort,也可以 abort 由运行时管理的任务。它不能强行中断任意 Rust 代码、blocking 调用或已经运行的 OS 线程,因此服务终止仍可能等待这些工作自行返回。返回的 StopReport 描述处理 stop 请求时观察到的 queued、running 与 cancelled 工作数量。

wait_termination() 会阻塞当前线程,直到 shutdown 或 stop 已经被请求,并且所有已接受工作都已经完成、失败、panic、被取消或按照具体服务能力被 abort。如果在服务仍处于 Running 状态时调用,它会一直等待另一个线程请求 shutdown 或 stop;如果这件事永远不发生,该调用可能永久阻塞。这个 API 明确是同步阻塞式等待,不是 async 或非阻塞等待。

资源释放

不要依赖丢弃 ExecutorService handle 来及时释放服务资源。具体服务可以在 Drop 中请求 shutdown,但析构逻辑不应被假定会等待 worker 线程、辅助线程、运行时任务、队列或任务持有的资源全部结束。若在 Drop 中阻塞,普通的 handle 析构就可能意外等待任意用户代码、blocking 调用或无法被强制中断的 OS 线程任务。

当调用方需要确定性的资源释放时,应使用显式生命周期流程:

  1. 调用 shutdown() 拒绝新任务并排空已接受工作,或调用 stop() 对仍可停止的工作发起尽力取消或 abort。
  2. 调用 wait_termination(),并在它返回后再认为服务已经静止。
  3. 等待返回后,再丢弃 service handle 和相关任务 handle。

已经运行的 blocking 或 OS 线程任务体可能继续持有文件描述符、socket、锁、引用计数对象或其他外部资源,直到任务体自行返回。需要更强清理保证的服务,应提供显式 close/join API,而不是依赖析构副作用。

任务结果

TaskHandle 表示已接受 callable 任务的结果。它支持通过 get 阻塞等待、按值 await、通过 try_get 非阻塞尝试获取结果,以及通过 is_done 检查完成状态。

TrackedTask 在结果获取之外增加状态检查和任务开始前的尽力取消。托管服务通过 submit_trackedsubmit_tracked_callable 返回 tracked handle。

任务执行错误由 TaskExecutionError 表示:

快速开始

直接执行

use std::io;

use qubit_executor::{DirectExecutor, Executor};

let executor = DirectExecutor::new();
let handle = executor.call(|| Ok::<usize, io::Error>(40 + 2))?;
let value = handle.get()?;
assert_eq!(value, 42);
# Ok::<(), Box<dyn std::error::Error>>(())

每个任务一个线程

use std::io;

use qubit_executor::{Executor, ThreadPerTaskExecutor};

let executor = ThreadPerTaskExecutor::new();
let handle = executor.call(|| Ok::<usize, io::Error>(40 + 2))?;
assert_eq!(handle.get()?, 42);
# Ok::<(), Box<dyn std::error::Error>>(())

托管服务

use std::io;

use qubit_executor::{ExecutorService, ThreadPerTaskExecutorService};

let service = ThreadPerTaskExecutorService::new();
let handle = service.submit_callable(|| Ok::<usize, io::Error>(40 + 2))?;
assert_eq!(handle.get()?, 42);
service.shutdown();
service.wait_termination();
# Ok::<(), Box<dyn std::error::Error>>(())

定时托管服务

use std::io;
use std::time::Duration;

use qubit_executor::{
    ExecutorService,
    ScheduledExecutorService,
    SingleThreadScheduledExecutorService,
};

let service = SingleThreadScheduledExecutorService::new("app-scheduler")?;
let handle = service.schedule_callable(Duration::from_millis(25), || {
    Ok::<usize, io::Error>(40 + 2)
})?;
assert_eq!(handle.get()?, 42);
service.shutdown();
service.wait_termination();
# Ok::<(), Box<dyn std::error::Error>>(())

Crate 边界

当你在定义 API,且希望接受或返回 executor 抽象而不绑定具体运行时时,使用 qubit-executor。需要具体实现时,使用对应的运行时 crate:

测试

快速在本地跑一遍:

cargo test
cargo clippy --all-targets --all-features -- -D warnings

若要与持续集成(CI)保持一致,请在仓库根目录依次执行:./align-ci.sh 将本地工具链与配置对齐到 CI 规则,再执行 ./ci-check.sh 复现流水线中的检查。需要查看或生成测试覆盖率时,使用 ./coverage.sh

参与贡献

欢迎通过 Issue 与 Pull Request 参与本仓库。建议:

向本仓库贡献内容即表示您同意以 Apache License, Version 2.0(与本项目相同)授权您的贡献。

许可证与版权

Copyright (c) 2026. Haixing Hu.

本软件依据 Apache License, Version 2.0 授权;完整许可文本见仓库根目录的 LICENSE 文件。

作者与维护

Haixing Hu — Qubit Co. Ltd.

源码仓库github.com/qubit-ltd/rs-executor
API 文档docs.rs/qubit-executor
Crate 发布crates.io/crates/qubit-executor