qubit-retry

Retry module, providing a feature-complete, type-safe retry management system with support for multiple delay strategies and event listeners

Rust CI Coverage Crates.io Rust License

Qubit Retry 是面向 Rust 同步和异步操作的重试工具库,能够保留调用方的错误类型。

核心 API 是 Retry<E>。重试策略只绑定操作错误类型 E;每次 runrun_async 调用再引入自己的成功类型 T

概览

Qubit Retry 适用于需要对易失败任务进行明确、可观测重试控制的 Rust 应用。它支持同步操作、基于 Tokio 的异步操作,以及隔离到 worker 线程中的阻塞任务。重试策略可通过 builder 配置,也可以在开启 config feature 后从 qubit-config 读取;生命周期 hook 能观察每次 attempt、失败、重试决策、终止错误和成功结果。

当你需要类型化的 retry error、受限的 elapsed 时间预算、Retry-After hint、能捕获 panic 的 worker 执行,或可由闭包/可复用函数对象实现的重试回调时,可以使用本 crate。

特性

安装

[dependencies]
qubit-retry = "0.15"

按需开启可选集成:

[dependencies]
qubit-retry = { version = "0.15", features = ["tokio", "config"] }

可选 feature:

默认 feature 为空,因此同步重试不会引入 tokioqubit-config

基础同步重试

use qubit_retry::Retry;
use std::time::Duration;

fn read_config() -> Result<String, Box<dyn std::error::Error>> {
    let retry = Retry::<std::io::Error>::builder()
        .max_attempts(3)
        .fixed_delay(Duration::from_millis(100))
        .build()?;

    let text = retry.run(|| std::fs::read_to_string("config.toml"))?;
    Ok(text)
}

失败决策

默认情况下,operation error 会被重试,直到配置的 attempt 次数或总耗时限制终止流程。简单错误谓词可以使用 retry_if_error

use qubit_retry::{Retry, RetryContext};
use std::time::Duration;

let retry = Retry::<ServiceError>::builder()
    .max_attempts(4)
    .exponential_backoff(Duration::from_millis(100), Duration::from_secs(2))
    .retry_if_error(|error: &ServiceError, _context: &RetryContext| error.is_retryable())
    .build()?;

如果决策需要读取 failure 类型、attempt timeout、retry-after hint 或其他 RetryContext 信息,可以使用 on_failure

use qubit_retry::{Retry, RetryContext, AttemptFailure, AttemptFailureDecision};
use std::time::Duration;

let retry = Retry::<ServiceError>::builder()
    .max_attempts(3)
    .fixed_delay(Duration::from_millis(100))
    .on_failure(
        |failure: &AttemptFailure<ServiceError>, context: &RetryContext| match failure {
            AttemptFailure::Error(error) if error.is_rate_limited() => {
                AttemptFailureDecision::RetryAfter(Duration::from_secs(1))
            }
            AttemptFailure::Error(error) if error.is_retryable() => AttemptFailureDecision::Retry,
            AttemptFailure::Timeout if context.attempt_timeout().is_some() => {
                AttemptFailureDecision::Abort
            }
            AttemptFailure::Panic(_) => AttemptFailureDecision::Abort,
            AttemptFailure::Executor(_) => AttemptFailureDecision::Abort,
            _ => AttemptFailureDecision::UseDefault,
        },
    )
    .build()?;

AttemptFailureDecision::UseDefault 表示把控制权交回重试策略,由已配置的次数限制、耗时限制、delay、jitter 和可选 retry-after hint 决定下一步。

异步重试和超时

异步执行需要开启 tokio feature。单次 attempt 超时通过 builder 写入 RetryOptions。当 attempt 超时时,执行器会报告 AttemptFailure::Timeout,监听器可以通过 RetryContext::attempt_timeout() 读取配置的超时时间。operation panic 仍会在当前 async task 中继续 unwind;run_async() 不会把它转换成 AttemptFailure::Panic

use qubit_retry::Retry;
use std::time::Duration;

async fn fetch_once() -> Result<String, std::io::Error> {
    Ok("response".to_string())
}

async fn fetch_with_retry() -> Result<String, Box<dyn std::error::Error>> {
    let retry = Retry::<std::io::Error>::builder()
        .max_attempts(3)
        .fixed_delay(Duration::from_millis(50))
        .attempt_timeout(Some(Duration::from_secs(2)))
        .retry_on_timeout()
        .build()?;

    let response = retry
        .run_async(|| async {
            fetch_once().await
        })
        .await?;

    Ok(response)
}

普通 run() 保持当前线程上的同步执行语义。它是开销最低的路径,适合 CAS 循环这类高频短操作。run() 不支持配置 attempt_timeout,当设置了该选项时会返回 RetryErrorReason::UnsupportedOperation。需要取消异步 future 时使用 run_async();需要把阻塞工作放到 worker 线程中执行时,使用 run_in_worker()

