Async Rust: From Futures to Production
Async Rust:从 Future 到生产环境
Speaker Intro
讲者简介
- Principal Firmware Architect in Microsoft SCHIE (Silicon and Cloud Hardware Infrastructure Engineering) team
微软 SCHIE 团队首席固件架构师,SCHIE 即 Silicon and Cloud Hardware Infrastructure Engineering。 - Industry veteran with expertise in security, systems programming (firmware, operating systems, hypervisors), CPU and platform architecture, and C++ systems
长期从事安全、系统编程、固件、操作系统、虚拟机监控器、CPU 与平台架构,以及 C++ 系统开发。 - Started programming in Rust in 2017 (@AWS EC2), and have been in love with the language ever since
自 2017 年在 AWS EC2 开始使用 Rust,此后一直深度投入这门语言。
A deep-dive guide to asynchronous programming in Rust. Unlike most async tutorials that start with tokio::main and hand-wave the internals, this guide builds understanding from first principles — the Future trait, polling, state machines — then progresses to real-world patterns, runtime selection, and production pitfalls.
这是一本深入讲解 Rust 异步编程的训练指南。和许多从 tokio::main 起步、对底层一笔带过的教程不同,这本书从第一性原理讲起,先拆清 Future trait、轮询机制和状态机,再逐步推进到真实世界中的模式选择、运行时决策与生产环境问题。
Who This Is For
适合哪些读者
- Rust developers who can write synchronous Rust but find async confusing
已经能够编写同步 Rust,但一碰 async 就开始发懵的 Rust 开发者。 - Developers from C#, Go, Python, or JavaScript who know
async/awaitbut not Rust’s model
来自 C#、Go、Python 或 JavaScript 生态,熟悉async/await,但不了解 Rust 异步模型的开发者。 - Anyone who’s been bitten by
Future is not Send,Pin<Box<dyn Future>>, or “why does my program hang?”
凡是被Future is not Send、Pin<Box<dyn Future>>,或者“程序为什么挂住了”这类问题折腾过的人,都适合读这一套。
Prerequisites
前置知识
You should be comfortable with:
开始阅读前,最好已经具备以下基础:
- Ownership, borrowing, and lifetimes
所有权、借用与生命周期。 - Traits and generics (including
impl Trait)
Trait 与泛型,包括impl Trait。 - Using
Result<T, E>and the?operatorResult<T, E>的使用方式,以及?运算符。 - Basic multi-threading (
std::thread::spawn,Arc,Mutex)
基础多线程知识,例如std::thread::spawn、Arc、Mutex。
No prior async Rust experience is needed.
不要求事先有 Rust 异步编程经验。
How to Use This Book
如何使用本书
Read linearly the first time. Parts I–III build on each other. Each chapter has:
第一次阅读建议按顺序来。 第一到第三部分是逐层递进的,每一章都承担了后面章节的铺垫。
| Symbol | Meaning |
|---|---|
| 🟢 🟢 | Beginner — foundational concept 初级:基础概念,偏入门。 |
| 🟡 🟡 | Intermediate — requires earlier chapters 中级:需要前面章节的基础。 |
| 🔴 🔴 | Advanced — deep internals or production patterns 高级:涉及底层机制或生产模式。 |
Each chapter includes:
每一章通常都包含以下组成部分:
- A “What you’ll learn” block at the top
开头的 “What you’ll learn” 学习目标。 - Mermaid diagrams for visual learners
便于理解流程和结构的 Mermaid 图示。 - An inline exercise with a hidden solution
带隐藏答案的 嵌入式练习。 - Key Takeaways summarizing the core ideas
用于收束重点的 Key Takeaways。 - Cross-references to related chapters
指向相关章节的 交叉引用。
Pacing Guide
学习节奏建议
| Chapters | Topic | Suggested Time | Checkpoint |
|---|---|---|---|
| 1–5 1–5 章 | How Async Works Async 如何工作 | 6–8 hours 6–8 小时 | You can explain Future, Poll, Pin, and why Rust has no built-in runtime能够解释 Future、Poll、Pin,以及为什么 Rust 没有内置运行时。 |
| 6–10 6–10 章 | The Ecosystem 生态体系 | 6–8 hours 6–8 小时 | You can build futures by hand, choose a runtime, and use tokio’s API 能够手写 future、选择运行时,并熟练使用 tokio API。 |
| 11–13 11–13 章 | Production Async 生产环境中的 Async | 6–8 hours 6–8 小时 | You can write production-grade async code with streams, proper error handling, and graceful shutdown 能够写出具备 stream、正确错误处理和优雅停机能力的生产级异步代码。 |
| Capstone 综合项目 | Chat Server 聊天服务器 | 4–6 hours 4–6 小时 | You’ve built a real async application integrating all concepts 已经完成一个整合全部概念的真实异步应用。 |
Total estimated time: 22–30 hours
预计总学习时间:22–30 小时。
Working Through Exercises
练习建议
Every content chapter has an inline exercise. The capstone (Ch 16) integrates everything into a single project. For maximum learning:
每个正文章节都带有嵌入式练习,第 16 章的综合项目则会把全部内容整合到一个完整项目中。为了把收益吃满,建议按下面的节奏来:
- Try the exercise before expanding the solution — struggling is where learning happens
先做题,再看答案。 真正的理解通常发生在卡住和挣扎的时候。 - Type the code, don’t copy-paste — muscle memory matters for Rust’s syntax
手敲代码,不要复制粘贴。 Rust 语法特别依赖肌肉记忆。 - Run every example —
cargo new async-exercisesand test as you go
每个示例都跑一遍。 可以单独建一个cargo new async-exercises工程,边学边验证。
Table of Contents
目录总览
Part I: How Async Works
第一部分:Async 如何工作
- 1. Why Async is Different in Rust 🟢 — The fundamental difference: Rust has no built-in runtime
1. 为什么 Rust 中的 Async 与众不同 🟢 —— 核心差异是:Rust 没有内置运行时。 - 2. The Future Trait 🟡 —
poll(),Waker, and the contract that makes it all work
2. Future Trait 🟡 —— 讲清poll()、Waker以及整套机制依赖的契约。 - 3. How Poll Works 🟡 — The polling state machine and a minimal executor
3. Poll 如何工作 🟡 —— 轮询状态机和一个最小执行器。 - 4. Pin and Unpin 🔴 — Why self-referential structs need pinning
4. Pin 与 Unpin 🔴 —— 为什么自引用结构体需要 pinning。 - 5. The State Machine Reveal 🟢 — What the compiler actually generates from
async fn
5. 状态机真相 🟢 —— 编译器到底从async fn生成了什么。
Part II: The Ecosystem
第二部分:生态体系
- 6. Building Futures by Hand 🟡 — TimerFuture, Join, Select from scratch
6. 手写 Future 🟡 —— 从零实现 TimerFuture、Join、Select。 - 7. Executors and Runtimes 🟡 — tokio, smol, async-std, embassy — how to choose
7. 执行器与运行时 🟡 —— tokio、smol、async-std、embassy 该怎么选。 - 8. Tokio Deep Dive 🟡 — Runtime flavors, spawn, channels, sync primitives
8. Tokio 深入解析 🟡 —— 运行时类型、spawn、channel 与同步原语。 - 9. When Tokio Isn’t the Right Fit 🟡 — LocalSet, FuturesUnordered, runtime-agnostic design
9. Tokio 不合适的场景 🟡 —— LocalSet、FuturesUnordered 与运行时无关设计。 - 10. Async Traits 🟡 — RPITIT, dyn dispatch, trait_variant, async closures
10. Async Trait 🟡 —— RPITIT、dyn 分发、trait_variant 与 async 闭包。
Part III: Production Async
第三部分:生产环境中的 Async
- 11. Streams and AsyncIterator 🟡 — Async iteration, AsyncRead/Write, stream combinators
11. Stream 与 AsyncIterator 🟡 —— 异步迭代、AsyncRead/Write 与 stream 组合器。 - 12. Common Pitfalls 🔴 — 9 production bugs and how to avoid them
12. 常见陷阱 🔴 —— 9 类生产事故及其规避方法。 - 13. Production Patterns 🔴 — Graceful shutdown, backpressure, Tower middleware
13. 生产模式 🔴 —— 优雅停机、背压与 Tower 中间件。
Appendices
附录
- Summary and Reference Card — Quick-lookup tables and decision trees
总结与参考卡片 —— 便于快速查阅的表格和决策树。 - Capstone Project: Async Chat Server — Build a complete async application
综合项目:Async 聊天服务器 —— 构建一个完整的异步应用。
1. Why Async is Different in Rust 🟢
1. 为什么 Rust 的 async 和别家不一样 🟢
What you’ll learn:
本章将学到什么:
- Why Rust has no built-in async runtime (and what that means for you)
为什么 Rust 没有内建 async 运行时,以及这件事对实际开发意味着什么- The three key properties: lazy execution, no runtime, zero-cost abstraction
三个关键特征:惰性执行、无内建运行时、零成本抽象- When async is the right tool (and when it’s slower)
什么时候该用 async,什么时候反而会更慢- How Rust’s model compares to C#, Go, Python, and JavaScript
Rust 的模型和 C#、Go、Python、JavaScript 相比到底差在哪
The Fundamental Difference
根本差异
Most languages with async/await hide the machinery. C# has the CLR thread pool. JavaScript has the event loop. Go has goroutines and a scheduler built into the runtime. Python has asyncio.
大多数带 async/await 的语言,都会把背后的 machinery 藏起来。C# 有 CLR 线程池,JavaScript 有 event loop,Go 有 goroutine 和内建调度器,Python 有 asyncio。
Rust has nothing.
Rust 默认什么都不送。
There is no built-in runtime, no thread pool, no event loop. The async keyword is a zero-cost compilation strategy — it transforms your function into a state machine that implements the Future trait. Someone else (an executor) must drive that state machine forward.
Rust 没有内建运行时、没有默认线程池、也没有偷偷躲在背后的事件循环。async 关键字本质上是一种零成本编译策略,它会把函数改写成实现了 Future trait 的状态机。真正推动这个状态机往前跑的,必须是外部执行器,也就是 executor。
Three Key Properties of Rust Async
Rust async 的三个核心特征
graph LR
subgraph "Other Languages"
EAGER["Eager Execution<br/>Task starts immediately"]
BUILTIN["Built-in Runtime<br/>Thread pool included"]
GC["GC-Managed<br/>No lifetime concerns"]
end
subgraph "Rust"
LAZY["Lazy Execution<br/>Nothing happens until polled"]
BYOB["Bring Your Own Runtime<br/>You choose the executor"]
OWNED["Ownership Applies<br/>Lifetimes, Send, Sync matter"]
end
EAGER -. "opposite" .-> LAZY
BUILTIN -. "opposite" .-> BYOB
GC -. "opposite" .-> OWNED
style LAZY fill:#e8f5e8,color:#000
style BYOB fill:#e8f5e8,color:#000
style OWNED fill:#e8f5e8,color:#000
style EAGER fill:#e3f2fd,color:#000
style BUILTIN fill:#e3f2fd,color:#000
style GC fill:#e3f2fd,color:#000
这三点是理解 Rust async 的地基:它是惰性的,不自带调度环境,而且所有权、生命周期、Send、Sync 这些规则会原封不动压到 async 世界里。
如果脑子里还带着 “async 一写,任务就自动跑起来了” 这种别家语言的惯性,到了 Rust 这边很容易一头撞墙。
No Built-In Runtime
没有内建运行时
// This compiles but does NOTHING:
async fn fetch_data() -> String {
"hello".to_string()
}
fn main() {
let future = fetch_data(); // Creates the Future, but doesn't execute it
// future is just a struct sitting on the stack
// No output, no side effects, nothing happens
drop(future); // Silently dropped — work was never started
}
这段代码能编译,但什么也不会发生。fetch_data() 被调用时,只是生成了一个 future 对象,它安安静静躺在栈上,等着别人来 poll。如果没人管它,丢掉就结束了。
这点对于从 C# 或 JavaScript 过来的人尤其容易搞混,因为那边一创建 task 或 promise,通常就已经开跑了。
Compare with C# where Task starts eagerly:
对比 C#,Task 是急切执行的:
// C# — this immediately starts executing:
async Task<string> FetchData() => "hello";
var task = FetchData(); // Already running!
var result = await task; // Just waits for completion
Lazy Futures vs Eager Tasks
惰性 Future 与急切 Task
This is the single most important mental shift:
这是最关键的一次思维切换:
| C# / JavaScript / Python | Go | Rust | |
|---|---|---|---|
| Creation 创建时 | Task starts executing immediatelyTask 会立刻开始执行 | Goroutine starts immediately goroutine 立刻启动 | Future does nothing until polledFuture 在被 poll 前什么都不做 |
| Dropping 被丢弃时 | Detached task continues running 脱离引用后往往还会继续跑 | Goroutine runs until return goroutine 会一直跑到返回 | Dropping a Future cancels it future 一旦被丢弃,就等于取消 |
| Runtime 运行时 | Built into the language/VM 语言或 VM 自带 | Built into the binary (M:N scheduler) 二进制里自带调度器 | You choose (tokio, smol, etc.) 由使用者自己选,例如 Tokio、smol |
| Scheduling 调度 | Automatic (thread pool) 自动调度 | Automatic (GMP scheduler) 自动调度 | Explicit (spawn, block_on)显式触发,例如 spawn、block_on |
| Cancellation 取消 | CancellationToken (cooperative)CancellationToken 协作式取消 | context.Context (cooperative)context.Context 协作式取消 | Drop the future (immediate) 直接丢弃 future |
// To actually RUN a future, you need an executor:
#[tokio::main]
async fn main() {
let result = fetch_data().await; // NOW it executes
println!("{result}");
}
所以 Rust async 里最容易踩空的一点,就是把“创建 future”和“执行 future”混为一谈。前者只是在组装工作单,后者才是真正开始干活。
这也是为什么 Rust 很强调 executor、spawn、block_on 这些词。没有它们,future 只是一个静态对象,不会自己动。
When to Use Async (and When Not To)
什么时候该用 async,什么时候别硬上
graph TD
START["What kind of work?"]
IO["I/O-bound?<br/>(network, files, DB)"]
CPU["CPU-bound?<br/>(computation, parsing)"]
MANY["Many concurrent connections?<br/>(100+)"]
FEW["Few concurrent tasks?<br/>(<10)"]
USE_ASYNC["✅ Use async/await"]
USE_THREADS["✅ Use std::thread or rayon"]
USE_SPAWN_BLOCKING["✅ Use spawn_blocking()"]
MAYBE_SYNC["Consider synchronous code<br/>(simpler, less overhead)"]
START -->|Network, files, DB| IO
START -->|Computation| CPU
IO -->|Yes, many| MANY
IO -->|Just a few| FEW
MANY --> USE_ASYNC
FEW --> MAYBE_SYNC
CPU -->|Parallelize| USE_THREADS
CPU -->|Inside async context| USE_SPAWN_BLOCKING
style USE_ASYNC fill:#c8e6c9,color:#000
style USE_THREADS fill:#c8e6c9,color:#000
style USE_SPAWN_BLOCKING fill:#c8e6c9,color:#000
style MAYBE_SYNC fill:#fff3e0,color:#000
Rule of thumb: Async is for I/O concurrency (doing many things at once while waiting), not CPU parallelism (making one thing faster). If you have 10,000 network connections, async shines. If you’re crunching numbers, use rayon or OS threads.
经验法则: async 适合处理 I/O 并发,也就是“一边等网络、文件、数据库,一边把别的事情先做掉”;它并不是拿来给 CPU 密集计算提速的。如果有 1 万个网络连接,async 很亮眼;如果是在死命算数,优先考虑 rayon 或系统线程。
When Async Can Be Slower
什么时候 async 反而更慢
Async isn’t free. For low-concurrency workloads, synchronous code can outperform async:
async 从来不是免费的。并发量不高时,同步代码完全可能比 async 更快:
| Cost 代价 | Why 原因 |
|---|---|
| State machine overhead 状态机开销 | Each .await adds an enum variant; deeply nested futures produce large, complex state machines每个 .await 都会引入新的状态变体,future 嵌套深了,状态机就会变大变复杂 |
| Dynamic dispatch 动态分发 | Box<dyn Future> adds indirection and kills inliningBox<dyn Future> 会带来额外间接层,还会影响内联 |
| Context switching 上下文切换 | Cooperative scheduling still has cost — the executor must manage a task queue, wakers, and I/O registrations 协作式调度照样有管理成本,执行器要维护任务队列、waker 和 I/O 注册 |
| Compile time 编译时间 | Async code generates more complex types, slowing down compilation async 代码会生成更复杂的类型,编译速度也会跟着受影响 |
| Debuggability 调试可读性 | Stack traces through state machines are harder to read (see Ch. 12) 状态机展开后的调用栈更难看懂,调试体验通常更拧巴 |
Benchmarking guidance: If fewer than ~10 concurrent I/O operations, profile before committing to async. A simple std::thread::spawn per connection scales fine to hundreds of threads on modern Linux.
基准建议: 如果并发 I/O 数量不到十来个,先测再说,别急着全盘 async 化。现代 Linux 上,一连接一线程在几百线程级别都未必是问题。
Exercise: When Would You Use Async?
练习:什么时候会选 async?
🏋️ Exercise 🏋️ 练习
For each scenario, decide whether async is appropriate and explain why:
针对下面几个场景,判断 async 是否合适,并说明原因:
- A web server handling 10,000 concurrent WebSocket connections
1. 一个要同时处理 1 万个 WebSocket 连接的 Web 服务。 - A CLI tool that compresses a single large file
2. 一个压缩单个大文件的命令行工具。 - A service that queries 5 different databases and merges results
3. 一个同时查询 5 个数据库并合并结果的服务。 - A game engine running a physics simulation at 60 FPS
4. 一个以 60 FPS 跑物理模拟的游戏引擎。
🔑 Solution 🔑 参考答案
- Async — I/O-bound with massive concurrency. Each connection spends most time waiting for data. Threads would require 10K stacks.
1. 适合 async:典型 I/O 密集且并发量巨大。每个连接大部分时间都在等数据,线程模型会额外背上 1 万个栈空间。 - Sync/threads — CPU-bound, single task. Async adds overhead with no benefit. Use
rayonfor parallel compression.
2. 更适合同步或线程:这是 CPU 密集、单任务型工作,async 只会多一层开销,没啥收益。真要并行压缩,用rayon更靠谱。 - Async — Five concurrent I/O waits.
tokio::join!runs all five queries simultaneously.
3. 适合 async:这里的核心是多个独立 I/O 等待,可以用tokio::join!并发把五个查询一起推进。 - Sync/threads — CPU-bound, latency-sensitive. Async’s cooperative scheduling could introduce frame jitter.
4. 更适合同步或线程:这是 CPU 密集而且对延迟抖动敏感的场景,协作式调度可能带来帧时间抖动。
Key Takeaways — Why Async is Different
本章要点:Rust async 为什么特别不一样
- Rust futures are lazy — they do nothing until polled by an executor
Rust future 是惰性的,不被执行器poll就什么都不做。- There is no built-in runtime — you choose (or build) your own
Rust 没有内建运行时,要自己选择,必要时甚至可以自己写。- Async is a zero-cost compilation strategy that produces state machines
async 是一种零成本编译策略,本质上会生成状态机。- Async shines for I/O-bound concurrency; for CPU-bound work, use threads or rayon
async 最适合 I/O 并发;CPU 密集工作优先考虑线程或rayon。
See also: Ch 2 — The Future Trait for the trait that makes this all work, Ch 7 — Executors and Runtimes for choosing your runtime
继续阅读: 第 2 章:Future Trait 解释这一切依赖的核心 trait,第 7 章:执行器与运行时 继续讲运行时该怎么选。
2. The Future Trait 🟡
2. Future Trait 🟡
What you’ll learn:
将学到什么:
- The
Futuretrait:Output,poll(),Context,WakerFuturetrait 的核心组成:Output、poll()、Context、Waker- How a waker tells the executor “poll me again”
waker 如何通知执行器“再 poll 我一次”- The contract: never call
wake()= your program silently hangs
这套契约的关键:如果从不调用wake(),程序就会悄悄挂住- Implementing a real future by hand (
Delay)
如何手写一个真正可工作的 future,也就是Delay
Anatomy of a Future
Future 的结构
Everything in async Rust ultimately implements this trait:
Rust 异步体系中的一切,最终都要落到这个 trait 上:
#![allow(unused)]
fn main() {
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
pub enum Poll<T> {
Ready(T), // The future has completed with value T
Pending, // The future is not ready yet — call me back later
}
}
That’s it. A Future is anything that can be polled — asked “are you done yet?” — and responds with either “yes, here’s the result” or “not yet, I’ll wake you up when I’m ready.”
核心就这么多。Future 就是任何可以被 poll 的对象,也就是可以被问一句“做完了吗?”;它要么回答“做完了,结果在这”,要么回答“还没有,等我准备好了会叫醒你”。
Output, poll(), Context, Waker
Output、poll()、Context、Waker
sequenceDiagram
participant E as Executor
participant F as Future
participant R as Resource (I/O)
E->>F: poll(cx)
F->>R: Check: is data ready?
R-->>F: Not yet
F->>R: Register waker from cx
F-->>E: Poll::Pending
Note over R: ... time passes, data arrives ...
R->>E: waker.wake() — "I'm ready!"
E->>F: poll(cx) — try again
F->>R: Check: is data ready?
R-->>F: Yes! Here's the data
F-->>E: Poll::Ready(data)
Let’s break down each piece:
下面把这些组成部分逐个拆开来看:
#![allow(unused)]
fn main() {
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
// A future that returns 42 immediately
struct Ready42;
impl Future for Ready42 {
type Output = i32; // What the future eventually produces
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<i32> {
Poll::Ready(42) // Always ready — no waiting
}
}
}
The components:
这些组件分别代表:
Output— the type of value produced when the future completesOutput:future 完成时最终产出的值类型poll()— called by the executor to check progress; returnsReady(value)orPendingpoll():由执行器调用,用来检查 future 是否取得进展;返回值只能是Ready(value)或PendingPin<&mut Self>— ensures the future won’t be moved in memory (we’ll cover why in Ch. 4)Pin<&mut Self>:保证 future 在内存中不会被移动,至于原因会在第 4 章展开Context— carries theWakerso the future can signal the executor when it’s ready to make progressContext:内部携带Waker,future 可以借此在准备好继续推进时通知执行器
The Waker Contract
Waker 契约
The Waker is the callback mechanism. When a future returns Pending, it must arrange for waker.wake() to be called later — otherwise the executor will never poll it again and the program hangs.Waker 就是回调通知机制。当 future 返回 Pending 时,它必须安排之后某个时刻调用 waker.wake();否则执行器永远不会再次 poll 它,程序就会卡死。
#![allow(unused)]
fn main() {
use std::task::{Context, Poll, Waker};
use std::pin::Pin;
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
/// A future that completes after a delay (toy implementation)
struct Delay {
completed: Arc<Mutex<bool>>,
waker_stored: Arc<Mutex<Option<Waker>>>,
duration: Duration,
started: bool,
}
impl Delay {
fn new(duration: Duration) -> Self {
Delay {
completed: Arc::new(Mutex::new(false)),
waker_stored: Arc::new(Mutex::new(None)),
duration,
started: false,
}
}
}
impl Future for Delay {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
// Check if already completed
if *self.completed.lock().unwrap() {
return Poll::Ready(());
}
// Store the waker so the background thread can wake us
*self.waker_stored.lock().unwrap() = Some(cx.waker().clone());
// Start the background timer on first poll
if !self.started {
self.started = true;
let completed = Arc::clone(&self.completed);
let waker = Arc::clone(&self.waker_stored);
let duration = self.duration;
thread::spawn(move || {
thread::sleep(duration);
*completed.lock().unwrap() = true;
// CRITICAL: wake the executor so it polls us again
if let Some(w) = waker.lock().unwrap().take() {
w.wake(); // "Hey executor, I'm ready — poll me again!"
}
});
}
Poll::Pending // Not done yet
}
}
}
Key insight: In C#, the TaskScheduler handles waking automatically.
关键理解:在 C# 里,唤醒逻辑通常由 TaskScheduler 自动处理。 In Rust, you (or the I/O library you use) are responsible for calling
而在 Rust 里,调用waker.wake(). Forget it, and your program silently hangs.waker.wake()的责任在开发者自己,或者所使用的 I/O 库身上。漏掉这一点,程序就会悄无声息地挂住。
Exercise: Implement a CountdownFuture
练习:实现一个 CountdownFuture
🏋️ Exercise (click to expand)
🏋️ 练习(点击展开)
Challenge: Implement a CountdownFuture that counts down from N to 0, printing the current count each time it’s polled. When it reaches 0, it completes with Ready("Liftoff!").
挑战:实现一个 CountdownFuture,让它从 N 倒数到 0,每次被 poll 时都打印当前数字;当计数归零时,返回 Ready("Liftoff!")。
Hint: The future needs to store the current count and decrement it on each poll. Remember to always re-register the waker!
提示:这个 future 需要保存当前计数,并在每次 poll 时递减。记得每次都要重新注册 waker。
🔑 Solution
🔑 参考答案
#![allow(unused)]
fn main() {
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
struct CountdownFuture {
count: u32,
}
impl CountdownFuture {
fn new(start: u32) -> Self {
CountdownFuture { count: start }
}
}
impl Future for CountdownFuture {
type Output = &'static str;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.count == 0 {
println!("Liftoff!");
Poll::Ready("Liftoff!")
} else {
println!("{}...", self.count);
self.count -= 1;
cx.waker().wake_by_ref(); // Schedule re-poll immediately
Poll::Pending
}
}
}
}
Key takeaway: This future is polled once per count. Each time it returns Pending, it immediately wakes itself to be polled again. In production, you’d use a timer instead of busy-polling.
关键点:这个 future 每减少一次计数就会被 poll 一次。每次返回 Pending 时,它都会立即把自己重新唤醒,以便再次被 poll。生产环境里通常会用计时器,而不是这种忙轮询方式。
Key Takeaways — The Future Trait
关键结论:Future Trait
Future::poll()returnsPoll::Ready(value)orPoll::PendingFuture::poll()的返回值只能是Poll::Ready(value)或Poll::Pending- A future must register a
Wakerbefore returningPending— the executor uses it to know when to re-poll
future 在返回Pending之前必须注册Waker,执行器靠它来判断何时再次 pollPin<&mut Self>guarantees the future won’t be moved in memory (needed for self-referential state machines — see Ch 4)Pin<&mut Self>保证 future 不会在内存中移动,这对自引用状态机是必需的,详见第 4 章- Everything in async Rust —
async fn,.await, combinators — is built on this one trait
Rust async 里的所有东西,无论是async fn、.await还是各种组合器,底层都建立在这一个 trait 之上
See also: Ch 3 — How Poll Works for the executor loop, Ch 6 — Building Futures by Hand for more complex implementations
延伸阅读: 第 3 章 How Poll Works 介绍执行器循环;第 6 章 Building Futures by Hand 介绍更复杂的手写实现。
3. How Poll Works 🟡
3. poll 到底是怎么运作的 🟡
What you’ll learn:
本章将学到什么:
- The executor’s poll loop: poll → pending → wake → poll again
执行器的poll循环:poll→Pending→wake→ 再次poll- How to build a minimal executor from scratch
如何从零写出一个最小执行器- Spurious wake rules and why they matter
虚假唤醒规则是什么,以及它为什么重要- Utility functions:
poll_fn()andyield_now()
两个常用工具:poll_fn()和yield_now()
The Polling State Machine
轮询状态机
The executor runs a loop: poll a future, if it’s Pending, park it until its waker fires, then poll again. This is fundamentally different from OS threads where the kernel handles scheduling.
执行器的核心动作就是循环调用 future 的 poll:如果返回 Pending,就把它挂起,等对应的 waker 触发后再回来继续 poll。这和操作系统线程很不一样,线程调度主要由内核负责,而 async 调度基本是用户态自己做。
stateDiagram-v2
[*] --> Idle : Future created
Idle --> Polling : executor calls poll()
Polling --> Complete : Ready(value)
Polling --> Waiting : Pending
Waiting --> Polling : waker.wake() called
Complete --> [*] : Value returned
Important: While in the Waiting state the future must have registered the waker with an I/O source. No registration = hang forever.
重点: future 进入 Waiting 状态时,必须 已经把 waker 注册到 I/O 事件源上。要是没注册,就没人会把它唤醒,它会老老实实卡到天荒地老。
A Minimal Executor
一个最小执行器
To demystify executors, let’s build the simplest possible one:
为了把执行器这件事讲透,先写一个最简单、近乎裸奔版的执行器:
use std::future::Future;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use std::pin::Pin;
/// The simplest possible executor: busy-loop poll until Ready
fn block_on<F: Future>(mut future: F) -> F::Output {
// Pin the future on the stack
// SAFETY: `future` is never moved after this point — we only
// access it through the pinned reference until it completes.
let mut future = unsafe { Pin::new_unchecked(&mut future) };
// Create a no-op waker (just keeps polling — inefficient but simple)
fn noop_raw_waker() -> RawWaker {
fn no_op(_: *const ()) {}
fn clone(_: *const ()) -> RawWaker { noop_raw_waker() }
let vtable = &RawWakerVTable::new(clone, no_op, no_op, no_op);
RawWaker::new(std::ptr::null(), vtable)
}
// SAFETY: noop_raw_waker() returns a valid RawWaker with a correct vtable.
let waker = unsafe { Waker::from_raw(noop_raw_waker()) };
let mut cx = Context::from_waker(&waker);
// Busy-loop until the future completes
loop {
match future.as_mut().poll(&mut cx) {
Poll::Ready(value) => return value,
Poll::Pending => {
// A real executor would park the thread here
// and wait for waker.wake() — we just spin
std::thread::yield_now();
}
}
}
}
// Usage:
fn main() {
let result = block_on(async {
println!("Hello from our mini executor!");
42
});
println!("Got: {result}");
}
这个版本很土,但它把核心逻辑掰得非常清楚:执行器其实就是一个反复调用 poll() 的循环。future 说“还没准备好”,执行器就先让开;future 说“值准备好了”,执行器就把结果取走。
真正常见的运行时,例如 Tokio,不会像这个例子一样傻转 CPU,而是会把线程挂到 epoll、kqueue、io_uring 这类系统机制上,等事件到了再回来继续跑。
Don’t use this in production! It busy-loops, wasting CPU. Real executors (tokio, smol) use
epoll/kqueue/io_uringto sleep until I/O is ready. But this shows the core idea: an executor is just a loop that callspoll().
别把这玩意拿去上生产。 它会忙等,纯烧 CPU。真正的执行器会睡眠等待 I/O 就绪。但这段代码非常适合建立直觉:执行器本质上就是一个会反复调用poll()的调度循环。
Wake-Up Notifications
唤醒通知
A real executor is event-driven. When all futures are Pending, the executor sleeps. The waker is an interrupt mechanism:
真正的执行器是事件驱动的。当所有 future 都是 Pending 时,它会休眠;等某个 waker 触发,再回来继续调度。waker 就像是用户态的一套“中断通知”。
#![allow(unused)]
fn main() {
// Conceptual model of a real executor's main loop:
fn executor_loop(tasks: &mut TaskQueue) {
loop {
// 1. Poll all tasks that have been woken
while let Some(task) = tasks.get_woken_task() {
match task.poll() {
Poll::Ready(result) => task.complete(result),
Poll::Pending => { /* task stays in queue, waiting for wake */ }
}
}
// 2. Sleep until something wakes us up (epoll_wait, kevent, etc.)
// This is where mio/polling does the heavy lifting
tasks.wait_for_events(); // blocks until an I/O event or waker fires
}
}
}
可以把这个过程理解成两步:先处理已经被唤醒的任务,再去系统层等待下一批事件。谁醒了,谁先回来被 poll。没醒的任务,执行器连看都懒得看。
这也是 async 高效的关键之一:它不靠一堆线程空转,而是让真正有进展的任务继续跑。
Spurious Wakes
虚假唤醒
A future may be polled even when its I/O isn’t ready. This is called a spurious wake. Futures must handle this correctly:
有时候 future 会被唤醒,但对应的 I/O 其实还没真准备好。这就叫 spurious wake,也就是虚假唤醒。实现 Future 时必须正确处理这种情况。
#![allow(unused)]
fn main() {
impl Future for MyFuture {
type Output = Data;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Data> {
// ✅ CORRECT: Always re-check the actual condition
if let Some(data) = self.try_read_data() {
Poll::Ready(data)
} else {
// Re-register the waker (it might have changed!)
self.register_waker(cx.waker());
Poll::Pending
}
// ❌ WRONG: Assuming poll means data is ready
// let data = self.read_data(); // might block or panic
// Poll::Ready(data)
}
}
}
这里最容易犯蠢的地方,就是误以为“既然又被 poll 了,那一定有数据了”。这想法可太天真了。poll 只是一次机会,不是成功保证。
每次进 poll() 都要重新检查真实条件,发现还没好,就重新登记 waker,然后继续返回 Pending。
Rules for implementing poll():
实现 poll() 时的规则:
- Never block — return
Pendingimmediately if not ready
1. 绝对别阻塞:条件没满足就立刻返回Pending。 - Always re-register the waker — it may have changed between polls
2. 每次都重新注册 waker:两次poll之间,waker 可能已经变了。 - Handle spurious wakes — check the actual condition, don’t assume readiness
3. 正确处理虚假唤醒:检查真实状态,别脑补“既然被叫醒就一定能继续”。 - Don’t poll after
Ready— behavior is unspecified (may panic, returnPending, or repeatReady). OnlyFusedFutureguarantees safe post-completion polling
4. 返回Ready后别再继续poll:这之后的行为是 未指定的,可能 panic,可能返回Pending,也可能重复Ready。只有FusedFuture才会额外保证完成后继续轮询是安全的。
🏋️ Exercise: Implement a CountdownFuture 🏋️ 练习:实现一个倒计时 Future
Challenge: Implement a CountdownFuture that counts down from N to 0, printing the current count as a side-effect each time it’s polled. When it reaches 0, it completes with Ready("Liftoff!"). (Note: a Future produces only one final value — the printing is a side-effect, not a yielded value. For multiple async values, see Stream in Ch. 11.)
挑战题: 实现一个 CountdownFuture,从 N 倒数到 0。每次被 poll 时,都把当前数字打印出来作为副作用;当数到 0 时,返回 Ready("Liftoff!")``。注意,Future最终只产生 **一个** 值,打印只是副作用,不是多次产出。要处理多次异步产出,得看第 11 章的Stream`。
Hint: This doesn’t need a real I/O source — it can wake itself immediately with cx.waker().wake_by_ref() after each decrement.
提示: 这个练习不需要真实 I/O。每次减 1 之后,用 cx.waker().wake_by_ref() 立刻把自己重新唤醒就行。
🔑 Solution 🔑 参考答案
#![allow(unused)]
fn main() {
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
struct CountdownFuture {
count: u32,
}
impl CountdownFuture {
fn new(start: u32) -> Self {
CountdownFuture { count: start }
}
}
impl Future for CountdownFuture {
type Output = &'static str;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.count == 0 {
Poll::Ready("Liftoff!")
} else {
println!("{}...", self.count);
self.count -= 1;
// Wake immediately — we're always ready to make progress
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
// Usage with our mini executor or tokio:
// let msg = block_on(CountdownFuture::new(5));
// prints: 5... 4... 3... 2... 1...
// msg == "Liftoff!"
}
Key takeaway: Even though this future is always ready to progress, it returns Pending to yield control between steps. It calls wake_by_ref() immediately so the executor re-polls it right away. This is the basis of cooperative multitasking — each future voluntarily yields.
关键点: 这个 future 虽然每一步都能继续推进,但它依然会先返回 Pending,把控制权交还给执行器,然后立刻主动唤醒自己。协作式多任务的基础就是这个味道:每个 future 在合适的位置主动让出执行机会。
Handy Utilities: poll_fn and yield_now
两个顺手好用的工具:poll_fn 和 yield_now
Two utilities from the standard library and tokio that avoid writing full Future impls:
标准库和 Tokio 各给了一个很好用的小工具,很多时候可以避免手写整套 Future 实现:
#![allow(unused)]
fn main() {
use std::future::poll_fn;
use std::task::Poll;
// poll_fn: create a one-off future from a closure
let value = poll_fn(|cx| {
// Do something with cx.waker(), return Ready or Pending
Poll::Ready(42)
}).await;
// Real-world use: bridge a callback-based API into async
async fn read_when_ready(source: &MySource) -> Data {
poll_fn(|cx| source.poll_read(cx)).await
}
}
poll_fn() 很适合把“已经有一个像 poll_xxx(cx) 这样的回调式接口”包装成 async 形式。它特别适合作桥接层,用一次就走,不必专门定义一个新 future 类型。
如果只是临时把某段轮询逻辑塞进 async 流程,poll_fn() 基本就是现成的瑞士军刀。
#![allow(unused)]
fn main() {
// yield_now: voluntarily yield control to the executor
// Useful in CPU-heavy async loops to avoid starving other tasks
async fn cpu_heavy_work(items: &[Item]) {
for (i, item) in items.iter().enumerate() {
process(item); // CPU work
// Every 100 items, yield to let other tasks run
if i % 100 == 0 {
tokio::task::yield_now().await;
}
}
}
}
yield_now() 则是“我虽然还能继续算,但先让别人跑一下”的主动让步。这个在 CPU 密集型 async 循环里尤其重要,不然一个任务可能霸占执行器线程,把别的任务活活饿着。
只要一个 async 函数里长时间没有 .await,它就很容易把 cooperative scheduling 搞成单任务独占。适当插入 yield_now().await,调度才会重新均衡起来。
When to use
yield_now(): If your async function does CPU work in a loop without any.awaitpoints, it monopolizes the executor thread. Insertyield_now().awaitperiodically to enable cooperative multitasking.
什么时候该用yield_now(): 如果 async 函数在循环里做的是纯 CPU 工作,而且长时间没有.await,那它就会长期霸占执行器线程。周期性插入yield_now().await,才能把协作式调度重新扶正。
Key Takeaways — How Poll Works
本章要点:poll的工作方式
- An executor repeatedly calls
poll()on futures that have been woken
执行器会反复对已被唤醒的 future 调用poll()。- Futures must handle spurious wakes — always re-check the actual condition
future 必须能处理 虚假唤醒,每次都重新检查真实条件。poll_fn()lets you create ad-hoc futures from closurespoll_fn()可以把闭包快速包装成临时 future。yield_now()is a cooperative scheduling escape hatch for CPU-heavy async codeyield_now()是 CPU 密集型 async 代码里常用的协作式调度让步点。
See also: Ch 2 — The Future Trait for the trait definition, Ch 5 — The State Machine Reveal for what the compiler generates
继续阅读: 第 2 章:Future Trait 讲 trait 定义本身,第 5 章:状态机的真相 会继续拆开编译器到底生成了什么。
4. Pin and Unpin 🔴
4. Pin 与 Unpin 🔴
What you’ll learn:
本章将学到什么:
- Why self-referential structs break when moved in memory
为什么自引用结构体一旦在内存里移动就会出问题- What
Pin<P>guarantees and how it prevents movesPin<P>到底保证了什么,以及它如何阻止移动- The three practical pinning patterns:
Box::pin(),tokio::pin!(),Pin::new()
三种常见的 pin 用法:Box::pin()、tokio::pin!()、Pin::new()- When
Unpingives you an escape hatch
什么时候Unpin能让局面轻松很多
Why Pin Exists
为什么会有 Pin
This is the most confusing concept in async Rust. Let’s build the intuition step by step.
这是 async Rust 里最容易把人绕晕的概念之一。别急,按步骤把直觉搭起来就顺了。
The Problem: Self-Referential Structs
问题所在:自引用结构体
When the compiler transforms an async fn into a state machine, that state machine may contain references to its own fields. This creates a self-referential struct — and moving it in memory would invalidate those internal references.
编译器把 async fn 变成状态机之后,这个状态机里可能会包含指向它自身字段的引用。这样一来,就形成了一个 自引用结构体。而这种结构如果在内存里被挪动,内部那些引用就会立刻失效。
#![allow(unused)]
fn main() {
// What the compiler generates (simplified) for:
// async fn example() {
// let data = vec![1, 2, 3];
// let reference = &data; // Points to data above
// use_ref(reference).await;
// }
// Becomes something like:
enum ExampleStateMachine {
State0 {
data: Vec<i32>,
// reference: &Vec<i32>, // PROBLEM: points to `data` above
// // If this struct moves, the pointer is dangling!
},
State1 {
data: Vec<i32>,
reference: *const Vec<i32>, // Internal pointer to data field
},
Complete,
}
}
graph LR
subgraph "Before Move (Valid)"
A["data: [1,2,3]<br/>at addr 0x1000"]
B["reference: 0x1000<br/>(points to data)"]
B -->|"valid"| A
end
subgraph "After Move (INVALID)"
C["data: [1,2,3]<br/>at addr 0x2000"]
D["reference: 0x1000<br/>(still points to OLD location!)"]
D -->|"dangling!"| E["💥 0x1000<br/>(freed/garbage)"]
end
style E fill:#ffcdd2,color:#000
style D fill:#ffcdd2,color:#000
style B fill:#c8e6c9,color:#000
Self-Referential Structs
自引用结构体并不是纸上谈兵
This isn’t an academic concern. Every async fn that holds a reference across an .await point creates a self-referential state machine:
这不是学术层面的边角问题。任何一个在 .await 前后跨越持有引用的 async fn,都会生成自引用状态机:
#![allow(unused)]
fn main() {
async fn problematic() {
let data = String::from("hello");
let slice = &data[..]; // slice borrows data
some_io().await; // <-- .await point: state machine stores both data AND slice
println!("{slice}"); // uses the reference after await
}
// The generated state machine has `data: String` and `slice: &str`
// where slice points INTO data. Moving the state machine = dangling pointer.
}
Pin in Practice
Pin 在实战里是什么样子
Pin<P> is a wrapper that prevents moving the value behind the pointer:Pin<P> 是一个包装器,用来阻止指针背后的那个值被移动:
#![allow(unused)]
fn main() {
use std::pin::Pin;
let mut data = String::from("hello");
// Pin it — now it can't be moved
let pinned: Pin<&mut String> = Pin::new(&mut data);
// Can still use it:
println!("{}", pinned.as_ref().get_ref()); // "hello"
// But we can't get &mut String back (which would allow mem::swap):
// let mutable: &mut String = Pin::into_inner(pinned); // Only if String: Unpin
// String IS Unpin, so this actually works for String.
// But for self-referential state machines (which are !Unpin), it's blocked.
}
In real code, you mostly encounter Pin in three places:
实际代码里,Pin 主要会在三种地方碰见:
#![allow(unused)]
fn main() {
// 1. poll() signature — all futures are polled through Pin
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Output>;
// 2. Box::pin() — heap-allocate and pin a future
let future: Pin<Box<dyn Future<Output = i32>>> = Box::pin(async { 42 });
// 3. tokio::pin!() — pin a future on the stack
tokio::pin!(my_future);
// Now my_future: Pin<&mut impl Future>
}
The Unpin Escape Hatch
Unpin 这条“逃生通道”
Most types in Rust are Unpin — they don’t contain self-references, so pinning is a no-op. Only compiler-generated state machines (from async fn) are !Unpin.
Rust 里绝大多数类型都是 Unpin,因为它们根本不包含自引用,所以 pin 上去也只是走个形式。真正属于 !Unpin 的,主要是编译器从 async fn 生成出来的那些状态机。
#![allow(unused)]
fn main() {
// These are all Unpin — pinning them does nothing special:
// i32, String, Vec<T>, HashMap<K,V>, Box<T>, &T, &mut T
// These are !Unpin — they MUST be pinned before polling:
// The state machines generated by `async fn` and `async {}`
// Practical implication:
// If you write a Future by hand and it has NO self-references,
// implement Unpin to make it easier to work with:
impl Unpin for MySimpleFuture {} // "I'm safe to move, trust me"
}
Quick Reference
快速参考
| What | When | How |
|---|---|---|
| Pin a future on the heap 把 future pin 到堆上 | Storing in a collection, returning from function 需要放进集合,或者要从函数里返回 | Box::pin(future)Box::pin(future) |
| Pin a future on the stack 把 future pin 到栈上 | Local use in select! or manual polling只在局部 select! 或手动 poll 时使用 | tokio::pin!(future) or pin_mut! from pin-utilstokio::pin!(future) 或 pin-utils 里的 pin_mut! |
| Pin in function signature 在函数签名里使用 Pin | Accepting pinned futures 函数要接收已经 pin 好的 future | future: Pin<&mut F>future: Pin<&mut F> |
| Require Unpin 要求 Unpin | When you need to move a future after creation future 创建之后还需要继续移动它 | F: Future + UnpinF: Future + Unpin |
🏋️ Exercise: Pin and Move 🏋️ 练习:Pin 与移动
Challenge: Which of these code snippets compile? For each one that doesn’t, explain why and fix it.
挑战题:下面这些代码片段里,哪些能编译通过?哪些不能?对每个不能通过的片段,说明原因并给出修正方式。
#![allow(unused)]
fn main() {
// Snippet A
let fut = async { 42 };
let pinned = Box::pin(fut);
let moved = pinned; // Move the Box
let result = moved.await;
// Snippet B
let fut = async { 42 };
tokio::pin!(fut);
let moved = fut; // Move the pinned future
let result = moved.await;
// Snippet C
use std::pin::Pin;
let mut fut = async { 42 };
let pinned = Pin::new(&mut fut);
}
🔑 Solution 🔑 参考答案
Snippet A: ✅ Compiles. Box::pin() puts the future on the heap. Moving the Box moves the pointer, not the future itself. The future stays pinned in its heap location.
片段 A:✅ 可以编译。 Box::pin() 会把 future 放到堆上。后面移动的是 Box 这个 指针,不是 future 本体,所以 future 仍然固定在原来的堆地址上。
Snippet B: ❌ Does not compile. tokio::pin! pins the future to the stack and rebinds fut as Pin<&mut ...>. You can’t move out of a pinned reference. Fix: Don’t move it — use it in place:
片段 B:❌ 不能编译。 tokio::pin! 会把 future pin 在栈上,并把 fut 重新绑定成 Pin<&mut ...>。从一个已经 pin 住的引用里把值再挪出去,是不允许的。修正办法:别移动它,原地使用。
#![allow(unused)]
fn main() {
let fut = async { 42 };
tokio::pin!(fut);
let result = fut.await; // Use directly, don't reassign
}
Snippet C: ❌ Does not compile. Pin::new() requires T: Unpin. Async blocks generate !Unpin types. Fix: Use Box::pin() or unsafe Pin::new_unchecked():
片段 C:❌ 不能编译。 Pin::new() 要求 T: Unpin,而 async block 生成出来的类型通常是 !Unpin。修正办法:用 Box::pin(),或者在极少数非常确定的场景下用 unsafe Pin::new_unchecked()。
#![allow(unused)]
fn main() {
let fut = async { 42 };
let pinned = Box::pin(fut); // Heap-pin — works with !Unpin
}
Key takeaway: Box::pin() is the safe, easy way to pin !Unpin futures. tokio::pin!() pins on the stack but the future can’t be moved after. Pin::new() only works with Unpin types.
关键结论:Box::pin() 是处理 !Unpin future 最稳妥、最省心的方式。tokio::pin!() 能把 future pin 在栈上,但之后就别想再挪它了。Pin::new() 只适用于 Unpin 类型。
Key Takeaways — Pin and Unpin
本章要点:Pin 与 Unpin
Pin<P>is a wrapper that prevents the pointee from being moved — essential for self-referential state machinesPin<P>是一个包装器,它会 阻止被指向的值继续移动,这对自引用状态机至关重要。Box::pin()is the safe, easy default for pinning futures on the heapBox::pin()是把 future pin 到堆上的安全默认解法。tokio::pin!()pins on the stack — cheaper but the future can’t be moved afterwardtokio::pin!()会把 future pin 在栈上,成本更低,但后面就不能再移动它。Unpinis an auto-trait opt-out: types that implementUnpincan be moved even when pinned (most types areUnpin; async blocks are not)Unpin是一个自动 trait。实现了Unpin的类型,即使被 pin,也仍然允许移动。大多数普通类型都是Unpin,但 async block 生成的类型通常不是。
See also: Ch 2 — The Future Trait for
Pin<&mut Self>in poll, Ch 5 — The State Machine Reveal for why async state machines are self-referential
继续阅读: 第 2 章:Future Trait 会解释 poll 里的Pin<&mut Self>,第 5 章:状态机的真相 会说明为什么 async 状态机天然就是自引用的。
5. The State Machine Reveal 🟢
5. 状态机的真相 🟢
What you’ll learn:
本章将学到什么:
- How the compiler transforms
async fninto an enum state machine
编译器如何把async fn变成基于枚举的状态机- Side-by-side comparison: source code vs generated states
源代码与生成状态之间的一一对照- Why large stack allocations in
async fnblow up future sizes
为什么async fn里的大栈分配会把 future 的尺寸撑爆- The drop optimization: values drop as soon as they’re no longer needed
drop 优化:值一旦不再需要就会立刻被释放
What the Compiler Actually Generates
编译器实际生成了什么
When you write async fn, the compiler transforms your sequential-looking code into an enum-based state machine. Understanding this transformation is the key to understanding async Rust’s performance characteristics and many of its quirks.
当写下 async fn 时,编译器会把看起来像顺序执行的代码改写成一个基于枚举的状态机。想弄懂 async Rust 的性能特征和很多古怪之处,关键就是先把这个转换过程看明白。
Side-by-Side: async fn vs State Machine
并排看:async fn 与状态机
#![allow(unused)]
fn main() {
// What you write:
async fn fetch_two_pages() -> String {
let page1 = http_get("https://example.com/a").await;
let page2 = http_get("https://example.com/b").await;
format!("{page1}\n{page2}")
}
}
The compiler generates something conceptually like this:
编译器在概念上会生成类似下面这样的东西:
#![allow(unused)]
fn main() {
enum FetchTwoPagesStateMachine {
// State 0: About to call http_get for page1
Start,
// State 1: Waiting for page1, holding the future
WaitingPage1 {
fut1: HttpGetFuture,
},
// State 2: Got page1, waiting for page2
WaitingPage2 {
page1: String,
fut2: HttpGetFuture,
},
// Terminal state
Complete,
}
impl Future for FetchTwoPagesStateMachine {
type Output = String;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<String> {
loop {
match self.as_mut().get_mut() {
Self::Start => {
let fut1 = http_get("https://example.com/a");
*self.as_mut().get_mut() = Self::WaitingPage1 { fut1 };
}
Self::WaitingPage1 { fut1 } => {
let page1 = match Pin::new(fut1).poll(cx) {
Poll::Ready(v) => v,
Poll::Pending => return Poll::Pending,
};
let fut2 = http_get("https://example.com/b");
*self.as_mut().get_mut() = Self::WaitingPage2 { page1, fut2 };
}
Self::WaitingPage2 { page1, fut2 } => {
let page2 = match Pin::new(fut2).poll(cx) {
Poll::Ready(v) => v,
Poll::Pending => return Poll::Pending,
};
let result = format!("{page1}\n{page2}");
*self.as_mut().get_mut() = Self::Complete;
return Poll::Ready(result);
}
Self::Complete => panic!("polled after completion"),
}
}
}
}
}
Note: This desugaring is conceptual. The real compiler output uses
unsafepin projections — theget_mut()calls shown here requireUnpin, but async state machines are!Unpin. The goal is to illustrate state transitions, not produce compilable code.
注意: 这段反语法糖代码只是 概念示意。真正的编译器输出会使用unsafe的 pin projection。这里展示的get_mut()需要Unpin,但 async 状态机本身通常是!Unpin。重点是说明状态如何迁移,不是给出一份可直接编译的实现。
stateDiagram-v2
[*] --> Start
Start --> WaitingPage1: Create http_get future #1
WaitingPage1 --> WaitingPage1: poll() → Pending
WaitingPage1 --> WaitingPage2: poll() → Ready(page1)
WaitingPage2 --> WaitingPage2: poll() → Pending
WaitingPage2 --> Complete: poll() → Ready(page2)
Complete --> [*]: Return format!("{page1}\\n{page2}")
State contents:
各个状态里保存的内容:
- WaitingPage1 — stores
fut1: HttpGetFuture(page2 not yet allocated)
WaitingPage1:保存fut1: HttpGetFuture,此时page2还没开始分配。- WaitingPage2 — stores
page1: String,fut2: HttpGetFuture(fut1 has been dropped)
WaitingPage2:保存page1: String和fut2: HttpGetFuture,这时fut1已经被释放了。
Why This Matters for Performance
为什么这和性能直接相关
Zero-cost: The state machine is a stack-allocated enum. No heap allocation per future, no garbage collector, no boxing — unless you explicitly use Box::pin().
零成本:这个状态机本质上是一个分配在栈上的枚举。每个 future 默认都不会额外做堆分配,也没有垃圾回收,更不会自动装箱,除非显式使用 Box::pin()。
Size: The enum’s size is the maximum of all its variants. Each .await point creates a new variant. This means:
尺寸:这个枚举的大小取决于所有变体里最大的那个。每出现一个 .await,就会多出一个新状态。因此会出现下面这种情况:
#![allow(unused)]
fn main() {
async fn small() {
let a: u8 = 0;
yield_now().await;
let b: u8 = 0;
yield_now().await;
}
// Size ≈ max(size_of(u8), size_of(u8)) + discriminant + future sizes
// ≈ small!
async fn big() {
let buf: [u8; 1_000_000] = [0; 1_000_000]; // 1MB on the stack!
some_io().await;
process(&buf);
}
// Size ≈ 1MB + inner future sizes
// ⚠️ Don't stack-allocate huge buffers in async functions!
// Use Vec<u8> or Box<[u8]> instead.
}
Drop optimization: When a state machine transitions, it drops values no longer needed. In the example above, fut1 is dropped when we transition from WaitingPage1 to WaitingPage2 — the compiler inserts the drop automatically.
Drop 优化:状态机一旦迁移,就会把后续不再需要的值立刻释放掉。上面的例子里,从 WaitingPage1 切到 WaitingPage2 时,fut1 就会被自动 drop,这个释放动作由编译器直接插进去。
Practical rule: Large stack allocations in
async fnblow up the future’s size. If you see stack overflows in async code, check for large arrays or deeply nested futures. UseBox::pin()to heap-allocate sub-futures if needed.
实战规则:async fn里的大栈分配会直接把 future 的体积顶上去。如果在 async 代码里遇到栈溢出,先去查有没有超大数组,或者 future 嵌套得过深。必要时用Box::pin()把子 future 放到堆上。
Exercise: Predict the State Machine
练习:预测状态机
🏋️ Exercise 🏋️ 练习
Challenge: Given this async function, sketch the state machine the compiler generates. How many states (enum variants) does it have? What values are stored in each?
挑战题:给定下面这个 async 函数,画出编译器会生成的状态机。它总共有多少个状态,也就是多少个枚举变体?每个状态里各自保存什么值?
#![allow(unused)]
fn main() {
async fn pipeline(url: &str) -> Result<usize, Error> {
let response = fetch(url).await?;
let body = response.text().await?;
let parsed = parse(body).await?;
Ok(parsed.len())
}
}
🔑 Solution 🔑 参考答案
Four states:
可以拆成四个核心等待状态,再加一个完成态:
- Start — stores
url
1. Start:保存url。 - WaitingFetch — stores
url,fetchfuture
2. WaitingFetch:保存url和fetchfuture。 - WaitingText — stores
response,text()future
3. WaitingText:保存response和text()future。 - WaitingParse — stores
body,parsefuture
4. WaitingParse:保存body和parsefuture。 - Done — returned
Ok(parsed.len())
5. Done:已经返回Ok(parsed.len())。
Each .await creates a yield point = a new enum variant. The ? adds early-exit paths but doesn’t add extra states — it’s just a match on the Poll::Ready value.
每个 .await 都对应一个新的 yield point,也就对应一个新的枚举变体。? 只是在 Poll::Ready 之后追加了错误分支处理,本身不会额外引入新的状态。
Key Takeaways — The State Machine Reveal
本章要点:状态机的真相
async fncompiles to an enum with one variant per.awaitpointasync fn会被编译成一个枚举,每个.await都对应一个状态变体。- The future’s size = max of all variant sizes — large stack values blow it up
future 的 尺寸 等于所有变体中最大的那个,因此大栈对象会把它直接撑大。- The compiler inserts drops at state transitions automatically
状态迁移时需要的 drop 会由编译器自动插入。- Use
Box::pin()or heap allocation when future size becomes a problem
如果 future 体积成了问题,就用Box::pin()或其他堆分配方式拆分它。
See also: Ch 4 — Pin and Unpin for why the generated enum needs pinning, Ch 6 — Building Futures by Hand to build these state machines yourself
继续阅读: 第 4 章:Pin 与 Unpin 会解释为什么生成的枚举需要 pin,第 6 章:手写 Future 会带着亲手把这类状态机写出来。
6. Building Futures by Hand 🟡
6. 亲手构建 Future 🟡
What you’ll learn:
本章将学到什么:
- Implementing a
TimerFuturewith thread-based waking
如何实现一个依靠线程唤醒的TimerFuture- Building a
Joincombinator: run two futures concurrently
如何构建Join组合子:并发推进两个 future- Building a
Selectcombinator: race two futures
如何构建Select组合子:让两个 future 赛跑- How combinators compose — futures all the way down
组合子是如何层层嵌套的:归根到底还是 future 套 future
A Simple Timer Future
一个简单的计时器 Future
Now let’s build some real and useful futures from scratch. This chapter is where chapters 2 through 5 really settle into muscle memory.
现在开始从零手搓一些真正有用的 future。前面第 2 到第 5 章讲过的那些概念,到这里才算真正被钉牢。
TimerFuture: A Complete Example
TimerFuture:一个完整例子
#![allow(unused)]
fn main() {
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::thread;
use std::time::{Duration, Instant};
pub struct TimerFuture {
shared_state: Arc<Mutex<SharedState>>,
}
struct SharedState {
completed: bool,
waker: Option<Waker>,
}
impl TimerFuture {
pub fn new(duration: Duration) -> Self {
let shared_state = Arc::new(Mutex::new(SharedState {
completed: false,
waker: None,
}));
// Spawn a thread that sets completed=true after the duration
let thread_shared_state = Arc::clone(&shared_state);
thread::spawn(move || {
thread::sleep(duration);
let mut state = thread_shared_state.lock().unwrap();
state.completed = true;
if let Some(waker) = state.waker.take() {
waker.wake(); // Notify the executor
}
});
TimerFuture { shared_state }
}
}
impl Future for TimerFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let mut state = self.shared_state.lock().unwrap();
if state.completed {
Poll::Ready(())
} else {
// Store the waker so the timer thread can wake us
// IMPORTANT: Always update the waker — the executor may
// have changed it between polls
state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
// Usage:
// async fn example() {
// println!("Starting timer...");
// TimerFuture::new(Duration::from_secs(2)).await;
// println!("Timer done!");
// }
//
// ⚠️ This spawns an OS thread per timer — fine for learning, but in
// production use `tokio::time::sleep` which is backed by a shared
// timer wheel and requires zero extra threads.
}
This example has all the moving pieces a real future needs: state storage, a poll() method, and a way to wake the executor later.
这个例子已经把一个真实 future 需要的零件全摆出来了:状态存储、poll() 实现,以及稍后重新唤醒执行器的办法。
Join: Running Two Futures Concurrently
Join:并发推进两个 Future
Join polls two futures and finishes only when both are complete. This is the core idea behind tokio::join!.Join 会同时轮询两个 future,只有当 两者都完成 时才算结束。这就是 tokio::join! 背后的核心思路。
#![allow(unused)]
fn main() {
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Polls two futures concurrently, returns both results as a tuple
pub struct Join<A, B>
where
A: Future,
B: Future,
{
a: MaybeDone<A>,
b: MaybeDone<B>,
}
enum MaybeDone<F: Future> {
Pending(F),
Done(F::Output),
Taken, // Output has been taken
}
impl<A, B> Join<A, B>
where
A: Future,
B: Future,
{
pub fn new(a: A, b: B) -> Self {
Join {
a: MaybeDone::Pending(a),
b: MaybeDone::Pending(b),
}
}
}
impl<A, B> Future for Join<A, B>
where
A: Future + Unpin,
B: Future + Unpin,
{
type Output = (A::Output, B::Output);
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Poll A if not done
if let MaybeDone::Pending(ref mut fut) = self.a {
if let Poll::Ready(val) = Pin::new(fut).poll(cx) {
self.a = MaybeDone::Done(val);
}
}
// Poll B if not done
if let MaybeDone::Pending(ref mut fut) = self.b {
if let Poll::Ready(val) = Pin::new(fut).poll(cx) {
self.b = MaybeDone::Done(val);
}
}
// Both done?
match (&self.a, &self.b) {
(MaybeDone::Done(_), MaybeDone::Done(_)) => {
// Take both outputs
let a_val = match std::mem::replace(&mut self.a, MaybeDone::Taken) {
MaybeDone::Done(v) => v,
_ => unreachable!(),
};
let b_val = match std::mem::replace(&mut self.b, MaybeDone::Taken) {
MaybeDone::Done(v) => v,
_ => unreachable!(),
};
Poll::Ready((a_val, b_val))
}
_ => Poll::Pending, // At least one is still pending
}
}
}
// Usage:
// let (page1, page2) = Join::new(
// http_get("https://example.com/a"),
// http_get("https://example.com/b"),
// ).await;
// Both requests run concurrently!
}
Key insight: “Concurrent” here means interleaved on the same thread.
Joindoes not create threads; it simply polls both child futures during the samepoll()cycle.
关键理解: 这里的“并发”指的是 在同一线程上交错推进。Join不会新开线程,它只是把两个子 future 放进同一个poll()周期里轮着推。
graph LR
subgraph "Future Combinators<br/>Future 组合子"
direction TB
TIMER["TimerFuture<br/>Single future, wake after delay<br/>单个 future,延迟后唤醒"]
JOIN["Join<A, B><br/>Wait for BOTH<br/>等待两者都完成"]
SELECT["Select<A, B><br/>Wait for FIRST<br/>谁先完成等谁"]
RETRY["RetryFuture<br/>Re-create on failure<br/>失败后重新创建"]
end
TIMER --> JOIN
TIMER --> SELECT
SELECT --> RETRY
style TIMER fill:#d4efdf,stroke:#27ae60,color:#000
style JOIN fill:#e8f4f8,stroke:#2980b9,color:#000
style SELECT fill:#fef9e7,stroke:#f39c12,color:#000
style RETRY fill:#fadbd8,stroke:#e74c3c,color:#000
Select: Racing Two Futures
Select:让两个 Future 赛跑
Select finishes as soon as either future completes, and whichever future loses the race gets dropped.Select 只要发现 任意一个 future 先完成,就立即结束,输掉赛跑的那个 future 会被直接丢弃。
#![allow(unused)]
fn main() {
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
pub enum Either<A, B> {
Left(A),
Right(B),
}
/// Returns whichever future completes first; drops the other
pub struct Select<A, B> {
a: A,
b: B,
}
impl<A, B> Select<A, B>
where
A: Future + Unpin,
B: Future + Unpin,
{
pub fn new(a: A, b: B) -> Self {
Select { a, b }
}
}
impl<A, B> Future for Select<A, B>
where
A: Future + Unpin,
B: Future + Unpin,
{
type Output = Either<A::Output, B::Output>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Poll A first
if let Poll::Ready(val) = Pin::new(&mut self.a).poll(cx) {
return Poll::Ready(Either::Left(val));
}
// Then poll B
if let Poll::Ready(val) = Pin::new(&mut self.b).poll(cx) {
return Poll::Ready(Either::Right(val));
}
Poll::Pending
}
}
// Usage with timeout:
// match Select::new(http_get(url), TimerFuture::new(timeout)).await {
// Either::Left(response) => println!("Got response: {}", response),
// Either::Right(()) => println!("Request timed out!"),
// }
}
Fairness note: this hand-written
Selectalways pollsAfirst, so if both futures are ready at the same time,Aalways wins. Tokio’sselect!randomizes polling order to make this fairer.
公平性提示: 这个手写版Select总是先轮询A,所以如果两个 future 同时 ready,A永远获胜。Tokio 的select!会对轮询顺序做随机化,以减少这种偏置。
🏋️ Exercise: Build a RetryFuture
🏋️ 练习:实现一个 RetryFuture
Challenge: Build a RetryFuture<F, Fut> that accepts a closure F: Fn() -> Fut and retries up to N times if the inner future returns Err. It should yield the first Ok result, or the last Err if all attempts fail.
挑战:实现一个 RetryFuture<F, Fut>,它接收闭包 F: Fn() -> Fut,当内部 future 返回 Err 时最多重试 N 次。结果应该是第一个 Ok,或者在所有尝试都失败后返回最后一个 Err。
Hint: You’ll need states for “currently running an attempt” and “all attempts exhausted.”
提示:至少需要区分“当前正在执行某次尝试”和“所有尝试次数都已经耗尽”这两类状态。
🔑 Solution
🔑 参考答案
#![allow(unused)]
fn main() {
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
pub struct RetryFuture<F, Fut, T, E>
where
F: Fn() -> Fut,
Fut: Future<Output = Result<T, E>> + Unpin,
{
factory: F,
current: Option<Fut>,
remaining: usize,
last_error: Option<E>,
}
impl<F, Fut, T, E> RetryFuture<F, Fut, T, E>
where
F: Fn() -> Fut,
Fut: Future<Output = Result<T, E>> + Unpin,
{
pub fn new(max_attempts: usize, factory: F) -> Self {
let current = Some((factory)());
RetryFuture {
factory,
current,
remaining: max_attempts.saturating_sub(1),
last_error: None,
}
}
}
impl<F, Fut, T, E> Future for RetryFuture<F, Fut, T, E>
where
F: Fn() -> Fut + Unpin,
Fut: Future<Output = Result<T, E>> + Unpin,
T: Unpin,
E: Unpin,
{
type Output = Result<T, E>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
if let Some(ref mut fut) = self.current {
match Pin::new(fut).poll(cx) {
Poll::Ready(Ok(val)) => return Poll::Ready(Ok(val)),
Poll::Ready(Err(e)) => {
self.last_error = Some(e);
if self.remaining > 0 {
self.remaining -= 1;
self.current = Some((self.factory)());
// Loop to poll the new future immediately
} else {
return Poll::Ready(Err(self.last_error.take().unwrap()));
}
}
Poll::Pending => return Poll::Pending,
}
} else {
return Poll::Ready(Err(self.last_error.take().unwrap()));
}
}
}
}
// Usage:
// let result = RetryFuture::new(3, || async {
// http_get("https://flaky-server.com/api").await
// }).await;
}
Key takeaway: the retry future is itself another state machine. It remembers the current attempt, stores the last error, and recreates a new inner future whenever another attempt is needed.
核心收获: 重试 future 本身又是一个状态机。它要记住当前尝试、保存上一次错误,并在需要重试时重新创建新的内部 future。
Key Takeaways — Building Futures by Hand
本章要点——亲手构建 Future
- A future needs three essentials: state, a
poll()implementation, and waker registration
一个 future 至少需要三样东西:状态、poll()实现,以及 waker 注册逻辑Joinpolls both child futures;Selectreturns whichever finishes firstJoin会轮询两个子 future;Select则返回先完成的那个- Combinators are futures wrapping other futures — everything keeps nesting
组合子本质上也是 future,只不过它包着别的 future,一层套一层- Hand-writing futures helps build直觉,但生产代码里优先用
tokio::join!和select!
亲手写 future 很能建立直觉,但在生产代码里,优先使用tokio::join!和select!这类成熟工具
See also: Ch 2 — The Future Trait for the trait definition, Ch 8 — Tokio Deep Dive for production-grade equivalents.
继续阅读: 第 2 章——The Future Trait 会回到 trait 定义本身,第 8 章——Tokio Deep Dive 会给出生产级替代方案。
7. Executors and Runtimes 🟡
7. 执行器与运行时 🟡
What you’ll learn:
本章将学习:
- What an executor does: poll + sleep efficiently
执行器的职责:在合适时机轮询,并在空闲时高效休眠- The six major runtimes: mio, io_uring, tokio, async-std, smol, embassy
六类关键运行时与基础设施:mio、io_uring、tokio、async-std、smol、embassy- A decision tree for choosing the right runtime
如何根据场景选择合适运行时- Why runtime-agnostic library design matters
为什么库设计应尽量保持运行时无关
What an Executor Does
执行器到底做什么
An executor has two jobs:
执行器主要负责两件事:
- Poll futures when they’re ready to make progress
在 Future 可以继续推进时对其进行poll - Sleep efficiently when no futures are ready using OS I/O notification APIs
当暂时没有 Future 可推进时,借助操作系统的 I/O 通知机制高效休眠
graph TB
subgraph Executor["Executor<br/>执行器<br/>(e.g., tokio / 例如 tokio)"]
QUEUE["Task Queue<br/>任务队列"]
POLLER["I/O Poller<br/>I/O 轮询器<br/>(epoll/kqueue/io_uring)"]
THREADS["Worker Thread Pool<br/>工作线程池"]
end
subgraph Tasks["Tasks<br/>任务"]
T1["Task 1<br/>任务 1<br/>(HTTP request / HTTP 请求)"]
T2["Task 2<br/>任务 2<br/>(DB query / 数据库查询)"]
T3["Task 3<br/>任务 3<br/>(File read / 文件读取)"]
end
subgraph OS["Operating System<br/>操作系统"]
NET["Network Stack<br/>网络栈"]
DISK["Disk I/O<br/>磁盘 I/O"]
end
T1 --> QUEUE
T2 --> QUEUE
T3 --> QUEUE
QUEUE --> THREADS
THREADS -->|"poll()"| T1
THREADS -->|"poll()"| T2
THREADS -->|"poll()"| T3
POLLER <-->|"register / notify<br/>注册 / 通知"| NET
POLLER <-->|"register / notify<br/>注册 / 通知"| DISK
POLLER -->|"wake tasks<br/>唤醒任务"| QUEUE
style Executor fill:#e3f2fd,color:#000
style OS fill:#f3e5f5,color:#000
mio: The Foundation Layer
mio:底层基座
mio (Metal I/O) is not an executor. It is the lowest-level cross-platform I/O notification library. It wraps epoll on Linux, kqueue on macOS and BSD, and IOCP on Windows.
mio 意为 Metal I/O,它本身并不是执行器,而是跨平台 I/O 通知能力的底层抽象。它对 Linux 的 epoll、macOS 和 BSD 的 kqueue、以及 Windows 的 IOCP 做了统一封装。
#![allow(unused)]
fn main() {
// Conceptual mio usage (simplified):
use mio::{Events, Interest, Poll, Token};
use mio::net::TcpListener;
let mut poll = Poll::new()?;
let mut events = Events::with_capacity(128);
let mut server = TcpListener::bind("0.0.0.0:8080")?;
poll.registry().register(&mut server, Token(0), Interest::READABLE)?;
// Event loop — blocks until something happens
loop {
poll.poll(&mut events, None)?; // Sleeps until I/O event
for event in events.iter() {
match event.token() {
Token(0) => { /* server has a new connection */ }
_ => { /* other I/O ready */ }
}
}
}
}
Most developers never touch mio directly. Tokio and smol sit on top of it.
多数开发者并不会直接操作 mio,tokio 和 smol 这类运行时已经把它包在更上层的抽象之下。
io_uring: The Completion-Based Future
io_uring:基于完成通知的未来方向
Linux io_uring requires kernel 5.1 or newer. It represents a fundamental shift from the readiness-based I/O model used by mio and epoll.
Linux 的 io_uring 需要 5.1 及以上内核,它代表了一种和 mio、epoll 所采用的“就绪通知模型”截然不同的思路。
Readiness-based (epoll / mio / tokio):
1. Ask: "Is this socket readable?" → epoll_wait()
2. Kernel: "Yes, it's ready" → EPOLLIN event
3. App: read(fd, buf) → might still block briefly!
Completion-based (io_uring):
1. Submit: "Read from this socket into this buffer" → SQE
2. Kernel: does the read asynchronously
3. App: gets completed result with data → CQE
基于就绪的模型(epoll / mio / tokio):1. 先问:“这个 socket 现在可读吗?” → `epoll_wait()`
2. 内核回答:“可读了。” → 收到 `EPOLLIN` 事件
3. 应用再调用 `read(fd, buf)` → 这一步仍可能出现短暂阻塞
基于完成的模型(io_uring):
1. 直接提交:“把这个 socket 读进这个缓冲区。” → SQE
2. 内核异步执行读取
3. 应用收到“已完成”的结果与数据 → CQE
graph LR
subgraph "Readiness Model<br/>就绪模型<br/>(epoll)"
A1["App: is it ready?<br/>应用:准备好了吗?"] --> K1["Kernel: yes<br/>内核:好了"]
K1 --> A2["App: now read()<br/>应用:现在读"]
A2 --> K2["Kernel: here's data<br/>内核:数据给你"]
end
subgraph "Completion Model<br/>完成模型<br/>(io_uring)"
B1["App: read this for me<br/>应用:替我把它读出来"] --> K3["Kernel: working...<br/>内核:正在处理"]
K3 --> B2["App: got result + data<br/>应用:收到结果和数据"]
end
style B1 fill:#c8e6c9,color:#000
style B2 fill:#c8e6c9,color:#000
The ownership challenge: io_uring needs the kernel to own the buffer until the operation completes. That clashes with Rust’s standard AsyncRead trait, which only borrows the buffer. This is why tokio-uring exposes different I/O traits.
所有权上的难点:io_uring 需要在操作完成前把缓冲区控制权交给内核,而 Rust 标准 AsyncRead trait 只借用缓冲区。这就是 tokio-uring 必须设计不同 I/O trait 的原因。
#![allow(unused)]
fn main() {
// Standard tokio (readiness-based) — borrows the buffer:
let n = stream.read(&mut buf).await?; // buf is borrowed
// tokio-uring (completion-based) — takes ownership of the buffer:
let (result, buf) = stream.read(buf).await; // buf is moved in, returned back
let n = result?;
}
// Cargo.toml: tokio-uring = "0.5"
// NOTE: Linux-only, requires kernel 5.1+
fn main() {
tokio_uring::start(async {
let file = tokio_uring::fs::File::open("data.bin").await.unwrap();
let buf = vec![0u8; 4096];
let (result, buf) = file.read_at(buf, 0).await;
let bytes_read = result.unwrap();
println!("Read {} bytes: {:?}", bytes_read, &buf[..bytes_read]);
});
}
| Aspect 维度 | epoll (tokio) epoll(tokio) | io_uring (tokio-uring) io_uring(tokio-uring) |
|---|---|---|
| Model 模型 | Readiness notification 就绪通知 | Completion notification 完成通知 |
| Syscalls 系统调用 | epoll_wait + read/write | Batched SQE/CQE ring 批量 SQE/CQE 环 |
| Buffer ownership 缓冲区所有权 | App retains (&mut buf)应用保留所有权 | Ownership transfer (move buf)所有权转移给内核 |
| Platform 平台 | Linux, macOS, Windows | Linux 5.1+ only 仅 Linux 5.1+ |
| Zero-copy 零拷贝 | No 否 | Yes 是 |
| Maturity 成熟度 | Production-ready 生产可用 | Experimental 实验性 |
When to use io_uring: Use it when high-throughput networking or file I/O is bottlenecked by syscall overhead, such as databases, storage engines, or proxies serving 100k+ connections. For most applications, standard tokio is still the correct default.
什么时候该用 io_uring:当网络或文件 I/O 的系统调用开销已经成为主要瓶颈,例如数据库、存储引擎、或者需要支撑十万级连接的代理服务时,再认真考虑它。对绝大多数应用而言,标准 tokio 依旧是更合适的默认方案。
tokio: The Batteries-Included Runtime
tokio:配套最完整的运行时
Tokio is the dominant async runtime in the Rust ecosystem. Axum, Hyper, Tonic, and most production Rust servers build on top of it.
Tokio 是 Rust 生态里最主流的异步运行时。Axum、Hyper、Tonic,以及大多数生产级 Rust 服务都建立在它之上。
// Cargo.toml:
// [dependencies]
// tokio = { version = "1", features = ["full"] }
#[tokio::main]
async fn main() {
// Spawns a multi-threaded runtime with work-stealing scheduler
let handle = tokio::spawn(async {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
"done"
});
let result = handle.await.unwrap();
println!("{result}");
}
tokio features: timers, I/O, TCP and UDP, Unix sockets, signal handling, synchronization primitives, filesystem access, process management, and tracing integration.
tokio 自带能力:定时器、I/O、TCP/UDP、Unix Socket、信号处理、同步原语、文件系统、进程管理,以及和 tracing 的集成。
async-std: The Standard Library Mirror
async-std:贴近标准库风格
async-std offers async APIs that mirror std. It is less popular than tokio, but many newcomers feel it is easier to approach.async-std 试图提供一套与 std 形态相近的异步 API。它的生态热度低于 tokio,但对于初学者来说通常更直观一些。
// Cargo.toml:
// [dependencies]
// async-std = { version = "1", features = ["attributes"] }
#[async_std::main]
async fn main() {
use async_std::fs;
let content = fs::read_to_string("hello.txt").await.unwrap();
println!("{content}");
}
smol: The Minimalist Runtime
smol:极简派运行时
Smol is a compact, low-dependency async runtime. It is useful for libraries that want async support without pulling in the full tokio stack.
Smol 是一个体量小、依赖少的异步运行时。对于想提供异步能力、又不愿意把整套 tokio 依赖拖进来的库来说,它很合适。
// Cargo.toml:
// [dependencies]
// smol = "2"
fn main() {
smol::block_on(async {
let result = smol::unblock(|| {
// Runs blocking code on a thread pool
std::fs::read_to_string("hello.txt")
}).await.unwrap();
println!("{result}");
});
}
embassy: Async for Embedded (no_std)
embassy:面向嵌入式的异步方案
Embassy targets embedded systems. It avoids heap allocation, works without std, and fits microcontrollers well.
Embassy 面向嵌入式系统,通常无需堆分配,也不依赖 std,非常适合微控制器环境。
// Runs on microcontrollers (e.g., STM32, nRF52, RP2040)
#[embassy_executor::main]
async fn main(spawner: embassy_executor::Spawner) {
// Blink an LED with async/await — no RTOS needed!
let mut led = Output::new(p.PA5, Level::Low, Speed::Low);
loop {
led.set_high();
Timer::after(Duration::from_millis(500)).await;
led.set_low();
Timer::after(Duration::from_millis(500)).await;
}
}
Runtime Decision Tree
运行时选择树
graph TD
START["Choosing a Runtime<br/>选择运行时"]
Q1{"Building a<br/>network server?<br/>是否在构建网络服务?"}
Q2{"Need tokio ecosystem<br/>(Axum, Tonic, Hyper)?<br/>是否依赖 tokio 生态?"}
Q3{"Building a library?<br/>是否在写库?"}
Q4{"Embedded / no_std?<br/>是否为嵌入式或 no_std?"}
Q5{"Want minimal<br/>dependencies?<br/>是否偏好最少依赖?"}
TOKIO["🟢 tokio<br/>Best ecosystem, most popular<br/>生态最完整,使用最广"]
SMOL["🔵 smol<br/>Minimal, no ecosystem lock-in<br/>轻量,生态绑定少"]
EMBASSY["🟠 embassy<br/>Embedded-first, no alloc<br/>嵌入式优先,可免分配"]
ASYNC_STD["🟣 async-std<br/>std-like API, good for learning<br/>接口像标准库,适合入门"]
AGNOSTIC["🔵 runtime-agnostic<br/>Use futures crate only<br/>保持运行时无关,仅依赖 futures"]
START --> Q1
Q1 -->|Yes| Q2
Q1 -->|No| Q3
Q2 -->|Yes| TOKIO
Q2 -->|No| Q5
Q3 -->|Yes| AGNOSTIC
Q3 -->|No| Q4
Q4 -->|Yes| EMBASSY
Q4 -->|No| Q5
Q5 -->|Yes| SMOL
Q5 -->|No| ASYNC_STD
style TOKIO fill:#c8e6c9,color:#000
style SMOL fill:#bbdefb,color:#000
style EMBASSY fill:#ffe0b2,color:#000
style ASYNC_STD fill:#e1bee7,color:#000
style AGNOSTIC fill:#bbdefb,color:#000
Runtime Comparison Table
运行时对比表
| Feature 特性 | tokio | async-std | smol | embassy |
|---|---|---|---|---|
| Ecosystem 生态 | Dominant 主流 | Small 较小 | Minimal 精简 | Embedded 嵌入式 |
| Multi-threaded 多线程 | ✅ Work-stealing 支持工作窃取 | ✅ | ✅ | ❌ Single-core 单核场景为主 |
| no_std 支持 no_std | ❌ | ❌ | ❌ | ✅ |
| Timer 定时器 | ✅ Built-in 内建 | ✅ Built-in 内建 | Via async-io依赖 async-io | ✅ HAL-based 基于 HAL |
| I/O I/O | ✅ Own abstractions 自有抽象 | ✅ std mirror 贴近 std | ✅ Via async-io经由 async-io | ✅ HAL drivers HAL 驱动 |
| Channels 通道 | ✅ Rich set 种类丰富 | ✅ | Via async-channel依赖 async-channel | ✅ |
| Learning curve 学习成本 | Medium 中等 | Low 较低 | Low 较低 | High 较高,需要硬件背景 |
| Binary size 二进制体积 | Large 较大 | Medium 中等 | Small 较小 | Tiny 很小 |
🏋️ Exercise: Runtime Comparison
🏋️ 练习:运行时对比
Challenge: Write the same program using three different runtimes: tokio, smol, and async-std. The program should fetch a URL, read a file, and print both results. Here both operations can be simulated with sleeps.
挑战:分别用 tokio、smol 和 async-std 写出同一个程序。程序需要获取一个 URL、读取一个文件,然后打印两个结果。这里可以用休眠来模拟这两类操作。
This exercise shows that most async business logic stays the same. What changes is the runtime bootstrap and the timer or I/O API surface.
这个练习要强调的是:异步业务逻辑大多相同,变化主要集中在运行时入口以及计时器、I/O 相关 API 的样式上。
🔑 Solution
🔑 参考答案
// ----- tokio version -----
// Cargo.toml: tokio = { version = "1", features = ["full"] }
#[tokio::main]
async fn main() {
let (url_result, file_result) = tokio::join!(
async {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
"Response from URL"
},
async {
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
"Contents of file"
},
);
println!("URL: {url_result}, File: {file_result}");
}
// ----- smol version -----
// Cargo.toml: smol = "2", futures-lite = "2"
fn main() {
smol::block_on(async {
let (url_result, file_result) = futures_lite::future::zip(
async {
smol::Timer::after(std::time::Duration::from_millis(100)).await;
"Response from URL"
},
async {
smol::Timer::after(std::time::Duration::from_millis(50)).await;
"Contents of file"
},
).await;
println!("URL: {url_result}, File: {file_result}");
});
}
// ----- async-std version -----
// Cargo.toml: async-std = { version = "1", features = ["attributes"] }
#[async_std::main]
async fn main() {
let (url_result, file_result) = futures::future::join(
async {
async_std::task::sleep(std::time::Duration::from_millis(100)).await;
"Response from URL"
},
async {
async_std::task::sleep(std::time::Duration::from_millis(50)).await;
"Contents of file"
},
).await;
println!("URL: {url_result}, File: {file_result}");
}
Key takeaway: The business logic remains identical across runtimes. Entry points and helper APIs are the main differences. That is why runtime-agnostic libraries built on std::future::Future are so valuable.
核心收获:不同运行时之间,业务逻辑几乎不变,主要变化点在入口和辅助 API 上。这也说明,建立在 std::future::Future 之上的运行时无关库会更有长期价值。
Key Takeaways — Executors and Runtimes
本章要点——执行器与运行时
- An executor polls futures when woken and sleeps efficiently using OS I/O APIs
执行器会在 Future 被唤醒时轮询它,并在空闲时依靠操作系统 I/O 机制高效休眠- tokio is the default choice for servers, smol fits minimal footprints, and embassy is for embedded systems
tokio 适合作为服务端默认方案,smol 适合追求轻量依赖,embassy 面向嵌入式场景- Business logic should depend on
std::future::Future, not on a specific runtime type
业务逻辑应依赖std::future::Future,而不是把自己绑死在某个运行时类型上io_uringmay become a major direction for high-performance I/O, but the ecosystem is still maturingio_uring可能会成为高性能 I/O 的重要方向,但整个生态仍在持续完善
See also: Ch 8 — Tokio Deep Dive for tokio specifics, Ch 9 — When Tokio Isn’t the Right Fit for alternatives.
延伸阅读: 第 8 章——Tokio 深入解析 关注 tokio 细节,第 9 章——什么时候 Tokio 不是最佳选择 讨论替代方案。
8. Tokio Deep Dive 🟡
8. Tokio 深入剖析 🟡
What you’ll learn:
本章将学到什么:
- Runtime flavors: multi-thread vs current-thread and when to use each
运行时的两种风格:多线程和当前线程,以及它们各自适合什么场景tokio::spawn, the'staticrequirement, andJoinHandletokio::spawn、'static约束,以及JoinHandle的行为- Task cancellation semantics
任务取消的语义- Sync primitives:
Mutex、RwLock、Semaphoreand four channel types
同步原语:Mutex、RwLock、Semaphore,以及四种 channel
Runtime Flavors: Multi-Thread vs Current-Thread
运行时风格:多线程与当前线程
Tokio provides two major runtime configurations:
Tokio 主要提供两种运行时配置:
// Multi-threaded (default with #[tokio::main])
// Uses a work-stealing thread pool
#[tokio::main]
async fn main() {
// N worker threads (default = CPU core count)
// Tasks must be Send + 'static
}
// Current-thread — everything stays on one thread
#[tokio::main(flavor = "current_thread")]
async fn main() {
// Single-threaded
// Tasks do not need to be Send
// Good for small tools or WASM
}
// Manual runtime construction
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(4)
.enable_all()
.build()
.unwrap();
rt.block_on(async {
println!("Running on custom runtime");
});
graph TB
subgraph "Multi-Thread<br/>多线程(默认)"
MT_Q1["Thread 1<br/>Task A, Task D<br/>线程 1"]
MT_Q2["Thread 2<br/>Task B<br/>线程 2"]
MT_Q3["Thread 3<br/>Task C, Task E<br/>线程 3"]
STEAL["Work Stealing<br/>空闲线程会从繁忙线程偷任务"]
MT_Q1 <--> STEAL
MT_Q2 <--> STEAL
MT_Q3 <--> STEAL
end
subgraph "Current-Thread<br/>当前线程"
ST_Q["Single Thread<br/>Task A -> Task B -> Task C -> Task D<br/>单线程顺序调度"]
end
style MT_Q1 fill:#c8e6c9,color:#000
style MT_Q2 fill:#c8e6c9,color:#000
style MT_Q3 fill:#c8e6c9,color:#000
style ST_Q fill:#bbdefb,color:#000
Multi-thread runtimes are the default choice for servers and background systems with lots of independent work. Current-thread runtimes are lighter, easier to reason about, and especially useful when tasks are !Send, or when the whole program is intentionally single-threaded.
多线程运行时通常是服务端和后台系统的默认选择,适合并发任务很多的场景。当前线程运行时更轻、更容易推理,特别适合 !Send 任务,或者本来就打算完全单线程执行的程序。
tokio::spawn and the 'static Requirement
tokio::spawn 与 'static 约束
tokio::spawn places a future into Tokio’s task queue. Because that task might run on any worker thread and might outlive the scope that created it, the future must be Send + 'static.tokio::spawn 会把一个 future 扔进 Tokio 的任务队列。由于这个任务可能在任意工作线程上运行,也可能活得比创建它的作用域还久,所以这个 future 必须满足 Send + 'static。
#![allow(unused)]
fn main() {
use tokio::task;
async fn example() {
let data = String::from("hello");
// Works: ownership is moved into the task
let handle = task::spawn(async move {
println!("{data}");
data.len()
});
let len = handle.await.unwrap();
println!("Length: {len}");
}
async fn problem() {
let data = String::from("hello");
// Fails: borrows local data, so not 'static
// task::spawn(async {
// println!("{data}");
// });
// Fails: Rc is !Send
// let rc = std::rc::Rc::new(42);
// task::spawn(async move {
// println!("{rc}");
// });
}
}
Why 'static? Because the task may outlive the caller’s stack frame, borrowed references are not acceptable unless the compiler can prove they live forever.
为什么要 'static? 因为任务可能活得比调用方栈帧还久,所以除非编译器能证明某个引用能一直有效,否则就不能把借用数据塞进去。
Why Send? Because on a multi-thread runtime, a task may be resumed on a different worker thread after an .await, so anything carried across suspension points must be thread-transfer safe.
为什么要 Send? 因为在多线程运行时里,一个任务在 .await 之后完全可能换到另一条线程继续执行,所以所有跨挂起点保存下来的数据都必须能安全跨线程移动。
#![allow(unused)]
fn main() {
// Common pattern: clone shared ownership into each task
let shared = Arc::new(config);
for i in 0..10 {
let shared = Arc::clone(&shared);
tokio::spawn(async move {
process_item(i, &shared).await;
});
}
}
JoinHandle and Task Cancellation
JoinHandle 与任务取消
#![allow(unused)]
fn main() {
use tokio::task::JoinHandle;
use tokio::time::{sleep, Duration};
async fn cancellation_example() {
let handle: JoinHandle<String> = tokio::spawn(async {
sleep(Duration::from_secs(10)).await;
"completed".to_string()
});
// Dropping JoinHandle does NOT cancel the task
// drop(handle);
// Explicit cancellation
handle.abort();
match handle.await {
Ok(val) => println!("Got: {val}"),
Err(e) if e.is_cancelled() => println!("Task was cancelled"),
Err(e) => println!("Task panicked: {e}"),
}
}
}
Important: Dropping a
JoinHandlein Tokio does not cancel the task. It only detaches it. To cancel the task, call.abort()explicitly. That is very different from dropping a plainFuture, which does cancel the computation by dropping it.
重要: 在 Tokio 里,丢掉JoinHandle并不会取消任务,只会让任务脱离追踪继续后台运行。真想取消,就得显式调用.abort()。这一点和直接丢弃普通Future很不一样,后者会随着被 drop 一起结束计算。
Tokio Sync Primitives
Tokio 的同步原语
Tokio provides async-aware synchronization primitives. The most important rule is simple: do not hold std::sync::Mutex across .await points.
Tokio 提供了一套“知道自己活在异步环境里”的同步原语。最关键的一条规矩很简单:不要把 std::sync::Mutex 的锁跨着 .await 持有。
#![allow(unused)]
fn main() {
use tokio::sync::{Mutex, RwLock, Semaphore, mpsc, oneshot, broadcast, watch};
// --- Mutex ---
let data = Arc::new(Mutex::new(vec![1, 2, 3]));
{
let mut guard = data.lock().await;
guard.push(4);
}
// --- Channels ---
// mpsc: multiple producers, single consumer
let (tx, mut rx) = mpsc::channel::<String>(100);
tokio::spawn(async move {
tx.send("hello".into()).await.unwrap();
});
let msg = rx.recv().await.unwrap();
// oneshot: one value, one receiver
let (tx, rx) = oneshot::channel::<i32>();
tx.send(42).unwrap();
let val = rx.await.unwrap();
// broadcast: every subscriber receives every message
let (tx, _) = broadcast::channel::<String>(100);
let mut rx1 = tx.subscribe();
let mut rx2 = tx.subscribe();
// watch: only the latest value matters
let (tx, rx) = watch::channel(0u64);
tx.send(42).unwrap();
println!("Latest: {}", *rx.borrow());
}
Note:
.unwrap()is used for brevity throughout these channel examples. In production, handle send/receive errors gracefully — a failed.send()means the receiver was dropped, and a failed.recv()means the channel is closed.
说明: 这些 channel 示例里用了.unwrap(),只是为了把重点放在机制本身。生产代码里要认真处理收发错误:.send()失败通常表示接收端已经被丢弃,.recv()失败则表示 channel 已经关闭。
graph LR
subgraph "Channel Types<br/>Channel 类型"
direction TB
MPSC["mpsc<br/>N -> 1<br/>带缓冲队列"]
ONESHOT["oneshot<br/>1 -> 1<br/>单次值传递"]
BROADCAST["broadcast<br/>N -> N<br/>所有订阅者都收到"]
WATCH["watch<br/>1 -> N<br/>只保留最新值"]
end
P1["Producer 1<br/>生产者 1"] --> MPSC
P2["Producer 2<br/>生产者 2"] --> MPSC
MPSC --> C1["Consumer<br/>消费者"]
P3["Producer<br/>生产者"] --> ONESHOT
ONESHOT --> C2["Consumer<br/>消费者"]
P4["Producer<br/>生产者"] --> BROADCAST
BROADCAST --> C3["Consumer 1<br/>消费者 1"]
BROADCAST --> C4["Consumer 2<br/>消费者 2"]
P5["Producer<br/>生产者"] --> WATCH
WATCH --> C5["Consumer 1<br/>消费者 1"]
WATCH --> C6["Consumer 2<br/>消费者 2"]
Case Study: Choosing the Right Channel for a Notification Service
案例:通知服务里该怎么挑 channel
Suppose a notification service has the following needs:
假设有一个通知服务,需要满足下面这些条件:
- Multiple API handlers produce events.
多个 API handler 会产生日志或事件。 - A single background task batches and sends them.
有一个单独的后台任务负责批量发送。 - A config watcher updates rate limits at runtime.
配置监听器会在运行时更新限流参数。 - A shutdown signal must reach every component.
关停信号必须送达到所有组件。
Which channels match each requirement?
每个需求分别该用哪种 channel?
| Requirement 需求 | Channel | Why 原因 |
|---|---|---|
| API handlers -> Batcher | mpsc (bounded) | Many producers, one consumer. Bounded capacity creates backpressure instead of silently OOMing. 多生产者、单消费者。容量设成有界,批处理跟不上时会自然产生背压,而不是悄悄把内存吃爆。 |
| Config watcher -> Rate limiter | watch | Only the latest config matters, and many readers may want that latest value. 这里只关心最新配置,而且多个读者都可能需要它。 |
| Shutdown signal -> All components | broadcast | Every component must independently receive the shutdown notification. 每个组件都必须独立收到同一份关停通知。 |
| Single health-check response | oneshot | Standard one-request one-response shape. 典型的一问一答模型。 |
graph LR
subgraph "Notification Service<br/>通知服务"
direction TB
API1["API Handler 1"] -->|mpsc| BATCH["Batcher<br/>批处理器"]
API2["API Handler 2"] -->|mpsc| BATCH
CONFIG["Config Watcher<br/>配置监听器"] -->|watch| RATE["Rate Limiter<br/>限流器"]
CTRL["Ctrl+C"] -->|broadcast| API1
CTRL -->|broadcast| BATCH
CTRL -->|broadcast| RATE
end
style API1 fill:#d4efdf,stroke:#27ae60,color:#000
style API2 fill:#d4efdf,stroke:#27ae60,color:#000
style BATCH fill:#e8f4f8,stroke:#2980b9,color:#000
style CONFIG fill:#fef9e7,stroke:#f39c12,color:#000
style RATE fill:#fef9e7,stroke:#f39c12,color:#000
style CTRL fill:#fadbd8,stroke:#e74c3c,color:#000
🏋️ Exercise: Build a Task Pool 🏋️ 练习:实现一个任务池
Challenge: Implement a function run_with_limit that accepts a list of async closures plus a concurrency limit, and ensures at most N tasks run simultaneously. Use tokio::sync::Semaphore.
挑战题: 实现一个 run_with_limit 函数,它接收一组异步闭包和一个并发上限,保证同一时间最多只有 N 个任务在跑。要求使用 tokio::sync::Semaphore。
🔑 Solution 🔑 参考答案
#![allow(unused)]
fn main() {
use std::future::Future;
use std::sync::Arc;
use tokio::sync::Semaphore;
async fn run_with_limit<F, Fut, T>(tasks: Vec<F>, limit: usize) -> Vec<T>
where
F: FnOnce() -> Fut + Send + 'static,
Fut: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
let semaphore = Arc::new(Semaphore::new(limit));
let mut handles = Vec::new();
for task in tasks {
let permit = Arc::clone(&semaphore);
let handle = tokio::spawn(async move {
let _permit = permit.acquire().await.unwrap();
task().await
});
handles.push(handle);
}
let mut results = Vec::new();
for handle in handles {
results.push(handle.await.unwrap());
}
results
}
}
Key takeaway: Semaphore is Tokio’s standard tool for concurrency limiting. Each task acquires a permit before starting work; when permits run out, later tasks wait asynchronously instead of blocking a worker thread.
要点: Semaphore 就是 Tokio 里限制并发度的标准工具。每个任务开工前先拿一个 permit,permit 用光之后,后面的任务会异步等待,而不是把工作线程堵死。
Key Takeaways — Tokio Deep Dive
本章要点:Tokio 深入剖析
- Use
multi_threadfor servers and heavier concurrent workloads; usecurrent_threadfor CLIs, tests, or!Sendtasks.
服务器和重并发任务优先选multi_thread;CLI、小工具、测试或者!Send任务更适合current_thread。tokio::spawnrequires'staticfutures;Arcand channels are the common ways to share state safely.tokio::spawn需要'staticfuture,常见解法是用Arc或 channel 共享状态。- Dropping a
JoinHandledoes not cancel the task; use.abort()when cancellation is intentional.
丢掉JoinHandle不会取消任务,真要取消就显式调用.abort()。- Pick sync primitives by problem shape:
Mutexfor shared mutable state,Semaphorefor concurrency caps, and channels for communication flows.
同步原语要按问题形状选:共享可变状态用Mutex,并发限流用Semaphore,任务之间传递消息就上 channel。
See also: Ch 9 — When Tokio Isn’t the Right Fit and Ch 12 — Common Pitfalls.
继续阅读: 第 9 章:什么时候 Tokio 并不合适 和 第 12 章:常见陷阱。
9. When Tokio Isn’t the Right Fit 🟡
9. Tokio 不一定合适的场景 🟡
What you’ll learn:
本章将学到什么:
- The
'staticproblem: whentokio::spawnforces you intoArceverywhere'static问题:什么时候tokio::spawn会逼得代码里到处都是ArcLocalSetfor!Sendfutures
如何用LocalSet承载!SendfutureFuturesUnorderedfor borrow-friendly concurrency (no spawn needed)
如何用FuturesUnordered实现更适合借用的并发,而且不需要 spawnJoinSetfor managed task groups
如何用JoinSet管理成组任务- Writing runtime-agnostic libraries
怎样写对运行时无绑定的库
graph TD
START["Need concurrent futures?"] --> STATIC{"Can futures be 'static?"}
STATIC -->|Yes| SEND{"Are futures Send?"}
STATIC -->|No| FU["FuturesUnordered<br/>Runs on current task"]
SEND -->|Yes| SPAWN["tokio::spawn<br/>Multi-threaded"]
SEND -->|No| LOCAL["LocalSet<br/>Single-threaded"]
SPAWN --> MANAGE{"Need to track/abort tasks?"}
MANAGE -->|Yes| JOINSET["JoinSet / TaskTracker"]
MANAGE -->|No| HANDLE["JoinHandle"]
style START fill:#f5f5f5,stroke:#333,color:#000
style FU fill:#d4efdf,stroke:#27ae60,color:#000
style SPAWN fill:#e8f4f8,stroke:#2980b9,color:#000
style LOCAL fill:#fef9e7,stroke:#f39c12,color:#000
style JOINSET fill:#e8daef,stroke:#8e44ad,color:#000
style HANDLE fill:#e8f4f8,stroke:#2980b9,color:#000
The ’static Future Problem
'static Future 问题
Tokio’s spawn requires 'static futures. This means you can’t borrow local data in spawned tasks:
Tokio 的 spawn 要求 future 满足 'static。这就意味着,被 spawn 出去的任务里没法直接借用局部数据:
#![allow(unused)]
fn main() {
async fn process_items(items: &[String]) {
// ❌ Can't do this — items is borrowed, not 'static
// for item in items {
// tokio::spawn(async {
// process(item).await;
// });
// }
// 😐 Workaround 1: Clone everything
for item in items {
let item = item.clone();
tokio::spawn(async move {
process(&item).await;
});
}
// 😐 Workaround 2: Use Arc
let items = Arc::new(items.to_vec());
for i in 0..items.len() {
let items = Arc::clone(&items);
tokio::spawn(async move {
process(&items[i]).await;
});
}
}
}
This is annoying! In Go, you can just go func() { use(item) } with a closure. In Rust, the ownership system forces you to think about who owns what and how long it lives.
这就很烦。在 Go 里,闭包一包,go func() { use(item) } 就完了。到了 Rust,所有权系统会逼着把“谁拥有数据、它能活多久”想得明明白白。
Scoped Tasks and Alternatives
作用域任务与替代方案
Several solutions exist for the 'static problem:
针对 'static 这件事,其实有好几条路可走:
#![allow(unused)]
fn main() {
// 1. tokio::task::LocalSet — run !Send futures on current thread
use tokio::task::LocalSet;
let local_set = LocalSet::new();
local_set.run_until(async {
tokio::task::spawn_local(async {
// Can use Rc, Cell, and other !Send types here
let rc = std::rc::Rc::new(42);
println!("{rc}");
}).await.unwrap();
}).await;
// 2. FuturesUnordered — concurrent without spawning
use futures::stream::{FuturesUnordered, StreamExt};
async fn process_items(items: &[String]) {
let futures: FuturesUnordered<_> = items
.iter()
.map(|item| async move {
// ✅ Can borrow item — no spawn, no 'static needed!
process(item).await
})
.collect();
// Drive all futures to completion
futures.for_each(|result| async {
println!("Result: {result:?}");
}).await;
}
// 3. tokio JoinSet (tokio 1.21+) — managed set of spawned tasks
use tokio::task::JoinSet;
async fn with_joinset() {
let mut set = JoinSet::new();
for i in 0..10 {
set.spawn(async move {
tokio::time::sleep(Duration::from_millis(100)).await;
i * 2
});
}
while let Some(result) = set.join_next().await {
println!("Task completed: {:?}", result.unwrap());
}
}
}
Lightweight Runtimes for Libraries
面向库的轻依赖运行时策略
If you’re writing a library — don’t force users into tokio:
如果写的是库,尽量别强行把使用者绑死在 tokio 上:
#![allow(unused)]
fn main() {
// ❌ BAD: Library forces tokio on users
pub async fn my_lib_function() {
tokio::time::sleep(Duration::from_secs(1)).await;
// Now your users MUST use tokio
}
// ✅ GOOD: Library is runtime-agnostic
pub async fn my_lib_function() {
// Use only types from std::future and futures crate
do_computation().await;
}
// ✅ GOOD: Accept a generic future for I/O operations
pub async fn fetch_with_retry<F, Fut, T, E>(
operation: F,
max_retries: usize,
) -> Result<T, E>
where
F: Fn() -> Fut,
Fut: Future<Output = Result<T, E>>,
{
for attempt in 0..max_retries {
match operation().await {
Ok(val) => return Ok(val),
Err(e) if attempt == max_retries - 1 => return Err(e),
Err(_) => continue,
}
}
unreachable!()
}
}
Rule of thumb: Libraries should depend on
futurescrate, nottokio. Applications should depend ontokio(or their chosen runtime). This keeps the ecosystem composable.
经验法则:库更适合依赖futurescrate,而不是tokio。应用程序则直接依赖tokio,或者自己选定的运行时。这样整个生态才更容易组合和复用。
🏋️ Exercise: FuturesUnordered vs Spawn 🏋️ 练习:`FuturesUnordered` 和 `spawn` 的区别
Challenge: Write the same function two ways — once using tokio::spawn (requires 'static) and once using FuturesUnordered (borrows data). The function receives &[String] and returns the length of each string after a simulated async lookup.
挑战题:用两种方式实现同一个函数,一种用 tokio::spawn,因此需要 'static;另一种用 FuturesUnordered,因此可以借用数据。这个函数接收 &[String],对每个字符串做一次模拟异步查询,然后返回对应长度。
Compare: Which approach requires .clone()? Which can borrow the input slice?
再比较一下:哪种方式必须 .clone()?哪种方式可以直接借用输入切片?
🔑 Solution 🔑 参考答案
#![allow(unused)]
fn main() {
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::time::{sleep, Duration};
// Version 1: tokio::spawn — requires 'static, must clone
async fn lengths_with_spawn(items: &[String]) -> Vec<usize> {
let mut handles = Vec::new();
for item in items {
let owned = item.clone(); // Must clone — spawn requires 'static
handles.push(tokio::spawn(async move {
sleep(Duration::from_millis(10)).await;
owned.len()
}));
}
let mut results = Vec::new();
for handle in handles {
results.push(handle.await.unwrap());
}
results
}
// Version 2: FuturesUnordered — borrows data, no clone needed
async fn lengths_without_spawn(items: &[String]) -> Vec<usize> {
let futures: FuturesUnordered<_> = items
.iter()
.map(|item| async move {
sleep(Duration::from_millis(10)).await;
item.len() // ✅ Borrows item — no clone!
})
.collect();
futures.collect().await
}
#[tokio::test]
async fn test_both_versions() {
let items = vec!["hello".into(), "world".into(), "rust".into()];
let v1 = lengths_with_spawn(&items).await;
// Note: v1 preserves insertion order (sequential join)
let mut v2 = lengths_without_spawn(&items).await;
v2.sort(); // FuturesUnordered returns in completion order
assert_eq!(v1, vec![5, 5, 4]);
assert_eq!(v2, vec![4, 5, 5]);
}
}
Key takeaway: FuturesUnordered avoids the 'static requirement by running all futures on the current task (no thread migration). The trade-off: all futures share one task — if one blocks, the others stall. Use spawn for CPU-heavy work that should run on separate threads.
关键结论:FuturesUnordered 通过把所有 future 都跑在当前任务上,绕开了 'static 要求,也就不需要线程迁移。代价是所有 future 共用一个任务,里头只要有一个阻塞,其它的都得跟着等。真正需要拆到独立线程上的重 CPU 工作,还是得上 spawn。
Key Takeaways — When Tokio Isn’t the Right Fit
本章要点:什么时候 Tokio 不是最合适的选择
FuturesUnorderedruns futures concurrently on the current task — no'staticrequirementFuturesUnordered会在当前任务上并发推进多个 future,因此没有'static限制。LocalSetenables!Sendfutures on a single-threaded executorLocalSet让!Sendfuture 能在单线程执行器里安全运行。JoinSet(tokio 1.21+) provides managed task groups with automatic cleanupJoinSet从 tokio 1.21 开始提供了可管理的任务组,并带自动清理能力。- For libraries: depend only on
std::future::Future+futurescrate, not tokio directly
写库时最好只依赖std::future::Future和futurescrate,别把 tokio 当成硬依赖塞进去。
See also: Ch 8 — Tokio Deep Dive for when spawn is the right tool, Ch 11 — Streams for
buffer_unordered()as another concurrency limiter
继续阅读: 第 8 章:Tokio 深入解析 会说明什么时候 spawn 才是正解;第 11 章:Stream 还会介绍另一种限流并发手段buffer_unordered()。
10. Async Traits 🟡
10. 异步 Trait 🟡
What you’ll learn:
本章将学到什么:
- Why async methods in traits took years to stabilize
为什么 trait 里的异步方法拖了很多年才稳定- RPITIT: native async trait methods (Rust 1.75+)
RPITIT:原生 async trait 方法(Rust 1.75+)- The
dyndispatch challenge and thetrait_variantworkarounddyn分发为什么麻烦,以及trait_variant怎么补位- Async closures (Rust 1.85+):
async Fn()andasync FnOnce()
异步闭包(Rust 1.85+):async Fn()与async FnOnce()
graph TD
subgraph "Async Trait Approaches<br/>异步 Trait 的几种方案"
direction TB
RPITIT["RPITIT (Rust 1.75+)<br/>async fn in trait<br/>trait 中直接写 async fn<br/>Static dispatch only<br/>仅支持静态分发"]
VARIANT["trait_variant<br/>Auto-generates Send variant<br/>自动生成 Send 变体<br/>Enables dyn dispatch<br/>可用于 dyn 分发"]
BOXED["Box<dyn Future><br/>Manual boxing<br/>手动装箱<br/>Works everywhere<br/>到处都能用"]
CLOSURE["Async Closures (1.85+)<br/>异步闭包(1.85+)<br/>async Fn() / async FnOnce()<br/>Callbacks & middleware<br/>回调与中间件"]
end
RPITIT -->|"Need dyn?<br/>需要 dyn?"| VARIANT
RPITIT -->|"Pre-1.75?<br/>早于 1.75?"| BOXED
CLOSURE -->|"Replaces<br/>替代"| BOXED
style RPITIT fill:#d4efdf,stroke:#27ae60,color:#000
style VARIANT fill:#e8f4f8,stroke:#2980b9,color:#000
style BOXED fill:#fef9e7,stroke:#f39c12,color:#000
style CLOSURE fill:#e8daef,stroke:#8e44ad,color:#000
The History: Why It Took So Long
历史背景:为什么它拖了这么久
Async methods in traits were one of Rust’s most requested features for years. The original problem looked like this:
trait 里的异步方法,多年来一直是 Rust 社区呼声最高的能力之一。它迟迟上不了岸,核心难点其实就在下面这段东西背后。
#![allow(unused)]
fn main() {
// This didn't compile until Rust 1.75 (Dec 2023):
trait DataStore {
async fn get(&self, key: &str) -> Option<String>;
}
// Why? Because async fn returns `impl Future<Output = T>`,
// and `impl Trait` in trait return position wasn't supported.
}
The fundamental challenge is that when a trait method returns impl Future, every implementor produces a different concrete type. The compiler needs to know the size of the return type, but trait methods are also expected to work with dynamic dispatch. That combination is what made the feature so stubborn.
根上的麻烦在于:trait 方法一旦返回 impl Future,每个实现者吐出来的其实都是不同的具体类型。编译器又得知道返回值到底多大,trait 方法还偏偏常常要支持动态分发。几件事搅在一起,就把这个功能拖成了硬骨头。
RPITIT: Return Position Impl Trait in Trait
RPITIT:trait 返回位置上的 impl Trait
Since Rust 1.75, native async trait methods work for static dispatch:
从 Rust 1.75 开始,原生 async trait 在静态分发场景里终于能用了:
#![allow(unused)]
fn main() {
trait DataStore {
async fn get(&self, key: &str) -> Option<String>;
// Desugars to:
// fn get(&self, key: &str) -> impl Future<Output = Option<String>>;
}
struct InMemoryStore {
data: std::collections::HashMap<String, String>,
}
impl DataStore for InMemoryStore {
async fn get(&self, key: &str) -> Option<String> {
self.data.get(key).cloned()
}
}
// Works with generics (static dispatch)
async fn lookup<S: DataStore>(store: &S, key: &str) {
if let Some(val) = store.get(key).await {
println!("{key} = {val}");
}
}
}
dyn Dispatch and Send Bounds
dyn 分发与 Send 约束
The limitation is still real: native async trait methods do not make the trait object-safe for dyn use. The compiler still cannot name the concrete future type returned by the method.
但局限也很实在:原生 async trait 目前还没有神到让 trait 立刻对 dyn 友好。编译器依然没法给这个返回的具体 future 类型起一个固定名字。
#![allow(unused)]
fn main() {
// Doesn't work:
// async fn lookup_dyn(store: &dyn DataStore, key: &str) { ... }
// Error: the trait `DataStore` is not dyn-compatible because method `get`
// is `async`
// Workaround: return a boxed future
trait DynDataStore {
fn get(&self, key: &str) -> Pin<Box<dyn Future<Output = Option<String>> + Send + '_>>;
}
// Or use the trait_variant macro (see below)
}
The Send problem: spawned tasks on multi-threaded runtimes must themselves be Send. Native async trait methods do not automatically add that bound to the returned future.Send 这个坑 也绕不过去:多线程运行时里被 spawn 出去的任务必须是 Send。而原生 async trait 方法并不会自动替返回 future 补上 Send 约束。
#![allow(unused)]
fn main() {
trait Worker {
async fn run(&self); // Future might or might not be Send
}
struct MyWorker;
impl Worker for MyWorker {
async fn run(&self) {
// If this uses !Send types, the future is !Send
let rc = std::rc::Rc::new(42);
some_work().await;
println!("{rc}");
}
}
// This fails if the future isn't Send:
// tokio::spawn(worker.run()); // Requires Send + 'static
}
The trait_variant Crate
trait_variant 这个 crate
The trait_variant crate, maintained by the Rust async working group, can generate a Send variant automatically:
Rust async 工作组维护的 trait_variant 可以自动生成一个带 Send 约束的变体,专门收拾这摊事:
#![allow(unused)]
fn main() {
// Cargo.toml: trait-variant = "0.1"
#[trait_variant::make(SendDataStore: Send)]
trait DataStore {
async fn get(&self, key: &str) -> Option<String>;
async fn set(&self, key: &str, value: String);
}
// Now you have two traits:
// - DataStore: no Send bound on the futures
// - SendDataStore: all futures are Send
// Both have the same methods, implementors implement DataStore
// and get SendDataStore for free if their futures are Send.
// Use SendDataStore when you need to spawn:
async fn spawn_lookup(store: Arc<dyn SendDataStore>) {
tokio::spawn(async move {
store.get("key").await;
});
}
}
Quick Reference: Async Traits
速查表:异步 Trait 的几种方案
| Approach 方案 | Static Dispatch 静态分发 | Dynamic Dispatch 动态分发 | Send | Syntax Overhead 语法负担 |
|---|---|---|---|---|
Native async fn in traittrait 里原生 async fn | ✅ | ❌ | Implicit 隐式 | None 几乎没有 |
trait_variant | ✅ | ✅ | Explicit 显式 | #[trait_variant::make] |
Manual Box::pin手动 Box::pin | ✅ | ✅ | Explicit 显式 | High 较高 |
async-trait crate | ✅ | ✅ | #[async_trait] | Medium (proc macro) 中等(过程宏) |
Recommendation: For new code on Rust 1.75+, use native async traits first. When
dyndispatch or spawned tasks matter, pair them withtrait_variant. The oldasync-traitcrate still works and is still common, but it boxes every future, so the native approach is cheaper for static dispatch.
建议: 新代码只要跑在 Rust 1.75+ 上,优先使用原生 async trait。只要碰到dyn分发或者要把任务扔进运行时线程池,再配上trait_variant。async-trait这个老牌 crate 依然常见,也照样能用,但它会给每个 future 做装箱;在静态分发场景里,原生方案更省。
Async Closures (Rust 1.85+)
异步闭包(Rust 1.85+)
Since Rust 1.85, async closures are stable. They capture environment just like normal closures, but the closure body itself is asynchronous and returns a future.
从 Rust 1.85 开始,异步闭包终于稳定了。它和普通闭包一样能捕获环境,只是闭包体本身是异步的,返回的是一个 future。
#![allow(unused)]
fn main() {
// Before 1.85: awkward workaround
let urls = vec!["https://a.com", "https://b.com"];
let fetchers: Vec<_> = urls.iter().map(|url| {
let url = url.to_string();
// Returns a non-async closure that returns an async block
move || async move { reqwest::get(&url).await }
}).collect();
// After 1.85: async closures just work
let fetchers: Vec<_> = urls.iter().map(|url| {
async move || { reqwest::get(url).await }
// This is an async closure: captures url and returns a Future
}).collect();
}
Async closures implement the new AsyncFn, AsyncFnMut, and AsyncFnOnce traits, which mirror Fn, FnMut, and FnOnce in the synchronous world:
异步闭包会实现新的 AsyncFn、AsyncFnMut 和 AsyncFnOnce trait。它们和同步世界里的 Fn、FnMut、FnOnce 是一一对应的。
#![allow(unused)]
fn main() {
// Generic function accepting an async closure
async fn retry<F>(max: usize, f: F) -> Result<String, Error>
where
F: AsyncFn() -> Result<String, Error>,
{
for _ in 0..max {
if let Ok(val) = f().await {
return Ok(val);
}
}
f().await
}
}
Migration tip: If older code uses
Fn() -> impl Future<Output = T>, it is worth consideringAsyncFn() -> Tnow. The signatures read better, and callback-heavy APIs become much easier to understand.
迁移建议: 如果旧代码里满地都是Fn() -> impl Future<Output = T>这种签名,现在完全可以考虑改成AsyncFn() -> T。签名会清爽很多,回调密集的 API 也更容易看懂。
🏋️ Exercise: Design an Async Service Trait 🏋️ 练习:设计一个异步服务 Trait
Challenge: Design a Cache trait with async get and set methods. Implement it twice: once with a HashMap in memory and once with a simulated Redis backend that uses tokio::time::sleep to mimic network latency. Then write a generic function that works with both implementations.
挑战题: 设计一个带异步 get 和 set 方法的 Cache trait。做两份实现:一份用内存里的 HashMap,另一份模拟 Redis 后端,用 tokio::time::sleep 模拟网络延迟。最后再写一个能同时操作这两种实现的泛型函数。
🔑 Solution 🔑 参考答案
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time::{sleep, Duration};
trait Cache {
async fn get(&self, key: &str) -> Option<String>;
async fn set(&self, key: &str, value: String);
}
// --- In-memory implementation ---
struct MemoryCache {
store: Mutex<HashMap<String, String>>,
}
impl MemoryCache {
fn new() -> Self {
MemoryCache {
store: Mutex::new(HashMap::new()),
}
}
}
impl Cache for MemoryCache {
async fn get(&self, key: &str) -> Option<String> {
self.store.lock().await.get(key).cloned()
}
async fn set(&self, key: &str, value: String) {
self.store.lock().await.insert(key.to_string(), value);
}
}
// --- Simulated Redis implementation ---
struct RedisCache {
store: Mutex<HashMap<String, String>>,
latency: Duration,
}
impl RedisCache {
fn new(latency_ms: u64) -> Self {
RedisCache {
store: Mutex::new(HashMap::new()),
latency: Duration::from_millis(latency_ms),
}
}
}
impl Cache for RedisCache {
async fn get(&self, key: &str) -> Option<String> {
sleep(self.latency).await; // Simulate network round-trip
self.store.lock().await.get(key).cloned()
}
async fn set(&self, key: &str, value: String) {
sleep(self.latency).await;
self.store.lock().await.insert(key.to_string(), value);
}
}
// --- Generic function working with any Cache ---
async fn cache_demo<C: Cache>(cache: &C, label: &str) {
cache.set("greeting", "Hello, async!".into()).await;
let val = cache.get("greeting").await;
println!("[{label}] greeting = {val:?}");
}
#[tokio::main]
async fn main() {
let mem = MemoryCache::new();
cache_demo(&mem, "memory").await;
let redis = RedisCache::new(50);
cache_demo(&redis, "redis").await;
}
Key takeaway: The same generic function works for both implementations through static dispatch. There is no boxing and no allocation overhead. If dynamic dispatch becomes necessary, adding trait_variant::make(SendCache: Send) is the next step.
要点: 这两个实现都能通过静态分发喂给同一个泛型函数,中间没有额外装箱,也没有多余分配。如果后面确实需要动态分发,再补上 trait_variant::make(SendCache: Send) 就行。
Key Takeaways — Async Traits
本章要点:异步 Trait
- Since Rust 1.75,
async fncan be written directly in traits without#[async_trait].
从 Rust 1.75 起,trait 里可以直接写async fn,不再强依赖#[async_trait]。trait_variant::makecan auto-generate aSendvariant for dynamic dispatch scenarios.trait_variant::make能自动生成带Send的变体,适合动态分发场景。- Async closures (
async Fn()) stabilized in 1.85 and are very suitable for callbacks and middleware.
异步闭包async Fn()在 1.85 稳定后,写回调和中间件舒服多了。- Prefer static dispatch such as
<S: Service>when performance matters.
只要性能敏感,优先使用<S: Service>这种静态分发写法。
See also: Ch 13 — Production Patterns for Tower’s
Servicetrait, and Ch 6 — Building Futures by Hand for manually implemented state machines.
继续阅读: 第 13 章:生产实践模式 会讲 Tower 的Servicetrait;第 6 章:手写 Future 会带着手动实现状态机。
11. Streams and AsyncIterator 🟡
11. Stream 与异步迭代 🟡
What you’ll learn:
本章将学到什么:
- The
Streamtrait: async iteration over multiple valuesStreamtrait 是什么:如何异步迭代多个值- Creating streams:
stream::iter,async_stream,unfold
如何创建 stream:stream::iter、async_stream、unfold- Stream combinators:
map,filter,buffer_unordered,fold
常见 stream 组合子:map、filter、buffer_unordered、fold- Async I/O traits:
AsyncRead,AsyncWrite,AsyncBufRead
异步 I/O trait:AsyncRead、AsyncWrite、AsyncBufRead
Stream Trait Overview
Stream trait 总览
A Stream relates to Iterator the same way Future relates to a single value: it produces multiple values, but does so asynchronously.Stream 和 Iterator 的关系,差不多就像 Future 和“单个值”的关系:它会产出多个值,只不过这个过程是异步的。
#![allow(unused)]
fn main() {
// std::iter::Iterator (synchronous, multiple values)
trait Iterator {
type Item;
fn next(&mut self) -> Option<Self::Item>;
}
// futures::Stream (async, multiple values)
trait Stream {
type Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}
}
graph LR
subgraph "Sync<br/>同步"
VAL["Value<br/>(T)"]
ITER["Iterator<br/>(multiple T)<br/>多个值"]
end
subgraph "Async<br/>异步"
FUT["Future<br/>(async T)<br/>异步单值"]
STREAM["Stream<br/>(async multiple T)<br/>异步多值"]
end
VAL -->|"make async<br/>异步化"| FUT
ITER -->|"make async<br/>异步化"| STREAM
VAL -->|"make multiple<br/>多值化"| ITER
FUT -->|"make multiple<br/>多值化"| STREAM
style VAL fill:#e3f2fd,color:#000
style ITER fill:#e3f2fd,color:#000
style FUT fill:#c8e6c9,color:#000
style STREAM fill:#c8e6c9,color:#000
Creating Streams
创建 Stream 的几种方式
#![allow(unused)]
fn main() {
use futures::stream::{self, StreamExt};
use tokio::time::{interval, Duration};
use tokio_stream::wrappers::IntervalStream;
// 1. From an iterator
let s = stream::iter(vec![1, 2, 3]);
// 2. From an async generator (using async_stream crate)
// Cargo.toml: async-stream = "0.3"
use async_stream::stream;
fn countdown(from: u32) -> impl futures::Stream<Item = u32> {
stream! {
for i in (0..=from).rev() {
tokio::time::sleep(Duration::from_millis(500)).await;
yield i;
}
}
}
// 3. From a tokio interval
let tick_stream = IntervalStream::new(interval(Duration::from_secs(1)));
// 4. From a channel receiver (tokio_stream::wrappers)
let (tx, rx) = tokio::sync::mpsc::channel::<String>(100);
let rx_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
// 5. From unfold (generate from async state)
let s = stream::unfold(0u32, |state| async move {
if state >= 5 {
None // Stream ends
} else {
let next = state + 1;
Some((state, next)) // yield `state`, new state is `next`
}
});
}
Different constructors fit different situations: static data, timer ticks, channel messages, or stateful generation. The important point is that they all unify under the same Stream abstraction once created.
不同的构造方式适合不同场景:静态数据、定时器节拍、channel 消息,或者带状态的生成器。关键在于,一旦构造出来,它们就都能统一被当成 Stream 来处理。
Consuming Streams
消费 Stream
#![allow(unused)]
fn main() {
use futures::stream::{self, StreamExt};
async fn stream_examples() {
let s = stream::iter(vec![1, 2, 3, 4, 5]);
// for_each — process each item
s.for_each(|x| async move {
println!("{x}");
}).await;
// map + collect
let doubled: Vec<i32> = stream::iter(vec![1, 2, 3])
.map(|x| x * 2)
.collect()
.await;
// filter
let evens: Vec<i32> = stream::iter(1..=10)
.filter(|x| futures::future::ready(x % 2 == 0))
.collect()
.await;
// buffer_unordered — process N items concurrently
let results: Vec<_> = stream::iter(vec!["url1", "url2", "url3"])
.map(|url| async move {
// Simulate HTTP fetch
tokio::time::sleep(Duration::from_millis(100)).await;
format!("response from {url}")
})
.buffer_unordered(10) // Up to 10 concurrent fetches
.collect()
.await;
// take, skip, zip, chain — just like Iterator
let first_three: Vec<i32> = stream::iter(1..=100)
.take(3)
.collect()
.await;
}
}
If ordinary Iterator chains feel familiar, stream combinators should feel surprisingly natural. The main difference is that collection and traversal are now async-aware and may suspend between items.
如果已经熟悉普通 Iterator 链,那么 stream 组合子其实会很顺手。最大的区别只是:现在收集和遍历过程本身也带上了异步语义,中途可能发生挂起。
Comparison with C# IAsyncEnumerable
和 C# IAsyncEnumerable 的对照
| Feature 特性 | Rust Stream | C# IAsyncEnumerable<T> |
|---|---|---|
| Syntax 语法 | stream! { yield x; } | await foreach / yield return |
| Cancellation 取消 | Drop the stream 直接丢弃 stream | CancellationToken |
| Backpressure 背压 | Consumer controls poll rate 消费者控制轮询速度 | Consumer controls MoveNextAsync |
| Built-in 是否内建 | No, via crate 不是标准库内建,需要 crate | Yes |
| Combinators 组合子 | .map()、.filter()、.buffer_unordered() | LINQ + System.Linq.Async |
| Error handling 错误处理 | Stream<Item = Result<T, E>> | Throw inside async iterator 在异步迭代器里抛异常 |
#![allow(unused)]
fn main() {
// Rust: Stream of database rows
// NOTE: try_stream! (not stream!) is required when using ? inside the body.
// stream! doesn't propagate errors — try_stream! yields Err(e) and ends.
fn get_users(db: &Database) -> impl Stream<Item = Result<User, DbError>> + '_ {
try_stream! {
let mut cursor = db.query("SELECT * FROM users").await?;
while let Some(row) = cursor.next().await {
yield User::from_row(row?);
}
}
}
// Consume:
let mut users = pin!(get_users(&db));
while let Some(result) = users.next().await {
match result {
Ok(user) => println!("{}", user.name),
Err(e) => eprintln!("Error: {e}"),
}
}
}
// C# equivalent:
async IAsyncEnumerable<User> GetUsers() {
await using var reader = await db.QueryAsync("SELECT * FROM users");
while (await reader.ReadAsync()) {
yield return User.FromRow(reader);
}
}
// Consume:
await foreach (var user in GetUsers()) {
Console.WriteLine(user.Name);
}
🏋️ Exercise: Build an Async Stats Aggregator
🏋️ 练习:实现一个异步统计聚合器
Challenge: Given a sensor reading stream Stream<Item = f64>, write an async function that returns (count, min, max, average) without collecting the whole stream into a Vec first.
挑战:给定一个传感器读数流 Stream<Item = f64>,写一个异步函数返回 (count, min, max, average),并且不要先把整个 stream 收集进 Vec。
Hint: Use .fold() to accumulate state across items.
提示:可以用 .fold() 一边消费 stream,一边累积状态。
🔑 Solution
🔑 参考答案
#![allow(unused)]
fn main() {
use futures::stream::{self, StreamExt};
#[derive(Debug)]
struct Stats {
count: usize,
min: f64,
max: f64,
sum: f64,
}
impl Stats {
fn average(&self) -> f64 {
if self.count == 0 { 0.0 } else { self.sum / self.count as f64 }
}
}
async fn compute_stats<S: futures::Stream<Item = f64> + Unpin>(stream: S) -> Stats {
stream
.fold(
Stats { count: 0, min: f64::INFINITY, max: f64::NEG_INFINITY, sum: 0.0 },
|mut acc, value| async move {
acc.count += 1;
acc.min = acc.min.min(value);
acc.max = acc.max.max(value);
acc.sum += value;
acc
},
)
.await
}
#[tokio::test]
async fn test_stats() {
let readings = stream::iter(vec![23.5, 24.1, 22.8, 25.0, 23.9]);
let stats = compute_stats(readings).await;
assert_eq!(stats.count, 5);
assert!((stats.min - 22.8).abs() < f64::EPSILON);
assert!((stats.max - 25.0).abs() < f64::EPSILON);
assert!((stats.average() - 23.86).abs() < 0.01);
}
}
Key takeaway: combinators like .fold() let a stream be processed item by item without buffering the whole thing in memory. That matters a lot once the stream is very large or potentially unbounded.
核心收获: .fold() 这类组合子可以让 stream 一项一项地被处理,而不是先整体装进内存。只要数据量很大,或者流本身没有上界,这一点就非常重要。
Async I/O Traits: AsyncRead, AsyncWrite, AsyncBufRead
异步 I/O trait:AsyncRead、AsyncWrite、AsyncBufRead
Just as std::io::Read and Write are the foundation of synchronous I/O, the async traits in tokio::io or futures::io form the foundation of async I/O code.
就像 std::io::Read 和 Write 是同步 I/O 的根基一样,tokio::io 或 futures::io 里的异步 trait 也是异步 I/O 代码的根基。
#![allow(unused)]
fn main() {
// tokio::io — the async versions of std::io traits
/// Read bytes from a source asynchronously
pub trait AsyncRead {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>, // Tokio's safe wrapper around uninitialized memory
) -> Poll<io::Result<()>>;
}
/// Write bytes to a sink asynchronously
pub trait AsyncWrite {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>>;
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
}
/// Buffered reading with line support
pub trait AsyncBufRead: AsyncRead {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>;
fn consume(self: Pin<&mut Self>, amt: usize);
}
}
In practice, most code will not call these poll_* methods directly. Instead, the extension traits AsyncReadExt and AsyncWriteExt provide ergonomic .await-friendly helpers.
实战里,大多数代码都不会自己去直接调这些 poll_* 方法。通常会通过 AsyncReadExt、AsyncWriteExt 这类扩展 trait 使用更顺手、能直接 .await 的辅助方法。
#![allow(unused)]
fn main() {
use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncBufReadExt};
use tokio::net::TcpStream;
use tokio::io::BufReader;
async fn io_examples() -> tokio::io::Result<()> {
let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
// AsyncWriteExt: write_all, write_u32, write_buf, etc.
stream.write_all(b"GET / HTTP/1.0\r\n\r\n").await?;
// AsyncReadExt: read, read_exact, read_to_end, read_to_string
let mut response = Vec::new();
stream.read_to_end(&mut response).await?;
// AsyncBufReadExt: read_line, lines(), split()
let file = tokio::fs::File::open("config.txt").await?;
let reader = BufReader::new(file);
let mut lines = reader.lines();
while let Some(line) = lines.next_line().await? {
println!("{line}");
}
Ok(())
}
}
Implementing custom async I/O often means wrapping a raw transport in a higher-level protocol abstraction.
实现自定义异步 I/O 时,常见套路是把底层原始传输包装成更高层的协议抽象。
#![allow(unused)]
fn main() {
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use std::pin::Pin;
use std::task::{Context, Poll};
/// A length-prefixed protocol: [u32 length][payload bytes]
struct FramedStream<T> {
inner: T,
}
impl<T: AsyncRead + AsyncReadExt + Unpin> FramedStream<T> {
/// Read one complete frame
async fn read_frame(&mut self) -> tokio::io::Result<Vec<u8>>
{
// Read the 4-byte length prefix
let len = self.inner.read_u32().await? as usize;
// Read exactly that many bytes
let mut payload = vec![0u8; len];
self.inner.read_exact(&mut payload).await?;
Ok(payload)
}
}
impl<T: AsyncWrite + AsyncWriteExt + Unpin> FramedStream<T> {
/// Write one complete frame
async fn write_frame(&mut self, data: &[u8]) -> tokio::io::Result<()>
{
self.inner.write_u32(data.len() as u32).await?;
self.inner.write_all(data).await?;
self.inner.flush().await?;
Ok(())
}
}
}
| Sync Trait 同步 trait | Async Trait (tokio) | Async Trait (futures) | Extension Trait 扩展 trait |
|---|---|---|---|
std::io::Read | tokio::io::AsyncRead | futures::io::AsyncRead | AsyncReadExt |
std::io::Write | tokio::io::AsyncWrite | futures::io::AsyncWrite | AsyncWriteExt |
std::io::BufRead | tokio::io::AsyncBufRead | futures::io::AsyncBufRead | AsyncBufReadExt |
std::io::Seek | tokio::io::AsyncSeek | futures::io::AsyncSeek | AsyncSeekExt |
tokio vs futures I/O traits: the two families are similar but not identical. Tokio’s
AsyncReadusesReadBuf, which helps handle uninitialized memory safely, whilefutures::AsyncReadworks with&mut [u8].tokio_util::compatcan bridge between them.
tokio 与 futures 的 I/O trait 差别:两套接口长得像,但不是完全相同。Tokio 的AsyncRead使用ReadBuf,对未初始化内存的处理更安全;futures::AsyncRead则是基于&mut [u8]。需要打通时可以用tokio_util::compat。
Copy utilities:
tokio::io::copy(&mut reader, &mut writer)is async 版的std::io::copy,而tokio::io::copy_bidirectional则会双向同时复制,非常适合代理服务或文件转发场景。
复制工具:tokio::io::copy(&mut reader, &mut writer)可以看成std::io::copy的异步版本,而tokio::io::copy_bidirectional会把两个方向同时复制,非常适合代理服务和转发场景。
🏋️ Exercise: Build an Async Line Counter
🏋️ 练习:实现一个异步行计数器
Challenge: Write an async function that accepts any AsyncBufRead source and returns the number of non-empty lines. It should work with files, TCP streams, or any buffered reader.
挑战:写一个异步函数,它接收任意 AsyncBufRead 数据源,并返回非空行数量。这个函数应该能适用于文件、TCP 流,或者任何带缓冲的读取器。
Hint: Use AsyncBufReadExt::lines() and count only lines where !line.is_empty().
提示:可以使用 AsyncBufReadExt::lines(),然后只统计满足 !line.is_empty() 的行。
🔑 Solution
🔑 参考答案
#![allow(unused)]
fn main() {
use tokio::io::AsyncBufReadExt;
async fn count_non_empty_lines<R: tokio::io::AsyncBufRead + Unpin>(
reader: R,
) -> tokio::io::Result<usize> {
let mut lines = reader.lines();
let mut count = 0;
while let Some(line) = lines.next_line().await? {
if !line.is_empty() {
count += 1;
}
}
Ok(count)
}
// Works with any AsyncBufRead:
// let file = tokio::io::BufReader::new(tokio::fs::File::open("data.txt").await?);
// let count = count_non_empty_lines(file).await?;
//
// let tcp = tokio::io::BufReader::new(TcpStream::connect("...").await?);
// let count = count_non_empty_lines(tcp).await?;
}
Key takeaway: by programming against AsyncBufRead rather than one concrete reader type, the same logic becomes reusable across files, sockets, pipes, and in-memory buffers.
核心收获: 只要代码是面向 AsyncBufRead 这个抽象来写,而不是绑死在某个具体读取器类型上,同一套逻辑就能复用到文件、socket、管道,甚至内存缓冲区。
Key Takeaways — Streams and AsyncIterator
本章要点——Stream 与异步迭代
Streamis the async equivalent ofIteratorand yieldsPoll::Ready(Some(item))orPoll::Ready(None)Stream就是Iterator的异步版本,会产出Poll::Ready(Some(item))或Poll::Ready(None).buffer_unordered(N)is the key stream tool for processing N items concurrently.buffer_unordered(N)是 stream 场景里最关键的并发处理工具之一async_stream::stream!is usually the easiest way to create custom streamsasync_stream::stream!往往是自定义 stream 最省事的入口AsyncReadandAsyncBufReadlet I/O code stay generic and reusable across files, sockets, and pipesAsyncRead和AsyncBufRead能让 I/O 代码在文件、socket、管道之间保持通用和可复用
See also: Ch 9 — When Tokio Isn’t the Right Fit for
FuturesUnordered, and Ch 13 — Production Patterns for bounded-channel backpressure patterns.
继续阅读: 第 9 章——When Tokio Isn’t the Right Fit 会提到FuturesUnordered,而 第 13 章——Production Patterns 会继续展开有界 channel 的背压模式。
12. Common Pitfalls 🔴
12. 常见陷阱 🔴
What you’ll learn:
本章将学到什么:
- 9 common async Rust bugs and how to fix them
9 类常见 async Rust 问题,以及各自的修正思路- Why blocking the executor is the #1 mistake
为什么阻塞执行器线程是头号大坑- Cancellation hazards when a future is dropped mid-await
future 在.await中途被丢弃时会带来哪些取消风险- Debugging with
tokio-console,tracing, and#[instrument]
如何用tokio-console、tracing、#[instrument]调试异步代码- Testing with
#[tokio::test],time::pause(), and trait-based mocking
如何用#[tokio::test]、time::pause()和基于 trait 的 mock 做测试
Blocking the Executor
阻塞执行器
The number one async Rust mistake is running blocking work on an executor thread. Once that happens, other tasks sharing that thread stop making progress.
async Rust 里排名第一的错误,就是把阻塞操作丢到执行器线程上跑。一旦这么干,同一线程上的其他任务就会一起被卡住,完全推不动。
#![allow(unused)]
fn main() {
// ❌ WRONG: Blocks the entire executor thread
async fn bad_handler() -> String {
let data = std::fs::read_to_string("big_file.txt").unwrap(); // BLOCKS!
process(&data)
}
// ✅ CORRECT: Offload blocking work to a dedicated thread pool
async fn good_handler() -> String {
let data = tokio::task::spawn_blocking(|| {
std::fs::read_to_string("big_file.txt").unwrap()
}).await.unwrap();
process(&data)
}
// ✅ ALSO CORRECT: Use tokio's async fs
async fn also_good_handler() -> String {
let data = tokio::fs::read_to_string("big_file.txt").await.unwrap();
process(&data)
}
}
graph TB
subgraph "❌ Blocking Call on Executor<br/>阻塞发生在执行器线程"
T1_BAD["Thread 1: std::fs::read()<br/>🔴 BLOCKED for 500ms"]
T2_BAD["Thread 2: handling requests<br/>🟢 Working alone"]
TASKS_BAD["100 pending tasks<br/>⏳ Starved"]
T1_BAD -->|"can't poll<br/>无法轮询"| TASKS_BAD
end
subgraph "✅ spawn_blocking<br/>把阻塞工作扔到专门线程池"
T1_GOOD["Thread 1: polling futures<br/>🟢 Available"]
T2_GOOD["Thread 2: polling futures<br/>🟢 Available"]
BT["Blocking pool thread:<br/>std::fs::read()<br/>🔵 Separate pool"]
TASKS_GOOD["100 tasks<br/>✅ All making progress"]
T1_GOOD -->|"polls"| TASKS_GOOD
T2_GOOD -->|"polls"| TASKS_GOOD
end
std::thread::sleep vs tokio::time::sleep
std::thread::sleep 与 tokio::time::sleep 的区别
#![allow(unused)]
fn main() {
// ❌ WRONG: Blocks the executor thread for 5 seconds
async fn bad_delay() {
std::thread::sleep(Duration::from_secs(5)); // Thread can't poll anything else!
}
// ✅ CORRECT: Yields to the executor, other tasks can run
async fn good_delay() {
tokio::time::sleep(Duration::from_secs(5)).await; // Non-blocking!
}
}
The rule is simple: inside async code, always ask whether an operation parks the task or blocks the thread. Only the first one is what you want.
判断标准其实很简单:在 async 代码里,先问自己这一步到底是“挂起当前任务”,还是“卡死当前线程”。真正想要的只有前者。
Holding MutexGuard Across .await
把 MutexGuard 跨 .await 持有
#![allow(unused)]
fn main() {
use std::sync::Mutex; // std Mutex — NOT async-aware
// ❌ WRONG: MutexGuard held across .await
async fn bad_mutex(data: &Mutex<Vec<String>>) {
let mut guard = data.lock().unwrap();
guard.push("item".into());
some_io().await; // 💥 Guard is held here — blocks other threads from locking!
guard.push("another".into());
}
// Also: std::sync::MutexGuard is !Send, so this won't compile
// with tokio's multi-threaded runtime.
// ✅ FIX 1: Scope the guard to drop before .await
async fn good_mutex_scoped(data: &Mutex<Vec<String>>) {
{
let mut guard = data.lock().unwrap();
guard.push("item".into());
} // Guard dropped here
some_io().await; // Safe — lock is released
{
let mut guard = data.lock().unwrap();
guard.push("another".into());
}
}
// ✅ FIX 2: Use tokio::sync::Mutex (async-aware)
use tokio::sync::Mutex as AsyncMutex;
async fn good_async_mutex(data: &AsyncMutex<Vec<String>>) {
let mut guard = data.lock().await; // Async lock — doesn't block the thread
guard.push("item".into());
some_io().await; // OK — tokio Mutex guard is Send
guard.push("another".into());
}
}
When to use which mutex:
什么时候用哪种 mutex:
std::sync::Mutex: short critical sections with no.awaitinsidestd::sync::Mutex:临界区很短,而且中间绝对没有.awaittokio::sync::Mutex: when the lock must survive across.awaitpointstokio::sync::Mutex:锁必须跨.await存活时parking_lot::Mutex: fasterstd-style mutex, but still not for.awaitparking_lot::Mutex:更快更轻的同步 mutex,但依旧不该跨.await
Cancellation Hazards
取消风险
Dropping a future cancels it immediately. That sounds simple, but partial side effects can leave systems in a broken intermediate state.
future 一旦被丢弃,就会立刻取消。听起来很直接,但如果操作只做了一半,系统就可能被留在一个非常难看的中间态里。
#![allow(unused)]
fn main() {
// ❌ DANGEROUS: Resource leak on cancellation
async fn transfer(from: &Account, to: &Account, amount: u64) {
from.debit(amount).await; // If cancelled HERE...
to.credit(amount).await; // ...money vanishes!
}
// ✅ SAFE: Make operations atomic or use compensation
async fn safe_transfer(from: &Account, to: &Account, amount: u64) -> Result<(), Error> {
// Use a database transaction (all-or-nothing)
let tx = db.begin_transaction().await?;
tx.debit(from, amount).await?;
tx.credit(to, amount).await?;
tx.commit().await?; // Only commits if everything succeeded
Ok(())
}
// ✅ ALSO SAFE: Use tokio::select! with cancellation awareness
tokio::select! {
result = transfer(from, to, amount) => {
// Transfer completed
}
_ = shutdown_signal() => {
// Don't cancel mid-transfer — let it finish
// Or: roll back explicitly
}
}
}
No Async Drop
没有异步 Drop
Rust’s Drop trait is synchronous. That means there is no legal way to .await inside drop().
Rust 的 Drop trait 是同步的,所以根本不存在“在 drop() 里 .await 一下”的合法写法。
#![allow(unused)]
fn main() {
struct DbConnection { /* ... */ }
impl Drop for DbConnection {
fn drop(&mut self) {
// ❌ Can't do this — drop() is sync!
// self.connection.shutdown().await;
// ✅ Workaround 1: Spawn a cleanup task (fire-and-forget)
let conn = self.connection.take();
tokio::spawn(async move {
let _ = conn.shutdown().await;
});
// ✅ Workaround 2: Use a synchronous close
// self.connection.blocking_close();
}
}
}
Best practice is to provide an explicit async fn close(self) and document that callers should use it. Drop should be treated as a fallback safety net, not the main cleanup path.
更靠谱的做法,是显式提供一个 async fn close(self),并在文档里说清楚调用方应该主动调用它。Drop 更适合作为兜底,而不是主要清理通道。
select! Fairness and Starvation
select! 的公平性与饥饿问题
#![allow(unused)]
fn main() {
use tokio::sync::mpsc;
// ❌ UNFAIR: busy_stream always wins, slow_stream starves
async fn unfair(mut fast: mpsc::Receiver<i32>, mut slow: mpsc::Receiver<i32>) {
loop {
tokio::select! {
Some(v) = fast.recv() => println!("fast: {v}"),
Some(v) = slow.recv() => println!("slow: {v}"),
// If both are ready, tokio randomly picks one.
// But if `fast` is ALWAYS ready, `slow` rarely gets polled.
}
}
}
// ✅ FAIR: Use biased select or drain in batches
async fn fair(mut fast: mpsc::Receiver<i32>, mut slow: mpsc::Receiver<i32>) {
loop {
tokio::select! {
biased; // Always check in order — explicit priority
Some(v) = slow.recv() => println!("slow: {v}"), // Priority!
Some(v) = fast.recv() => println!("fast: {v}"),
}
}
}
}
Accidental Sequential Execution
不小心写成串行执行
#![allow(unused)]
fn main() {
// ❌ SEQUENTIAL: Takes 2 seconds total
async fn slow() {
let a = fetch("url_a").await; // 1 second
let b = fetch("url_b").await; // 1 second (waits for a to finish first!)
}
// ✅ CONCURRENT: Takes 1 second total
async fn fast() {
let (a, b) = tokio::join!(
fetch("url_a"), // Both start immediately
fetch("url_b"),
);
}
// ✅ ALSO CONCURRENT: Using let + join
async fn also_fast() {
let fut_a = fetch("url_a"); // Create future (lazy — not started yet)
let fut_b = fetch("url_b"); // Create future
let (a, b) = tokio::join!(fut_a, fut_b); // NOW both run concurrently
}
}
Trap:
let a = fetch(url).await; let b = fetch(url).await;is sequential. If both tasks are independent, reach forjoin!,spawn, or stream-based concurrency instead.
陷阱:let a = fetch(url).await; let b = fetch(url).await;就是标准串行写法。如果两件事互相独立,该用的是join!、spawn,或者基于 stream 的并发处理。
Case Study: Debugging a Hung Production Service
案例:排查一个卡死的生产服务
Imagine a service that runs normally for ten minutes and then silently stops responding. There are no obvious errors, CPU is near zero, and logs看起来也没什么异常。
设想这样一个场景:服务前十分钟一切正常,之后突然不再响应。日志里没有明显报错,CPU 也接近零,看着就像“没崩,但也不干活了”。
Diagnosis steps:
排查步骤:
- Attach
tokio-consoleand discover 200+ tasks stuck inPending.
1. 先接上tokio-console,发现 200 多个任务全卡在Pending。 - Inspect the tasks and notice they are all waiting on the same
Mutex::lock().await.
2. 看任务细节,发现它们全在等同一个Mutex::lock().await。 - Find the root cause: one task held a
std::sync::MutexGuardacross an.await, then panicked and poisoned the mutex.
3. 最终根因是:有一个任务把std::sync::MutexGuard跨.await持有,随后 panic,把 mutex 毒化了。
The fix:
修正方式:
| Before (broken) 修之前 | After (fixed) 修之后 |
|---|---|
std::sync::Mutex | tokio::sync::Mutex |
.lock().unwrap() across .await | Drop the lock before .await |
| No timeout on lock acquisition | tokio::time::timeout(dur, mutex.lock()) |
| No recovery on poisoned mutex | tokio::sync::Mutex doesn’t poison |
Prevention checklist:
预防清单:
- Use
tokio::sync::Mutexif a guard may cross any.await.
如果 guard 可能跨.await,优先换成tokio::sync::Mutex。 - Add
#[tracing::instrument]to important async functions.
给关键异步函数加上#[tracing::instrument]。 - Run
tokio-consolein staging and pre-release environments.
在预发或 staging 环境把tokio-console跑起来。 - Add health checks that verify task responsiveness, not just process liveness.
健康检查别只看进程活着没,要能反映任务是否还能正常推进。
🏋️ Exercise: Spot the Bugs
🏋️ 练习:找出这些坑
Challenge: Find the async problems in this code and fix them.
挑战:把下面这段代码里的异步陷阱全找出来,并给出修正版本。
#![allow(unused)]
fn main() {
use std::sync::Mutex;
async fn process_requests(urls: Vec<String>) -> Vec<String> {
let results = Mutex::new(Vec::new());
for url in &urls {
let response = reqwest::get(url).await.unwrap().text().await.unwrap();
std::thread::sleep(std::time::Duration::from_millis(100)); // Rate limit
let mut guard = results.lock().unwrap();
guard.push(response);
expensive_parse(&guard).await; // Parse all results so far
}
results.into_inner().unwrap()
}
}
🔑 Solution
🔑 参考答案
Bugs found:
发现的问题:
- Sequential fetches instead of concurrent ones.
1. 请求是一个一个串行抓的,没有并发。 std::thread::sleepblocks the executor thread.
2.std::thread::sleep会卡死执行器线程。MutexGuardis held across.await.
3.MutexGuard被跨.await持有。- The mutex itself is unnecessary once the flow is restructured.
4. 只要流程改对,整个 mutex 都可以不要。
#![allow(unused)]
fn main() {
use tokio::sync::Mutex;
use std::sync::Arc;
use futures::stream::{self, StreamExt};
async fn process_requests(urls: Vec<String>) -> Vec<String> {
// Fix 4: Process URLs concurrently with buffer_unordered
let results: Vec<String> = stream::iter(urls)
.map(|url| async move {
let response = reqwest::get(&url).await.unwrap().text().await.unwrap();
// Fix 2: Use tokio::time::sleep instead of std::thread::sleep
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
response
})
.buffer_unordered(10) // Up to 10 concurrent requests
.collect()
.await;
// Fix 3: Parse after collecting — no mutex needed at all!
for result in &results {
expensive_parse(result).await;
}
results
}
}
Key takeaway: a lot of async mutex pain disappears once the flow is redesigned. Often the real fix is not “use a better mutex”, but “stop sharing mutable state unnecessarily”.
核心收获: 很多异步 mutex 痛点,其实在重排流程之后就会自己消失。真正的修正往往不是“换个更好的锁”,而是“别让本来就没必要共享的可变状态继续共享”。
Debugging Async Code
调试异步代码
Async stack traces often look weird because what you see is the executor’s poll machinery, not the logical call chain in the form the source code suggests.
异步代码的栈追踪经常很难看,因为屏幕上看到的往往是执行器的轮询过程,而不是源码里那个“看起来像顺序执行”的逻辑调用链。
tokio-console: Real-Time Task Inspector
tokio-console:实时任务观察器
tokio-console provides an htop-style view of every spawned task: task state, poll duration, wake activity, and more.tokio-console 会给每个 spawn 出去的任务提供一个类似 htop 的视图:状态、轮询耗时、唤醒情况、资源使用都能看。
# Cargo.toml
[dependencies]
console-subscriber = "0.4"
tokio = { version = "1", features = ["full", "tracing"] }
#[tokio::main]
async fn main() {
console_subscriber::init(); // Replaces the default tracing subscriber
// ... rest of your application
}
Then in another terminal:
然后在另一个终端里运行:
$ RUSTFLAGS="--cfg tokio_unstable" cargo run
$ tokio-console
tracing + #[instrument]
tracing + #[instrument]
The tracing crate understands async lifetimes. Spans stay alive across .await points, which makes the logical flow much easier to reconstruct.tracing 对异步执行周期是有感知的。span 可以跨 .await 存活,这会让逻辑调用过程更容易被重新拼起来。
#![allow(unused)]
fn main() {
use tracing::{info, instrument};
#[instrument(skip(db_pool), fields(user_id = %user_id))]
async fn handle_request(user_id: u64, db_pool: &Pool) -> Result<Response> {
info!("looking up user");
let user = db_pool.get_user(user_id).await?; // span stays open across .await
info!(email = %user.email, "found user");
let orders = fetch_orders(user_id).await?; // still the same span
Ok(build_response(user, orders))
}
}
{"timestamp":"...","level":"INFO","span":{"name":"handle_request","user_id":"42"},"message":"looking up user"}
{"timestamp":"...","level":"INFO","span":{"name":"handle_request","user_id":"42"},"fields":{"email":"a@b.com"},"message":"found user"}
Debugging Checklist
调试清单
| Symptom 现象 | Likely Cause 常见原因 | Tool 工具 |
|---|---|---|
| Task hangs forever 任务永远挂着 | Missing .await or deadlocked mutex漏了 .await,或者 mutex 卡死 | tokio-console task view |
| Low throughput 吞吐很低 | Blocking call on async thread 异步线程里混入阻塞调用 | tokio-console poll histogram |
Future is not Send编译器说 Future is not Send | Non-Send value held across .await有非 Send 值跨 .await 存活 | Compiler + #[instrument] |
| Mysterious cancellation 莫名其妙被取消 | A branch got dropped by select!select! 把某个分支直接丢了 | tracing span lifecycle |
Tip:
tokio-console的很多任务级指标需要在编译时开启RUSTFLAGS="--cfg tokio_unstable"。这是编译期开关,不是运行时选项。
提示:tokio-console里一些更细的任务指标,需要在编译时打开RUSTFLAGS="--cfg tokio_unstable"。这不是运行时参数,而是编译期配置。
Testing Async Code
测试异步代码
Async code brings extra testing challenges: you need a runtime, often need controllable time, and sometimes need deterministic scheduling for race-sensitive logic.
异步代码的测试会多出几件麻烦事:需要运行时、经常需要可控时间,还可能需要更可预测的调度行为来复现竞态相关逻辑。
Basic async tests:
基础异步测试:
#![allow(unused)]
fn main() {
// Cargo.toml
// [dev-dependencies]
// tokio = { version = "1", features = ["full", "test-util"] }
#[tokio::test]
async fn test_basic_async() {
let result = fetch_data().await;
assert_eq!(result, "expected");
}
// Single-threaded test (useful for !Send types):
#[tokio::test(flavor = "current_thread")]
async fn test_single_threaded() {
let rc = std::rc::Rc::new(42);
let val = async { *rc }.await;
assert_eq!(val, 42);
}
// Multi-threaded with explicit worker count:
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_concurrent_behavior() {
// Tests race conditions with real concurrency
let counter = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
let c1 = counter.clone();
let c2 = counter.clone();
let (a, b) = tokio::join!(
tokio::spawn(async move { c1.fetch_add(1, std::sync::atomic::Ordering::SeqCst) }),
tokio::spawn(async move { c2.fetch_add(1, std::sync::atomic::Ordering::SeqCst) }),
);
a.unwrap();
b.unwrap();
assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 2);
}
}
Time manipulation:
时间控制:
#![allow(unused)]
fn main() {
use tokio::time::{self, Duration, Instant};
#[tokio::test]
async fn test_timeout_behavior() {
// Pause time — sleep() advances instantly, no real wall-clock delay
time::pause();
let start = Instant::now();
time::sleep(Duration::from_secs(3600)).await; // "waits" 1 hour — takes 0ms
assert!(start.elapsed() >= Duration::from_secs(3600));
}
#[tokio::test]
async fn test_retry_timing() {
time::pause();
let start = Instant::now();
let result = retry_with_backoff(|| async {
Err::<(), _>("simulated failure")
}, 3, Duration::from_secs(1))
.await;
assert!(result.is_err());
assert!(start.elapsed() >= Duration::from_secs(7));
}
#[tokio::test]
async fn test_deadline_exceeded() {
time::pause();
let result = tokio::time::timeout(
Duration::from_secs(5),
async {
time::sleep(Duration::from_secs(10)).await;
"done"
}
).await;
assert!(result.is_err()); // Timed out
}
}
Mocking async dependencies:
给异步依赖做 mock:
#![allow(unused)]
fn main() {
// Define a trait for the dependency:
trait Storage {
async fn get(&self, key: &str) -> Option<String>;
async fn set(&self, key: &str, value: String);
}
// Production implementation:
struct RedisStorage { /* ... */ }
impl Storage for RedisStorage {
async fn get(&self, key: &str) -> Option<String> {
// Real Redis call
todo!()
}
async fn set(&self, key: &str, value: String) {
todo!()
}
}
// Test mock:
struct MockStorage {
data: std::sync::Mutex<std::collections::HashMap<String, String>>,
}
impl MockStorage {
fn new() -> Self {
MockStorage { data: std::sync::Mutex::new(std::collections::HashMap::new()) }
}
}
impl Storage for MockStorage {
async fn get(&self, key: &str) -> Option<String> {
self.data.lock().unwrap().get(key).cloned()
}
async fn set(&self, key: &str, value: String) {
self.data.lock().unwrap().insert(key.to_string(), value);
}
}
// Tested function is generic over Storage:
async fn cache_lookup<S: Storage>(store: &S, key: &str) -> String {
match store.get(key).await {
Some(val) => val,
None => {
let val = "computed".to_string();
store.set(key, val.clone()).await;
val
}
}
}
#[tokio::test]
async fn test_cache_miss_then_hit() {
let mock = MockStorage::new();
// First call: miss → computes and stores
let val = cache_lookup(&mock, "key1").await;
assert_eq!(val, "computed");
// Second call: hit → returns stored value
let val = cache_lookup(&mock, "key1").await;
assert_eq!(val, "computed");
assert!(mock.data.lock().unwrap().contains_key("key1"));
}
}
Testing channels and task communication:
测试 channel 与任务通信:
#![allow(unused)]
fn main() {
#[tokio::test]
async fn test_producer_consumer() {
let (tx, mut rx) = tokio::sync::mpsc::channel(10);
tokio::spawn(async move {
for i in 0..5 {
tx.send(i).await.unwrap();
}
// tx dropped here — channel closes
});
let mut received = Vec::new();
while let Some(val) = rx.recv().await {
received.push(val);
}
assert_eq!(received, vec![0, 1, 2, 3, 4]);
}
}
| Test Pattern 测试模式 | When to Use 适用场景 | Key Tool 关键工具 |
|---|---|---|
#[tokio::test] | All async tests 通用异步测试 | Tokio test runtime |
time::pause() | Timeouts, retries, periodic tasks 超时、重试、定时任务 | Virtual time |
| Trait mocking | Business logic without real I/O 隔离真实 I/O 的业务逻辑测试 | Generic <S: Storage> |
current_thread flavor | !Send types, deterministic scheduling测试 !Send 类型,或需要更稳定调度 | #[tokio::test(flavor = "current_thread")] |
multi_thread flavor | Race-condition testing 并发竞态相关测试 | #[tokio::test(flavor = "multi_thread")] |
Key Takeaways — Common Pitfalls
本章要点——常见陷阱
- Never block the executor; use
spawn_blockingor async-native APIs
永远别阻塞执行器;要么用spawn_blocking,要么用原生异步 API- Never hold a
MutexGuardacross.awaitunless the abstraction is designed for it
除非抽象本来就是为此设计的,否则别把MutexGuard跨.await持有- Cancellation means the future is dropped immediately, so partial side effects must be handled carefully
取消就意味着 future 会立刻被丢弃,因此任何“做了一半”的副作用都要提前设计好- Use
tokio-consoleand#[tracing::instrument]to make async behavior visible
用tokio-console和#[tracing::instrument]把异步行为变得可见- Use
#[tokio::test]andtime::pause()to make async tests deterministic and fast
用#[tokio::test]和time::pause()让异步测试又快又稳定
See also: Ch 8 — Tokio Deep Dive for sync primitives, Ch 13 — Production Patterns for graceful shutdown and structured concurrency.
继续阅读: 第 8 章——Tokio Deep Dive 会继续讲同步原语,第 13 章——Production Patterns 则会把优雅停机和结构化并发展开讲清楚。
13. Production Patterns 🔴
13. 生产环境模式 🔴
What you’ll learn:
本章将学到什么:
- Graceful shutdown with
watchchannels andselect!
如何用watchchannel 和select!做优雅停机- Backpressure: bounded channels prevent OOM
什么是背压:为什么有界 channel 能防止 OOM- Structured concurrency:
JoinSetandTaskTracker
结构化并发:JoinSet与TaskTracker- Timeouts, retries, and exponential backoff
超时、重试与指数退避- Error handling:
thiserrorvsanyhow, the double-?pattern
错误处理:thiserror与anyhow,以及双重?模式- Tower: the middleware pattern used by axum, tonic, and hyper
Tower:axum、tonic、hyper 使用的中间件模式
Graceful Shutdown
优雅停机
Production servers need to stop cleanly: stop accepting new work, let in-flight requests finish, flush buffers, and close resources in order.
生产环境里的服务不能粗暴退出。正确做法通常是:停止接收新请求,让正在处理的请求收尾,把缓冲区刷干净,再按顺序关闭连接和资源。
use tokio::signal;
use tokio::sync::watch;
async fn main_server() {
// Create a shutdown signal channel
let (shutdown_tx, shutdown_rx) = watch::channel(false);
// Spawn the server
let server_handle = tokio::spawn(run_server(shutdown_rx.clone()));
// Wait for Ctrl+C
signal::ctrl_c().await.expect("Failed to listen for Ctrl+C");
println!("Shutdown signal received, finishing in-flight requests...");
// Notify all tasks to shut down
// NOTE: .unwrap() is used for brevity. Production code should handle
// the case where all receivers have been dropped.
shutdown_tx.send(true).unwrap();
// Wait for server to finish (with timeout)
match tokio::time::timeout(
std::time::Duration::from_secs(30),
server_handle,
).await {
Ok(Ok(())) => println!("Server shut down gracefully"),
Ok(Err(e)) => eprintln!("Server error: {e}"),
Err(_) => eprintln!("Server shutdown timed out — forcing exit"),
}
}
async fn run_server(mut shutdown: watch::Receiver<bool>) {
loop {
tokio::select! {
// Accept new connections
conn = accept_connection() => {
let shutdown = shutdown.clone();
tokio::spawn(handle_connection(conn, shutdown));
}
// Shutdown signal
_ = shutdown.changed() => {
if *shutdown.borrow() {
println!("Stopping accepting new connections");
break;
}
}
}
}
// In-flight connections will finish on their own
// because they have their own shutdown_rx clone
}
async fn handle_connection(conn: Connection, mut shutdown: watch::Receiver<bool>) {
loop {
tokio::select! {
request = conn.next_request() => {
// Process the request fully — don't abandon mid-request
process_request(request).await;
}
_ = shutdown.changed() => {
if *shutdown.borrow() {
// Finish current request, then exit
break;
}
}
}
}
}
sequenceDiagram
participant OS as OS Signal
participant Main as Main Task
participant WCH as watch Channel
participant W1 as Worker 1
participant W2 as Worker 2
OS->>Main: SIGINT (Ctrl+C)
Main->>WCH: send(true)
WCH-->>W1: changed()
WCH-->>W2: changed()
Note over W1: Finish current request
Note over W2: Finish current request
W1-->>Main: Task complete
W2-->>Main: Task complete
Main->>Main: All workers done → exit
The important idea is coordination. A shutdown signal should flow through the whole task tree, instead of every task inventing its own local exit rule.
这里最关键的是“协调”。停机信号应该沿着整个任务树往下传,而不是每个任务自己想一套退出规则,最后收不拢。
Backpressure with Bounded Channels
用有界 Channel 做背压
If producers can outrun consumers forever, memory usage will grow forever too. That is why unbounded channels are dangerous in production systems.
如果生产者能一直比消费者快,内存占用也会一直涨。所以无界 channel 在生产环境里很危险,尤其是流量高峰和下游变慢的时候。
#![allow(unused)]
fn main() {
use tokio::sync::mpsc;
async fn backpressure_example() {
// Bounded channel: max 100 items buffered
let (tx, mut rx) = mpsc::channel::<WorkItem>(100);
// Producer: slows down naturally when buffer is full
let producer = tokio::spawn(async move {
for i in 0..1_000_000 {
// send() is async — waits if buffer is full
// This creates natural backpressure!
tx.send(WorkItem { id: i }).await.unwrap();
}
});
// Consumer: processes items at its own pace
let consumer = tokio::spawn(async move {
while let Some(item) = rx.recv().await {
process(item).await; // Slow processing is OK — producer waits
}
});
let _ = tokio::join!(producer, consumer);
}
// Compare with unbounded — DANGEROUS:
// let (tx, rx) = mpsc::unbounded_channel(); // No backpressure!
// Producer can fill memory indefinitely
}
With a bounded channel, slowness propagates upstream. That is usually exactly what a stable system wants.
有界 channel 的价值就在于:下游一旦变慢,压力会自然往上游传。对一个稳定系统来说,这往往正是想要的行为。
Structured Concurrency: JoinSet and TaskTracker
结构化并发:JoinSet 与 TaskTracker
JoinSet gives a way to spawn a related group of tasks and then wait for them as a group. This avoids“任务飞出去以后没人管”的局面。JoinSet 提供了一种把一组相关任务绑在一起管理的方式。它能避免“任务一 spawn 就飞走,后面没人收尾”的情况。
#![allow(unused)]
fn main() {
use tokio::task::JoinSet;
use tokio::time::{sleep, Duration};
async fn structured_concurrency() {
let mut set = JoinSet::new();
// Spawn a batch of tasks
for url in get_urls() {
set.spawn(async move {
fetch_and_process(url).await
});
}
// Collect all results (order not guaranteed)
let mut results = Vec::new();
while let Some(result) = set.join_next().await {
match result {
Ok(Ok(data)) => results.push(data),
Ok(Err(e)) => eprintln!("Task error: {e}"),
Err(e) => eprintln!("Task panicked: {e}"),
}
}
// ALL tasks are done here — no dangling background work
println!("Processed {} items", results.len());
}
// TaskTracker (tokio-util 0.7.9+) — wait for all spawned tasks
use tokio_util::task::TaskTracker;
async fn with_tracker() {
let tracker = TaskTracker::new();
for i in 0..10 {
tracker.spawn(async move {
sleep(Duration::from_millis(100 * i)).await;
println!("Task {i} done");
});
}
tracker.close(); // No more tasks will be added
tracker.wait().await; // Wait for ALL tracked tasks
println!("All tasks finished");
}
}
Timeouts and Retries
超时与重试
#![allow(unused)]
fn main() {
use tokio::time::{timeout, sleep, Duration};
// Simple timeout
async fn with_timeout() -> Result<Response, Error> {
match timeout(Duration::from_secs(5), fetch_data()).await {
Ok(Ok(response)) => Ok(response),
Ok(Err(e)) => Err(Error::Fetch(e)),
Err(_) => Err(Error::Timeout),
}
}
// Exponential backoff retry
async fn retry_with_backoff<F, Fut, T, E>(
max_attempts: u32,
base_delay_ms: u64,
operation: F,
) -> Result<T, E>
where
F: Fn() -> Fut,
Fut: std::future::Future<Output = Result<T, E>>,
E: std::fmt::Display,
{
let mut delay = Duration::from_millis(base_delay_ms);
for attempt in 1..=max_attempts {
match operation().await {
Ok(result) => return Ok(result),
Err(e) => {
if attempt == max_attempts {
eprintln!("Final attempt {attempt} failed: {e}");
return Err(e);
}
eprintln!("Attempt {attempt} failed: {e}, retrying in {delay:?}");
sleep(delay).await;
delay *= 2; // Exponential backoff
}
}
}
unreachable!()
}
// Usage:
// let result = retry_with_backoff(3, 100, || async {
// reqwest::get("https://api.example.com/data").await
// }).await?;
}
Production tip — add jitter: pure exponential backoff can make many clients retry in lockstep. Add random jitter so the retries spread out instead of hammering the service again at the same instant.
生产提示——加抖动:纯指数退避很容易让大量客户端按同样的时间点一起重试,形成新的尖峰。加一点随机抖动,能把这些重试时间打散。
Error Handling in Async Code
异步代码里的错误处理
Async code introduces a few extra complications: spawned tasks create error boundaries, timeout wrappers add another error layer, and ? behaves differently once results cross task boundaries.
异步代码的错误处理会多出几层麻烦:spawn 出去的任务天然形成错误边界,超时又会多套一层错误包装,而 ? 一旦跨过任务边界,展开方式也会变得不一样。
thiserror vs anyhow:thiserror 与 anyhow 的选择:
#![allow(unused)]
fn main() {
// thiserror: Define typed errors for libraries and public APIs
// Every variant is explicit — callers can match on specific errors
use thiserror::Error;
#[derive(Error, Debug)]
enum DiagError {
#[error("IPMI command failed: {0}")]
Ipmi(#[from] IpmiError),
#[error("Sensor {sensor} out of range: {value}°C (max {max}°C)")]
OverTemp { sensor: String, value: f64, max: f64 },
#[error("Operation timed out after {0:?}")]
Timeout(std::time::Duration),
#[error("Task panicked: {0}")]
TaskPanic(#[from] tokio::task::JoinError),
}
// anyhow: Quick error handling for applications and prototypes
// Wraps any error — no need to define types for every case
use anyhow::{Context, Result};
async fn run_diagnostics() -> Result<()> {
let config = load_config()
.await
.context("Failed to load diagnostic config")?;
let result = run_gpu_test(&config)
.await
.context("GPU diagnostic failed")?;
Ok(())
}
// anyhow prints: "GPU diagnostic failed: IPMI command failed: timeout"
}
| Crate 工具 | Use When 适用场景 | Error Type 错误类型 | Matching 匹配方式 |
|---|---|---|---|
thiserror | Library code, public APIs 库代码、公开 API | enum MyError { ... } | match err { MyError::Timeout => ... } |
anyhow | Applications, CLI tools, scripts 应用、CLI、脚本 | anyhow::Error | err.downcast_ref::<MyError>() |
| Both together 两者结合 | Library exposes typed errors, app wraps them 库对外暴露强类型错误,应用层统一包裹 | Best of both 两边优点都拿 | Typed in library, erased in app 库层强类型,应用层无需细分 |
The double-? pattern with tokio::spawn:tokio::spawn 里的双重 ? 模式:
#![allow(unused)]
fn main() {
use thiserror::Error;
use tokio::task::JoinError;
#[derive(Error, Debug)]
enum AppError {
#[error("HTTP error: {0}")]
Http(#[from] reqwest::Error),
#[error("Task panicked: {0}")]
TaskPanic(#[from] JoinError),
}
async fn spawn_with_errors() -> Result<String, AppError> {
let handle = tokio::spawn(async {
let resp = reqwest::get("https://example.com").await?;
Ok::<_, reqwest::Error>(resp.text().await?)
});
// Double ?: First ? unwraps JoinError (task panic), second ? unwraps inner Result
let result = handle.await??;
Ok(result)
}
}
The error boundary problem:
错误边界问题:
#![allow(unused)]
fn main() {
// ❌ Error context is lost across spawn boundaries:
async fn bad_error_handling() -> Result<()> {
let handle = tokio::spawn(async {
some_fallible_work().await
});
let result = handle.await??;
Ok(())
}
// ✅ Add context at the spawn boundary:
async fn good_error_handling() -> Result<()> {
let handle = tokio::spawn(async {
some_fallible_work()
.await
.context("worker task failed")
});
let result = handle.await
.context("worker task panicked")??;
Ok(())
}
}
Timeout errors:
超时错误包装:
#![allow(unused)]
fn main() {
use tokio::time::{timeout, Duration};
async fn with_timeout_context() -> Result<String, DiagError> {
let dur = Duration::from_secs(30);
match timeout(dur, fetch_sensor_data()).await {
Ok(Ok(data)) => Ok(data),
Ok(Err(e)) => Err(e),
Err(_) => Err(DiagError::Timeout(dur)),
}
}
}
Tower: The Middleware Pattern
Tower:中间件模式
The Tower crate defines a composable Service trait. It is the backbone of middleware composition in the Rust async web ecosystem.
Tower crate 定义了一套可组合的 Service trait,它基本就是 Rust 异步 Web 生态里中间件组合的主骨架。
#![allow(unused)]
fn main() {
// Tower's core trait (simplified):
pub trait Service<Request> {
type Response;
type Error;
type Future: Future<Output = Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
fn call(&mut self, req: Request) -> Self::Future;
}
}
Middleware wraps a Service to add logging, timeout control, rate limiting, auth, and other cross-cutting concerns without touching the handler’s inner logic.
中间件本质上就是把一个 Service 再包一层,把日志、超时、限流、鉴权之类横切逻辑加进去,而不去污染真正的业务处理器。
#![allow(unused)]
fn main() {
use tower::{ServiceBuilder, timeout::TimeoutLayer, limit::RateLimitLayer};
use std::time::Duration;
let service = ServiceBuilder::new()
.layer(TimeoutLayer::new(Duration::from_secs(10))) // Outermost: timeout
.layer(RateLimitLayer::new(100, Duration::from_secs(1))) // Then: rate limit
.service(my_handler); // Innermost: your code
}
If ASP.NET middleware or Express middleware feels familiar, Tower is the same family of idea in Rust form.
如果已经熟悉 ASP.NET 中间件或者 Express 中间件,那 Tower 的思路基本就是同一家子,只是换成了 Rust 里的抽象写法。
Exercise: Graceful Shutdown with Worker Pool
练习:带工作池的优雅停机
🏋️ Exercise
🏋️ 练习
Challenge: Build a task processor with a channel-based queue, N workers, and Ctrl+C graceful shutdown. Workers should finish any in-flight work before leaving.
挑战:实现一个基于 channel 队列的任务处理器,包含 N 个 worker,并支持 Ctrl+C 优雅停机。要求 worker 在退出前把手头正在处理的工作完成。
🔑 Solution
🔑 参考答案
use tokio::sync::{mpsc, watch};
use tokio::time::{sleep, Duration};
struct WorkItem { id: u64, payload: String }
#[tokio::main]
async fn main() {
let (work_tx, work_rx) = mpsc::channel::<WorkItem>(100);
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let work_rx = std::sync::Arc::new(tokio::sync::Mutex::new(work_rx));
let mut handles = Vec::new();
for id in 0..4 {
let rx = work_rx.clone();
let mut shutdown = shutdown_rx.clone();
handles.push(tokio::spawn(async move {
loop {
let item = {
let mut rx = rx.lock().await;
tokio::select! {
item = rx.recv() => item,
_ = shutdown.changed() => {
if *shutdown.borrow() { None } else { continue }
}
}
};
match item {
Some(work) => {
println!("Worker {id}: processing {}", work.id);
sleep(Duration::from_millis(200)).await;
}
None => break,
}
}
}));
}
// Submit work
for i in 0..20 {
let _ = work_tx.send(WorkItem { id: i, payload: format!("task-{i}") }).await;
sleep(Duration::from_millis(50)).await;
}
// On Ctrl+C: signal shutdown, wait for workers
tokio::signal::ctrl_c().await.unwrap();
// NOTE: .unwrap() is used for brevity — handle errors in production.
shutdown_tx.send(true).unwrap();
for h in handles { let _ = h.await; }
println!("Shut down cleanly.");
}
Key Takeaways — Production Patterns
本章要点——生产环境模式
- Use a
watchchannel plusselect!for coordinated graceful shutdown
优雅停机最常见的做法是watchchannel 配合select!协同广播- Bounded channels provide backpressure by forcing senders to wait when the buffer is full
有界 channel 通过“缓冲满了就让发送方等待”来提供背压JoinSetandTaskTrackergive structured ways to跟踪、等待、收拢任务组JoinSet和TaskTracker提供了结构化的任务分组跟踪与收尾方式- Network operations should almost always have explicit timeouts
网络操作几乎都应该显式加上超时- Tower’s
Servicetrait is the standard middleware foundation in production Rust services
Tower 的Servicetrait 是生产级 Rust 服务里最常见的中间件基础抽象
See also: Ch 8 — Tokio Deep Dive for channels and sync primitives, Ch 12 — Common Pitfalls for cancellation hazards during shutdown.
继续阅读: 第 8 章——Tokio Deep Dive 会继续讲 channel 和同步原语,第 12 章——Common Pitfalls 会补上停机阶段容易踩到的取消陷阱。
Exercises §§ZH§§ 练习
Exercises
练习
Exercise 1: Async Echo Server
练习 1:异步 Echo 服务器
Build a TCP echo server that handles multiple clients concurrently.
实现一个 TCP echo 服务器,要求能够并发处理多个客户端。
Requirements:
要求:
- Listen on
127.0.0.1:8080
监听127.0.0.1:8080 - Accept connections and echo back each line
接收连接,并把每一行原样回写 - Handle client disconnections gracefully
优雅处理客户端断开 - Print a log when clients connect or disconnect
在客户端连接与断开时打印日志
🔑 Solution
🔑 参考答案
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpListener;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Echo server listening on :8080");
loop {
let (socket, addr) = listener.accept().await?;
println!("[{addr}] Connected");
tokio::spawn(async move {
let (reader, mut writer) = socket.into_split();
let mut reader = BufReader::new(reader);
let mut line = String::new();
loop {
line.clear();
match reader.read_line(&mut line).await {
Ok(0) => {
println!("[{addr}] Disconnected");
break;
}
Ok(_) => {
print!("[{addr}] Echo: {line}");
if writer.write_all(line.as_bytes()).await.is_err() {
println!("[{addr}] Write error, disconnecting");
break;
}
}
Err(e) => {
eprintln!("[{addr}] Read error: {e}");
break;
}
}
}
});
}
}
Exercise 2: Concurrent URL Fetcher with Rate Limiting
练习 2:带并发限制的 URL 抓取器
Fetch a list of URLs concurrently, with at most 5 requests running at the same time.
并发抓取一组 URL,但同一时刻最多只能有 5 个请求在飞。
🔑 Solution
🔑 参考答案
#![allow(unused)]
fn main() {
use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};
async fn fetch_urls(urls: Vec<String>) -> Vec<Result<String, String>> {
// buffer_unordered(5) ensures at most 5 futures are polled
// concurrently — no separate Semaphore needed here.
let results: Vec<_> = stream::iter(urls)
.map(|url| {
async move {
println!("Fetching: {url}");
match reqwest::get(&url).await {
Ok(resp) => match resp.text().await {
Ok(body) => Ok(body),
Err(e) => Err(format!("{url}: {e}")),
},
Err(e) => Err(format!("{url}: {e}")),
}
}
})
.buffer_unordered(5) // ← This alone limits concurrency to 5
.collect()
.await;
results
}
// NOTE: Use Semaphore when you need to limit concurrency across
// independently spawned tasks (tokio::spawn). Use buffer_unordered
// when processing a stream. Don't combine both for the same limit.
}
Why this works: buffer_unordered(5) itself is already the concurrency limiter. It only allows five in-flight futures at a time while still collecting results as soon as they finish.
为什么这样就够了: buffer_unordered(5) 本身就是并发闸门。它只允许五个 future 同时处于进行中状态,并且谁先完成就先把结果收回来。
Exercise 3: Graceful Shutdown with Worker Pool
练习 3:带优雅退出的工作池
Build a task processor with these properties:
实现一个任务处理器,要求具备下面这些特性:
- A channel-based work queue
基于 channel 的工作队列 - N worker tasks consuming from the queue
N 个 worker 任务从队列中消费任务 - Graceful shutdown on Ctrl+C: stop accepting new work and finish in-flight work
按下 Ctrl+C 后优雅退出:停止接收新任务,但把已经在处理中的任务收完
🔑 Solution
🔑 参考答案
use tokio::sync::{mpsc, watch};
use tokio::time::{sleep, Duration};
struct WorkItem {
id: u64,
payload: String,
}
#[tokio::main]
async fn main() {
let (work_tx, work_rx) = mpsc::channel::<WorkItem>(100);
let (shutdown_tx, shutdown_rx) = watch::channel(false);
// Spawn 4 workers
let mut worker_handles = Vec::new();
let work_rx = std::sync::Arc::new(tokio::sync::Mutex::new(work_rx));
for id in 0..4 {
let rx = work_rx.clone();
let mut shutdown = shutdown_rx.clone();
let handle = tokio::spawn(async move {
loop {
let item = {
let mut rx = rx.lock().await;
tokio::select! {
item = rx.recv() => item,
_ = shutdown.changed() => {
if *shutdown.borrow() { None } else { continue }
}
}
};
match item {
Some(work) => {
println!("Worker {id}: processing item {}", work.id);
sleep(Duration::from_millis(200)).await; // Simulate work
println!("Worker {id}: done with item {}", work.id);
}
None => {
println!("Worker {id}: channel closed, exiting");
break;
}
}
}
});
worker_handles.push(handle);
}
// Producer: submit some work
let producer = tokio::spawn(async move {
for i in 0..20 {
let _ = work_tx.send(WorkItem {
id: i,
payload: format!("task-{i}"),
}).await;
sleep(Duration::from_millis(50)).await;
}
});
// Wait for Ctrl+C
tokio::signal::ctrl_c().await.unwrap();
println!("\nShutdown signal received!");
shutdown_tx.send(true).unwrap();
producer.abort(); // Cancel the producer task
// Wait for workers to finish
for handle in worker_handles {
let _ = handle.await;
}
println!("All workers shut down. Goodbye!");
}
Key point: graceful shutdown is not “kill everything immediately”. The important part is to stop producing new work, broadcast shutdown intent, and allow existing worker tasks to reach a clean stopping point.
关键点: 优雅退出不是“一刀切全杀掉”,而是先停掉新任务来源,再广播关闭意图,同时让已经跑起来的 worker 有机会走到一个干净的结束点。
Exercise 4: Build a Simple Async Mutex from Scratch
练习 4:从零实现一个简单的异步 Mutex
Implement an async-aware mutex without using tokio::sync::Mutex.
在不使用 tokio::sync::Mutex 的前提下,实现一个能感知异步等待的 mutex。
Hint: Use a tokio::sync::mpsc channel with capacity 1 as a semaphore.
提示:可以把容量为 1 的 tokio::sync::mpsc channel 想成一个信号量。
🔑 Solution
🔑 参考答案
#![allow(unused)]
fn main() {
use std::cell::UnsafeCell;
use std::sync::Arc;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
pub struct SimpleAsyncMutex<T> {
data: Arc<UnsafeCell<T>>,
semaphore: Arc<Semaphore>,
}
// SAFETY: Access to T is serialized by the semaphore (max 1 permit).
unsafe impl<T: Send> Send for SimpleAsyncMutex<T> {}
unsafe impl<T: Send> Sync for SimpleAsyncMutex<T> {}
pub struct SimpleGuard<T> {
data: Arc<UnsafeCell<T>>,
_permit: OwnedSemaphorePermit, // Dropped on guard drop → releases lock
}
impl<T> SimpleAsyncMutex<T> {
pub fn new(value: T) -> Self {
SimpleAsyncMutex {
data: Arc::new(UnsafeCell::new(value)),
semaphore: Arc::new(Semaphore::new(1)),
}
}
pub async fn lock(&self) -> SimpleGuard<T> {
let permit = self.semaphore.clone().acquire_owned().await.unwrap();
SimpleGuard {
data: self.data.clone(),
_permit: permit,
}
}
}
impl<T> std::ops::Deref for SimpleGuard<T> {
type Target = T;
fn deref(&self) -> &T {
// SAFETY: We hold the only semaphore permit, so no other
// SimpleGuard exists → exclusive access is guaranteed.
unsafe { &*self.data.get() }
}
}
impl<T> std::ops::DerefMut for SimpleGuard<T> {
fn deref_mut(&mut self) -> &mut T {
// SAFETY: Same reasoning — single permit guarantees exclusivity.
unsafe { &mut *self.data.get() }
}
}
// When SimpleGuard is dropped, _permit is dropped,
// which releases the semaphore permit — another lock() can proceed.
// Usage:
// let mutex = SimpleAsyncMutex::new(vec![1, 2, 3]);
// {
// let mut guard = mutex.lock().await;
// guard.push(4);
// } // permit released here
}
Key takeaway: async mutexes are usually built on semaphores. The semaphore is what provides “wait asynchronously until the lock becomes available”.
核心收获: 异步 mutex 底层通常就是信号量。真正提供“等锁可用时挂起任务而不是阻塞线程”能力的,正是信号量这一层。
Why
UnsafeCelland notstd::sync::Mutex? A previous version of this exercise usedArc<Mutex<T>>and then tried to expose&T/&mut TthroughDeref. That fails because the references would borrow from a temporaryMutexGuardthat gets dropped immediately.UnsafeCellremoves that temporary guard layer, while semaphore-based serialization keeps theunsafesound.
为什么这里用UnsafeCell,而不是std::sync::Mutex? 之前一种更直觉的写法是Arc<Mutex<T>>再配合Deref/DerefMut暴露&T和&mut T。但那样不成立,因为引用会借自一个马上就被丢弃的临时MutexGuard。UnsafeCell去掉了这层临时 guard,而信号量串行化则保证了这段unsafe的合理性。
Exercise 5: Stream Pipeline
练习 5:Stream 处理流水线
Build a stream-based data pipeline that does the following:
实现一条基于 stream 的数据处理流水线,要求完成下面这些步骤:
- Generate numbers
1..=100
生成1..=100的数字 - Keep only even numbers
筛出偶数 - Square each value
把每个值平方 - Process 10 items concurrently, using sleep to simulate async work
每次并发处理 10 个,休眠可用于模拟异步工作 - Collect the results
收集最终结果
🔑 Solution
🔑 参考答案
use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let results: Vec<u64> = stream::iter(1u64..=100)
// Step 2: Filter evens
.filter(|x| futures::future::ready(x % 2 == 0))
// Step 3: Square each
.map(|x| x * x)
// Step 4: Process concurrently (simulate async work)
.map(|x| async move {
sleep(Duration::from_millis(50)).await;
println!("Processed: {x}");
x
})
.buffer_unordered(10) // 10 concurrent
// Step 5: Collect
.collect()
.await;
println!("Got {} results", results.len());
println!("Sum: {}", results.iter().sum::<u64>());
}
This exercise is useful because it compresses several common stream operations into one place: filtering, mapping, async fan-out, and collection.
这个练习很值,因为它把 stream 里最常见的几类操作一次串齐了:过滤、映射、异步扇出,以及收集结果。
Exercise 6: Implement Select with Timeout
练习 6:实现带超时的 Select
Without using tokio::select! or tokio::time::timeout, implement a function that races a future against a deadline and returns Either::Left(result) or Either::Right(()) when time runs out.
在不使用 tokio::select! 和 tokio::time::timeout 的前提下,实现一个函数,让某个 future 和截止时间赛跑,并在成功时返回 Either::Left(result),超时时返回 Either::Right(())。
Hint: Build it on top of the Select combinator and TimerFuture from Chapter 6.
提示:可以直接建立在第 6 章的 Select 组合子和 TimerFuture 之上。
🔑 Solution
🔑 参考答案
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
pub enum Either<A, B> {
Left(A),
Right(B),
}
pub struct Timeout<F> {
future: F,
timer: TimerFuture, // From Chapter 6
}
impl<F: Future + Unpin> Timeout<F> {
pub fn new(future: F, duration: Duration) -> Self {
Timeout {
future,
timer: TimerFuture::new(duration),
}
}
}
impl<F: Future + Unpin> Future for Timeout<F> {
type Output = Either<F::Output, ()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Check if the main future is done
if let Poll::Ready(val) = Pin::new(&mut self.future).poll(cx) {
return Poll::Ready(Either::Left(val));
}
// Check if the timer expired
if let Poll::Ready(()) = Pin::new(&mut self.timer).poll(cx) {
return Poll::Ready(Either::Right(()));
}
Poll::Pending
}
}
// Usage:
// match Timeout::new(fetch_data(), Duration::from_secs(5)).await {
// Either::Left(data) => println!("Got data: {data}"),
// Either::Right(()) => println!("Timed out!"),
// }
Key takeaway: select and timeout are conceptually simple. They are both just “poll two futures and see which one finishes first”. A surprising amount of async infrastructure is built from that one primitive idea.
核心收获: select 和 timeout 在概念上其实很朴素,本质都是“把两个 future 一起 poll,看谁先结束”。异步生态里一大堆看起来高级的能力,往下拆最后就是这个原语。
Summary and Reference Card
总结与参考卡片
Quick Reference Card
快速参考卡片
Async Mental Model
Async 心智模型
┌─────────────────────────────────────────────────────┐
│ async fn → State Machine (enum) → impl Future │
│ .await → poll() the inner future │
│ executor → loop { poll(); sleep_until_woken(); } │
│ waker → "hey executor, poll me again" │
│ Pin → "promise I won't move in memory" │
└─────────────────────────────────────────────────────┘
可以把整套 async 先记成这一句话:async fn 会被编译成状态机,.await 会去 poll 内层 future,executor 负责不断轮询并在被唤醒后继续推进,waker 用来通知执行器“我又能继续了”,而 Pin 保证状态机不会在内存里乱挪位置。
Common Patterns Cheat Sheet
常见模式速查表
| Goal | Use |
|---|---|
| Run two futures concurrently 让两个 future 并发执行 | tokio::join!(a, b) |
| Race two futures 让两个 future 竞速 | tokio::select! { ... } |
| Spawn a background task 启动后台任务 | tokio::spawn(async { ... }) |
| Run blocking code in async 在 async 上下文中执行阻塞代码 | tokio::task::spawn_blocking(|| { ... }) |
| Limit concurrency 限制并发数 | Semaphore::new(N) |
| Collect many task results 收集大量任务结果 | JoinSet |
| Share state across tasks 在任务之间共享状态 | Arc<Mutex<T>> or channelsArc<Mutex<T>> 或 channel |
| Graceful shutdown 优雅停机 | watch::channel + select! |
| Process a stream N-at-a-time 按 N 个一组处理 stream | .buffer_unordered(N) |
| Timeout a future 给 future 设置超时 | tokio::time::timeout(dur, fut) |
| Retry with backoff 带退避的重试 | Custom combinator (see Ch. 13) 自定义组合器,见第 13 章 |
Pinning Quick Reference
Pinning 速查
| Situation | Use |
|---|---|
| Pin a future on the heap 把 future 固定在堆上 | Box::pin(fut) |
| Pin a future on the stack 把 future 固定在栈上 | tokio::pin!(fut) |
Pin an Unpin type固定一个 Unpin 类型 | Pin::new(&mut val) — safe, freePin::new(&mut val),安全且没有额外成本 |
| Return a pinned trait object 返回一个被 pin 的 trait object | -> Pin<Box<dyn Future<Output = T> + Send>> |
Channel Selection Guide
Channel 选型指南
| Channel | Producers | Consumers | Values | Use When |
|---|---|---|---|---|
mpsc | N | 1 | Stream | Work queues, event buses 工作队列、事件总线 |
oneshot | 1 | 1 | Single | Request/response, completion notification 请求响应、完成通知 |
broadcast | N | N | All recv all | Fan-out notifications, shutdown signals 扇出通知、停机信号 |
watch | 1 | N | Latest only | Config updates, health status 配置更新、健康状态 |
Mutex Selection Guide
Mutex 选型指南
| Mutex | Use When |
|---|---|
std::sync::Mutex | Lock is held briefly, never across .await锁持有时间很短,而且绝对不会跨 .await。 |
tokio::sync::Mutex | Lock must be held across .await锁需要跨 .await 持有。 |
parking_lot::Mutex | High contention, no .await, need performance竞争激烈、没有 .await,并且特别看重性能。 |
tokio::sync::RwLock | Many readers, few writers, locks cross .await读多写少,而且锁要跨 .await。 |
Decision Quick Reference
决策速查
Need concurrency?
├── I/O-bound → async/await
├── CPU-bound → rayon / std::thread
└── Mixed → spawn_blocking for CPU parts
Choosing runtime?
├── Server app → tokio
├── Library → runtime-agnostic (futures crate)
├── Embedded → embassy
└── Minimal → smol
Need concurrent futures?
├── Can be 'static + Send → tokio::spawn
├── Can be 'static + !Send → LocalSet
├── Can't be 'static → FuturesUnordered
└── Need to track/abort → JoinSet
如果只是为了快速判断,先按这个顺序想:先分清是 I/O 密集还是 CPU 密集,再决定运行时,最后再看 future 的生命周期和 Send 约束。
Common Error Messages and Fixes
常见报错与修复思路
| Error | Cause | Fix |
|---|---|---|
future is not Send | Holding !Send type across .await在 .await 之前持有了 !Send 类型。 | Scope the value so it’s dropped before .await, or use current_thread runtime缩小作用域,让它在 .await 之前被释放,或者改用 current_thread 运行时。 |
borrowed value does not live long enough in spawn | tokio::spawn requires 'statictokio::spawn 要求 'static 生命周期。 | Use Arc, clone(), or FuturesUnordered使用 Arc、clone(),或者改用 FuturesUnordered。 |
the trait Future is not implemented for () | Missing .await漏写了 .await。 | Add .await to the async call给异步调用补上 .await。 |
cannot borrow as mutable in poll | Self-referential borrow 发生了自引用借用问题。 | Use Pin<&mut Self> correctly (see Ch. 4)正确使用 Pin<&mut Self>,详见第 4 章。 |
| Program hangs silently | Forgot to call waker.wake()忘了调用 waker.wake()。 | Ensure every Pending path registers and triggers the waker确保每条返回 Pending 的分支都注册并触发了 waker。 |
Further Reading
延伸阅读
| Resource | Why |
|---|---|
| Tokio Tutorial | Official hands-on guide — excellent for first projects 官方动手教程,非常适合第一个项目。 |
| Async Book (official) | Covers Future, Pin, Stream at the language level从语言层面讲清 Future、Pin 和 Stream。 |
| Jon Gjengset — Crust of Rust: async/await | 2-hour deep dive into internals with live coding 配合现场编码,深入讲解 async/await 内部机制。 |
| Alice Ryhl — Actors with Tokio | Production architecture pattern for stateful services 面向有状态服务的生产架构模式。 |
| Without Boats — Pin, Unpin, and why Rust needs them | The original motivation from the language designer 语言设计者给出的原始动机说明。 |
| Tokio mini-Redis | Complete async Rust project — study-quality production code 一个完整的 async Rust 项目,学习价值很高。 |
| Tower documentation | Middleware/service architecture used by axum, tonic, hyper axum、tonic、hyper 等框架采用的中间件与服务架构。 |
End of Async Rust Training Guide
Async Rust 训练指南到此结束。
Capstone Project: Async Chat Server
综合项目:异步聊天室服务器
This project integrates patterns from across the book into a single, production-style application. You’ll build a multi-room async chat server using tokio, channels, streams, graceful shutdown, and proper error handling.
这个项目会把整本书里前面讲过的模式揉进一个更接近生产风格的应用里。目标是用 Tokio、channel、stream、优雅停机和规范错误处理,搭一个支持多房间的异步聊天服务器。
Estimated time: 4–6 hours | Difficulty: ★★★
预估耗时: 4 到 6 小时 | 难度: ★★★
What you’ll practice:
这一章会练到的内容:
tokio::spawnand the'staticrequirement (Ch 8)tokio::spawn以及它为什么经常要求'static(第 8 章)- Channels:
mpscfor messages,broadcastfor rooms,watchfor shutdown (Ch 8)
channel 组合:mpsc传消息,broadcast做房间广播,watch传停机信号(第 8 章)- Streams: reading lines from TCP connections (Ch 11)
stream 思维:从 TCP 连接里持续读取行数据(第 11 章)- Common pitfalls: cancellation safety,
MutexGuardacross.await(Ch 12)
常见坑:取消安全、MutexGuard跨.await等问题(第 12 章)- Production patterns: graceful shutdown, backpressure (Ch 13)
生产模式:优雅停机、背压控制(第 13 章)- Async traits for pluggable backends (Ch 10)
可插拔后端所需的 async trait 思路(第 10 章)
The Problem
问题定义
Build a TCP chat server where:
要实现一个 TCP 聊天服务器,满足下面这些要求:
- Clients connect via TCP and join named rooms
1. 客户端 通过 TCP 连接,并加入具名房间。 - Messages are broadcast to all clients in the same room
2. 消息 会广播给同一房间里的所有客户端。 - Commands:
/join <room>,/nick <name>,/rooms,/quit
3. 支持命令:/join <room>、/nick <name>、/rooms、/quit。 - The server shuts down gracefully on Ctrl+C — finishing in-flight messages
4. 按下 Ctrl+C 时,服务器要能优雅停机,把飞行中的消息尽量收完再退出。
graph LR
C1["Client 1<br/>(Alice)"] -->|TCP| SERVER["Chat Server"]
C2["Client 2<br/>(Bob)"] -->|TCP| SERVER
C3["Client 3<br/>(Carol)"] -->|TCP| SERVER
SERVER --> R1["#general<br/>broadcast channel"]
SERVER --> R2["#rust<br/>broadcast channel"]
R1 -->|msg| C1
R1 -->|msg| C2
R2 -->|msg| C3
CTRL["Ctrl+C"] -->|watch| SERVER
style SERVER fill:#e8f4f8,stroke:#2980b9,color:#000
style R1 fill:#d4efdf,stroke:#27ae60,color:#000
style R2 fill:#d4efdf,stroke:#27ae60,color:#000
style CTRL fill:#fadbd8,stroke:#e74c3c,color:#000
这张图背后的核心思路很简单:TCP 连接负责“谁连进来了”,房间广播负责“消息往哪发”,停机信号负责“什么时候开始收摊”。
把这几种责任拆开之后,整个系统就会清楚很多,不容易在一个大循环里全搅成一锅。
Step 1: Basic TCP Accept Loop
步骤 1:先搭一个最小 TCP 接收循环
Start with a server that accepts connections and echoes lines back:
先从一个最小版本开始:接受连接,然后把客户端发来的每一行原样回显回去。
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpListener;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Chat server listening on :8080");
loop {
let (socket, addr) = listener.accept().await?;
println!("[{addr}] Connected");
tokio::spawn(async move {
let (reader, mut writer) = socket.into_split();
let mut reader = BufReader::new(reader);
let mut line = String::new();
loop {
line.clear();
match reader.read_line(&mut line).await {
Ok(0) | Err(_) => break,
Ok(_) => {
let _ = writer.write_all(line.as_bytes()).await;
}
}
}
println!("[{addr}] Disconnected");
});
}
}
Your job: Verify this compiles and works with telnet localhost 8080.
练习目标: 先确认这段代码能编译,并且能用 telnet localhost 8080 连上测试。
这一小步虽然看着朴素,但很重要。先确认 accept loop、into_split()、逐行读取和独立任务派发都没问题,后面再往里加房间和命令,排查起来才不会一团乱。
别一上来就把所有功能全堆进去,那样出问题时很容易连是哪一层坏了都看不出来。
Step 2: Room State with Broadcast Channels
步骤 2:用广播 channel 管理房间状态
Each room is a broadcast::Sender. All clients in a room subscribe to receive messages.
这里的设计是:每个房间对应一个 broadcast::Sender,房间里的客户端通过订阅它来接收消息。
#![allow(unused)]
fn main() {
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{broadcast, RwLock};
type RoomMap = Arc<RwLock<HashMap<String, broadcast::Sender<String>>>>;
fn get_or_create_room(rooms: &mut HashMap<String, broadcast::Sender<String>>, name: &str) -> broadcast::Sender<String> {
rooms.entry(name.to_string())
.or_insert_with(|| {
let (tx, _) = broadcast::channel(100); // 100-message buffer
tx
})
.clone()
}
}
Your job: Implement room state so that:
练习目标: 把房间状态补完整,让它满足下面这些要求:
- Clients start in
#general
客户端默认进入#general。 /join <room>switches rooms (unsubscribe from old, subscribe to new)/join <room>可以切房间,需要从旧房间退订,再订阅新房间。- Messages are broadcast to all clients in the sender’s current room
普通消息要广播给发送者当前房间里的所有客户端。
💡 Hint — Client task structure 💡 提示:客户端任务结构
Each client task needs two concurrent loops:
每个客户端任务本质上要同时处理两件事:
- Read from TCP → parse commands or broadcast to room
1. 从 TCP 读输入,然后解析命令或往房间广播。 - Read from broadcast receiver → write to TCP
2. 从房间广播接收器读消息,再写回 TCP。
Use tokio::select! to run both:
用 tokio::select! 把这两条流并起来:
#![allow(unused)]
fn main() {
loop {
tokio::select! {
// Client sent us a line
result = reader.read_line(&mut line) => {
match result {
Ok(0) | Err(_) => break,
Ok(_) => {
// Parse command or broadcast message
}
}
}
// Room broadcast received
result = room_rx.recv() => {
match result {
Ok(msg) => {
let _ = writer.write_all(msg.as_bytes()).await;
}
Err(_) => break,
}
}
}
}
}
这里其实已经能看出 async 的味道了:单个客户端任务里,不是写两个线程,也不是先读完再写,而是把两条异步事件源放进同一个 select! 里竞争。
谁先来事件就先处理谁,这种写法对聊天室、代理、网关、推送服务都特别常见。
Step 3: Commands
步骤 3:实现命令协议
Implement the command protocol:
接下来把命令系统补上:
| Command 命令 | Action 动作 |
|---|---|
/join <room> | Leave current room, join new room, announce in both 离开当前房间,加入新房间,并在两边做提示广播 |
/nick <name> | Change display name 修改显示昵称 |
/rooms | List all active rooms and member counts 列出所有活跃房间及成员数 |
/quit | Disconnect gracefully 优雅断开连接 |
| Anything else | Broadcast as a chat message 其他普通输入都当成聊天消息广播 |
Your job: Parse commands from the input line. For /rooms, you’ll need to read from the RoomMap — use RwLock::read() to avoid blocking other clients.
练习目标: 从输入行里解析命令。处理 /rooms 时需要读取 RoomMap,这里用 RwLock::read(),避免把其他客户端也给堵住。
命令系统是把 demo 做成“真能互动”的第一步。也正是在这里,会开始出现共享状态读取、用户状态切换、广播通知这些更像真实系统的动作。
这一步写顺了,后面加更多命令就会自然很多。
Step 4: Graceful Shutdown
步骤 4:优雅停机
Add Ctrl+C handling so the server:
给服务器加上 Ctrl+C 处理逻辑,让它能做到:
- Stops accepting new connections
1. 停止接受新连接。 - Sends “Server shutting down…” to all rooms
2. 向所有房间广播“服务器即将关闭”。 - Waits for in-flight messages to drain
3. 尽量把正在路上的消息处理完。 - Exits cleanly
4. 最终干净退出。
#![allow(unused)]
fn main() {
use tokio::sync::watch;
let (shutdown_tx, shutdown_rx) = watch::channel(false);
// In the accept loop:
loop {
tokio::select! {
result = listener.accept() => {
let (socket, addr) = result?;
// spawn client task with shutdown_rx.clone()
}
_ = tokio::signal::ctrl_c() => {
println!("Shutdown signal received");
shutdown_tx.send(true)?;
break;
}
}
}
}
Your job: Add shutdown_rx.changed() to each client’s select! loop so clients exit when shutdown is signaled.
练习目标: 把 shutdown_rx.changed() 也接进每个客户端自己的 select! 循环里,这样收到停机信号后,客户端任务也能自己有序退出。
优雅停机这个点特别像样板活,但线上价值很大。没有它,服务一停就是硬切,消息半路丢了也没人管。
聊天室这种东西看着简单,一旦开始涉及关闭过程中的数据一致性,就已经很接近真实服务端系统了。
Step 5: Error Handling and Edge Cases
步骤 5:错误处理与边界情况
Production-harden the server:
接下来把服务器往生产可用方向再拧紧一点:
- Lagging receivers:
broadcast::recv()returnsRecvError::Lagged(n)if a slow client misses messages. Handle it gracefully (log + continue, don’t crash).
1. 慢消费者:如果客户端太慢,broadcast::recv()可能返回RecvError::Lagged(n)。这里应该优雅处理,打日志后继续,不要直接炸掉。 - Nickname validation: Reject empty or too-long nicknames.
2. 昵称校验:空昵称或过长昵称都该拒绝。 - Backpressure: The broadcast channel buffer is bounded (100). If a client can’t keep up, they get the
Laggederror.
3. 背压:广播缓冲区是有界的,大小 100。跟不上的客户端会收到Lagged错误。 - Timeout: Disconnect clients that are idle for >5 minutes.
4. 超时:超过 5 分钟没动静的客户端要断开。
#![allow(unused)]
fn main() {
use tokio::time::{timeout, Duration};
// Wrap the read in a timeout:
match timeout(Duration::from_secs(300), reader.read_line(&mut line)).await {
Ok(Ok(0)) | Ok(Err(_)) | Err(_) => break, // EOF, error, or timeout
Ok(Ok(_)) => { /* process line */ }
}
}
这一步才是真正把 demo 和“像回事的服务”拉开差距的地方。
很多项目表面功能都能跑,但一遇到慢连接、超时、积压、异常输入就开始冒烟。边界情况处理得越早,后面越省心。
Step 6: Integration Test
步骤 6:集成测试
Write a test that starts the server, connects two clients, and verifies message delivery:
最后写一个集成测试,启动服务器、连接两个客户端,并验证消息确实能送达:
#![allow(unused)]
fn main() {
#[tokio::test]
async fn two_clients_can_chat() {
// Start server in background
let server = tokio::spawn(run_server("127.0.0.1:0")); // Port 0 = OS picks
// Connect two clients
let mut client1 = TcpStream::connect(addr).await.unwrap();
let mut client2 = TcpStream::connect(addr).await.unwrap();
// Client 1 sends a message
client1.write_all(b"Hello from client 1\n").await.unwrap();
// Client 2 should receive it
let mut buf = vec![0u8; 1024];
let n = client2.read(&mut buf).await.unwrap();
let msg = String::from_utf8_lossy(&buf[..n]);
assert!(msg.contains("Hello from client 1"));
}
}
如果只靠手动 telnet 点一点,这个项目永远停留在“看着能跑”。把它写成集成测试,才算把聊天室真正送进可验证区。
尤其是并发系统,手动验证一次通过并不能说明问题,测试才是后面敢重构的底气。
Evaluation Criteria
评估标准
| Criterion 维度 | Target 目标 |
|---|---|
| Concurrency 并发性 | Multiple clients in multiple rooms, no blocking 多客户端、多房间,整体不被单点阻塞 |
| Correctness 正确性 | Messages only go to clients in the same room 消息只发给同一房间内的客户端 |
| Graceful shutdown 优雅停机 | Ctrl+C drains messages and exits cleanly Ctrl+C 后能尽量收完消息,再干净退出 |
| Error handling 错误处理 | Lagged receivers, disconnections, timeouts handled 慢消费者、断连、超时都要处理好 |
| Code organization 代码组织 | Clean separation: accept loop, client task, room state accept loop、客户端任务、房间状态边界清晰 |
| Testing 测试 | At least 2 integration tests 至少两条集成测试 |
Extension Ideas
扩展方向
Once the basic chat server works, try these enhancements:
基础聊天室跑通之后,可以继续往下加这些增强项:
- Persistent history: Store last N messages per room; replay to new joiners
1. 持久化历史:每个房间保留最近 N 条消息,新加入用户自动回放。 - WebSocket support: Accept both TCP and WebSocket clients using
tokio-tungstenite
2. WebSocket 支持:用tokio-tungstenite同时接入 TCP 和 WebSocket 客户端。 - Rate limiting: Use
tokio::time::Intervalto limit messages per client per second
3. 限流:通过tokio::time::Interval限制每个客户端每秒发消息数量。 - Metrics: Track connected clients, messages/sec, room count via
prometheuscrate
4. 指标监控:借助prometheus统计在线人数、消息吞吐、房间数量。 - TLS: Add
tokio-rustlsfor encrypted connections
5. TLS:用tokio-rustls给连接加密。
这一章其实就是一次完整的收官演练:把 async 基础、channel 模型、stream 读取、超时、广播、停机和测试全都揉在一起。
做完它之后,对 Tokio 生态里常见服务端写法会有非常扎实的直觉。