Elapsed 预算

Retry 的 elapsed 预算使用单调 Instant 计时,不使用 wall-clock 时间:

终态 listener 保持通知语义。on_successon_error 的耗时会增加调用方实际等待时间,但不会把已经成功的 operation 反向变成 retry failure。

async 和 worker-thread attempt 会从配置的 attempt_timeout、剩余 max_operation_elapsed、剩余 max_total_elapsed 中选最短值作为有效 attempt timeout。如果下一次 retry 或 Retry-After 延迟会耗尽剩余 max_total_elapsed,流程会在 sleep 前以 RetryErrorReason::MaxTotalElapsedExceeded 失败。retry sleep 不会被截断。

Worker 线程重试

run_in_worker() 会把每次 attempt 都放到 worker 线程中运行。没有配置 attempt timeout 时,调用方等待 worker 返回,并把 worker panic 捕获为 AttemptFailure::Panic。worker 线程启动失败会报告为 AttemptFailure::Executor。配置了 attempt timeout 时,retry executor 会在超时后停止等待该 worker,标记本次 attempt 的 token 为 cancelled,并最多等待 worker_cancel_grace(默认 100ms)让 worker 退出,然后再按配置的 AttemptTimeoutPolicy 继续处理。

Rust 不能安全地强杀运行中的线程,因此如果 operation 不检查 token 并主动返回,超时后的 worker 可能会继续运行。如果 worker 在取消 grace 结束后仍未退出,retry flow 会返回 RetryErrorReason::WorkerStillRunning,不会再启动新的 worker;RetryContext::unreaped_worker_count() 会记录未回收 worker 数量。阻塞 IO、第三方调用、可能 panic 的代码,或需要单次 attempt 超时隔离的任务适合使用这一路径;低延迟内存操作优先使用普通 run()

use qubit_retry::{AttemptCancelToken, Retry};
use std::time::Duration;

fn blocking_fetch(token: AttemptCancelToken) -> Result<String, std::io::Error> {
    for _ in 0..20 {
        if token.is_cancelled() {
            return Err(std::io::Error::new(std::io::ErrorKind::Interrupted, "cancelled"));
        }
        std::thread::sleep(Duration::from_millis(10));
    }
    std::fs::read_to_string("payload.txt")
}

let retry = Retry::<std::io::Error>::builder()
    .max_attempts(3)
    .fixed_delay(Duration::from_millis(50))
    .attempt_timeout(Some(Duration::from_secs(2)))
    .worker_cancel_grace(Duration::from_millis(25))
    .abort_on_timeout()
    .build()?;

let response = retry.run_in_worker(blocking_fetch)?;

Retry-After Hint

如果 attempt failure 中携带 retry-after 信息,可以通过 retry_after_hint 注册 hint extractor。extractor 的返回值是 Option<Duration>Some(delay) 表示“下一次重试前等待这段时间”,None 表示“没有可用 hint”。当所有 failure listener 都返回 UseDefault 时,默认策略会优先使用 Some(delay);否则会回退到已配置的 delay 策略。

use qubit_retry::{AttemptFailure, Retry, RetryContext};
use std::time::Duration;

let retry = Retry::<ServiceError>::builder()
    .max_attempts(3)
    .fixed_delay(Duration::from_millis(100))
    .retry_after_hint(
        |failure: &AttemptFailure<ServiceError>, _context: &RetryContext| {
            failure.as_error().and_then(ServiceError::retry_after)
        },
    )
    .build()?;

如果 hint 只依赖 operation error,可以使用 retry_after_from_error,这是 retry_after_hint 的简化封装:

let retry = Retry::<ServiceError>::builder()
    .max_attempts(3)
    .fixed_delay(Duration::from_millis(100))
    .retry_after_from_error(|error: &ServiceError| error.retry_after())
    .build()?;

listener 也可以通过 RetryContext::retry_after_hint() 读取提取结果。

监听器

listener 是生命周期 hook,而不是另一套策略系统:

注册多个 failure listener 时,所有 listener 会按注册顺序执行。最后一个非 UseDefaultAttemptFailureDecision 会成为最终生效决策;如果所有 listener 都返回 UseDefault,则交回已配置的 retry 策略处理。

before_attempton_retry 的直观差别:before_attempt 对准「下一次 attempt 开始前」;on_retry 对准「某次 attempt 已经失败、且已经后续重试选好间隔、但尚未开始等待或下一轮 attempt 的那一刻」。

use qubit_retry::{
    AttemptFailure, AttemptFailureDecision, Retry, RetryContext, RetryError,
};

let retry = Retry::<std::io::Error>::builder()
    .max_attempts(3)
    .before_attempt(|context: &RetryContext| {
        tracing::debug!(attempt = context.attempt(), "starting attempt");
    })
    .on_success(|context: &RetryContext| {
        tracing::debug!(attempt = context.attempt(), "attempt succeeded");
    })
    .on_failure(
        |failure: &AttemptFailure<std::io::Error>, context: &RetryContext| {
            tracing::warn!(
                failure = %failure,
                attempt = context.attempt(),
                retry_after_hint = ?context.retry_after_hint(),
                "attempt failed",
            );
            AttemptFailureDecision::UseDefault
        },
    )
    .on_retry(
        |failure: &AttemptFailure<std::io::Error>, context: &RetryContext| {
            tracing::info!(
                failure = %failure,
                attempt = context.attempt(),
                next_delay = ?context.next_delay(),
                "will sleep before next attempt",
            );
        },
    )
    .on_error(|error: &RetryError<std::io::Error>, context: &RetryContext| {
        tracing::error!(
            reason = ?error.reason(),
            attempts = context.attempt(),
            operation_elapsed_ms = context.operation_elapsed().as_millis(),
            total_elapsed_ms = context.total_elapsed().as_millis(),
            "retry flow failed",
        );
    })
    .build()?;

配置

RetryOptions 是不可变配置快照。从 qubit-config 读取配置需要开启 config feature,并且只发生在构造阶段。

use qubit_config::Config;
use qubit_retry::{Retry, RetryOptions};

let mut config = Config::new();
config.set("retry.max_attempts", 5u32)?;
config.set("retry.max_operation_elapsed_millis", 30_000u64)?;
config.set("retry.max_total_elapsed_millis", 60_000u64)?;
config.set("retry.delay", "exponential")?;
config.set("retry.exponential_initial_delay_millis", 200u64)?;
config.set("retry.exponential_max_delay_millis", 5_000u64)?;
config.set("retry.exponential_multiplier", 2.0)?;
config.set("retry.jitter_factor", 0.2)?;
config.set("retry.attempt_timeout_millis", 2_000u64)?;
config.set("retry.attempt_timeout_policy", "retry")?;
config.set("retry.worker_cancel_grace_millis", 25u64)?;

let options = RetryOptions::from_config(&config.prefix_view("retry"))?;
let retry = Retry::<std::io::Error>::from_options(options)?;

支持的相对配置键:

错误处理

通过 RetryError::reason()RetryError::last_failure()RetryError::context() 可以区分终止原因与最后一次 attempt 失败:

use qubit_retry::{Retry, RetryErrorReason, AttemptFailure};

let retry = Retry::<std::io::Error>::builder()
    .max_attempts(2)
    .build()?;

match retry.run(|| std::fs::read_to_string("missing.toml")) {
    Ok(text) => println!("{text}"),
    Err(error) => {
        eprintln!("reason: {:?}", error.reason());
        eprintln!("attempts: {}", error.context().attempt());
        eprintln!("operation elapsed: {:?}", error.context().operation_elapsed());
        eprintln!("total elapsed: {:?}", error.context().total_elapsed());

        match error.last_failure() {
            Some(AttemptFailure::Error(source)) => {
                eprintln!("last operation error: {source}");
            }
            Some(AttemptFailure::Timeout) => {
                eprintln!("last attempt timed out");
            }
            Some(AttemptFailure::Panic(panic)) => {
                eprintln!("last attempt panicked: {}", panic.message());
            }
            Some(AttemptFailure::Executor(executor)) => {
                eprintln!("retry executor failed: {}", executor.message());
            }
            None => {}
        }
    }
}

文档

测试

快速在本地跑一遍:

cargo test --all-features
cargo clippy --all-targets --all-features -- -D warnings

若要与持续集成(CI)保持一致,请在项目根目录执行:

./align-ci.sh
./ci-check.sh
./coverage.sh

./align-ci.sh 会格式化代码并执行本地 Clippy 修复,使分支与 CI 规则对齐。./ci-check.sh 会运行与流水线等价的完整检查,包括格式检查、Clippy warnings deny、debug/release 构建、all-feature 测试、rustdoc warnings deny、JSON 覆盖率阈值检查以及安全审计。./coverage.sh 用于生成覆盖率报告;可通过 ./coverage.sh help 查看 HTML、text、LCOV、JSON、Cobertura 或 all 等输出格式。

参与贡献

欢迎通过 Issue 与 Pull Request 参与本仓库。建议:

向本仓库贡献内容即表示您同意以 Apache License, Version 2.0(与本项目相同)授权您的贡献。

许可证与版权

Copyright (c) 2026. Haixing Hu.

本软件依据 Apache License, Version 2.0 授权;完整许可文本见仓库根目录的 LICENSE 文件。

作者与维护

Haixing Hu — Qubit Co. Ltd.

源码仓库github.com/qubit-ltd/rs-retry
API 文档docs.rs/qubit-retry
Crate 发布crates.io/crates/qubit-retry