Rust concurrency
What you’ll learn: Rust’s concurrency model, including threads, the Send/Sync marker traits, Mutex<T>, Arc<T>, and channels, and the way the compiler prevents data races at compile time. The key theme is that Rust charges for synchronization only when the code actually needs it.
- Rust has built-in support for concurrency, similar in spirit to C++ std::thread.
- The major difference is that Rust rejects many unsafe sharing patterns at compile time through Send and Sync.
- In C++, sharing a std::vector across threads without synchronization compiles and becomes undefined behavior at runtime. In Rust, the same shape of code simply does not type-check.
- Mutex<T> in Rust wraps the protected data itself, so you cannot even access the value without going through the lock guard.
Spawning threads
thread::spawn() launches a new thread and runs a closure on it in parallel.
```rust
use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 0..10 {
            println!("Count in thread: {i}!");
            thread::sleep(Duration::from_millis(5));
        }
    });

    for i in 0..5 {
        println!("Main thread: {i}");
        thread::sleep(Duration::from_millis(5));
    }

    handle.join().unwrap(); // join() waits for the spawned thread to finish
}
```
Borrowing into scoped threads
- thread::scope() is useful when a spawned thread needs to borrow data from the surrounding stack frame.
- It works because thread::scope() waits until all inner threads finish before the borrowed data can go out of scope.
```rust
use std::thread;

fn main() {
    let a = [0, 1, 2];
    thread::scope(|scope| {
        scope.spawn(|| {
            for x in &a {
                println!("{x}");
            }
        });
    });
}
```
Try removing thread::scope() and replacing it with a plain thread::spawn(). The compiler will immediately complain, because the borrow would no longer be guaranteed to outlive the spawned thread.
Moving data into threads
move transfers ownership into the thread closure. For Copy types such as [i32; 3], this behaves like a copy; for non-Copy values, the original binding is consumed.
```rust
use std::thread;

fn main() {
    let mut a = [0, 1, 2];
    let handle = thread::spawn(move || {
        for x in a {
            println!("{x}");
        }
    });
    a[0] = 42; // Doesn't affect the copy sent to the thread
    handle.join().unwrap();
}
```
Sharing read-only data with Arc<T>
- Arc<T> is the standard way to share read-only ownership across threads.
- Arc means Atomic Reference Counted.
- Arc::clone() only increments the reference count; it does not deep-copy the underlying data.
```rust
use std::sync::Arc;
use std::thread;

fn main() {
    let a = Arc::new([0, 1, 2]);
    let mut handles = Vec::new();
    for i in 0..2 {
        let arc = Arc::clone(&a);
        handles.push(thread::spawn(move || {
            println!("Thread: {i} {arc:?}");
        }));
    }
    handles.into_iter().for_each(|h| h.join().unwrap());
}
```
Sharing mutable data with Arc<Mutex<T>>
- Arc<T> plus Mutex<T> is the standard combination for mutable shared state across threads.
- The MutexGuard returned by lock() releases the lock automatically when it goes out of scope.
- This is still RAII, just applied to synchronization instead of only memory management.
```rust
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = Vec::new();
    for _ in 0..5 {
        let counter = Arc::clone(&counter);
        handles.push(thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
            // MutexGuard dropped here: lock released automatically
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }
    println!("Final count: {}", *counter.lock().unwrap());
    // Output: Final count: 5
}
```
RwLock<T> for read-heavy sharing
- RwLock<T> allows many readers or one writer, matching the same read/write lock pattern as C++ std::shared_mutex.
- Use it when reads vastly outnumber writes, such as configuration snapshots or caches.
```rust
use std::sync::{Arc, RwLock};
use std::thread;

fn main() {
    let config = Arc::new(RwLock::new(String::from("v1.0")));
    let mut handles = Vec::new();

    // Spawn 5 readers; all can run concurrently
    for i in 0..5 {
        let config = Arc::clone(&config);
        handles.push(thread::spawn(move || {
            let val = config.read().unwrap(); // Multiple readers OK
            println!("Reader {i}: {val}");
        }));
    }

    // One writer; blocks until all readers finish
    {
        let config = Arc::clone(&config);
        handles.push(thread::spawn(move || {
            let mut val = config.write().unwrap(); // Exclusive access
            *val = String::from("v2.0");
            println!("Writer: updated to {val}");
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }
}
```
Mutex poisoning
- If a thread panics while holding a Mutex or RwLock, the lock becomes poisoned.
- Later lock() calls return Err(PoisonError) because the protected data may now be inconsistent.
- If the caller knows the value is still usable, it can recover through .into_inner().
- C++ std::mutex has no equivalent poisoning concept.
```rust
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let data = Arc::new(Mutex::new(vec![1, 2, 3]));
    let data2 = Arc::clone(&data);

    let handle = thread::spawn(move || {
        let mut guard = data2.lock().unwrap();
        guard.push(4);
        panic!("oops!"); // Lock is now poisoned
    });
    let _ = handle.join(); // Thread panicked

    match data.lock() {
        Ok(guard) => println!("Data: {guard:?}"),
        Err(poisoned) => {
            println!("Lock was poisoned! Recovering...");
            let guard = poisoned.into_inner();
            println!("Recovered data: {guard:?}");
        }
    }
}
```
Atomics for simple shared state
- For counters, flags, and other tiny shared states, std::sync::atomic avoids the overhead of a Mutex.
- AtomicBool, AtomicU64, AtomicUsize and friends are roughly analogous to C++ std::atomic<T>.
- The same memory ordering vocabulary appears here too: Relaxed, Acquire, Release, SeqCst.
```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    let counter = Arc::new(AtomicU64::new(0));
    let mut handles = Vec::new();
    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        handles.push(thread::spawn(move || {
            for _ in 0..1000 {
                counter.fetch_add(1, Ordering::Relaxed);
            }
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }
    println!("Counter: {}", counter.load(Ordering::SeqCst));
    // Output: Counter: 10000
}
```
| Primitive | When to use | C++ equivalent |
|---|---|---|
| Mutex<T> | General mutable shared state | std::mutex + manually associated data |
| RwLock<T> | Read-heavy workloads | std::shared_mutex |
| Atomic* | Counters, flags, lock-free basics | std::atomic<T> |
| Condvar | Wait for a condition to change | std::condition_variable |
Condvar for waiting on shared state
- Condvar lets one thread sleep until another thread signals that some condition has changed.
- It is always paired with a Mutex.
- The usual pattern is: lock, check the condition, wait if not ready, re-check after waking.
- Just like in C++, spurious wakeups exist, so waiting should happen in a loop or through helpers such as wait_while().
```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

fn main() {
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let pair2 = Arc::clone(&pair);

    let worker = thread::spawn(move || {
        let (lock, cvar) = &*pair2;
        let mut ready = lock.lock().unwrap();
        while !*ready {
            ready = cvar.wait(ready).unwrap();
        }
        println!("Worker: condition met, proceeding!");
    });

    thread::sleep(std::time::Duration::from_millis(100));
    {
        let (lock, cvar) = &*pair;
        let mut ready = lock.lock().unwrap();
        *ready = true;
        cvar.notify_one();
    }
    worker.join().unwrap();
}
```
Condvar vs channels: Use Condvar when several threads share mutable state and need to wait for a condition on that state, such as “buffer is no longer empty”. Use channels when the real problem is passing messages from one thread to another.
Channels for message passing
- Rust channels connect Sender and Receiver ends and support the classic mpsc pattern: multi-producer, single-consumer.
- recv() blocks while the channel is empty; send() never blocks on an unbounded mpsc::channel, but does block on a full bounded mpsc::sync_channel.
```rust
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
    tx.send(10).unwrap();
    tx.send(20).unwrap();
    println!("Received: {:?}", rx.recv());
    println!("Received: {:?}", rx.recv());

    let tx2 = tx.clone();
    tx2.send(30).unwrap();
    println!("Received: {:?}", rx.recv());
}
```
Combining channels with threads
```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    for _ in 0..2 {
        let tx2 = tx.clone();
        thread::spawn(move || {
            let thread_id = thread::current().id();
            for i in 0..10 {
                tx2.send(format!("Message {i}")).unwrap();
                println!("{thread_id:?}: sent Message {i}");
            }
            println!("{thread_id:?}: done");
        });
    }
    // Drop the original sender so rx.iter() ends once both clones are dropped.
    drop(tx);

    thread::sleep(Duration::from_millis(100));
    for msg in rx.iter() {
        println!("Main: got {msg}");
    }
}
```
Why Rust prevents data races: Send and Sync
- Rust uses two marker traits to encode thread-safety properties directly into types.
  - Send means the value can be safely transferred to another thread.
  - Sync means shared references to the value can be safely used from multiple threads.
- Most ordinary types are automatically Send + Sync, but some notable types are not.
  - Rc<T> is neither Send nor Sync.
  - Cell<T> and RefCell<T> are not Sync.
  - Raw pointers are neither Send nor Sync by default.
- This is why Arc<Mutex<T>> is often the thread-safe analogue of Rc<RefCell<T>>.
Intuition: think of values as toys. Send means “you can hand the toy to another child safely”. Sync means “multiple children can safely hold references to the toy at the same time”. Rc<T> fails both tests because its reference counter is not atomic.
Exercise: Multi-threaded word count
🔴 Challenge: combines threads, Arc, Mutex, and HashMap.
- Given a Vec<String> of text lines, spawn one thread per line and count the words in that line.
- Use Arc<Mutex<HashMap<String, usize>>> to collect the results.
- Print the total word count across all lines.
- Bonus: try a channel-based version instead of shared mutable state.
Solution
```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let lines = vec![
        "the quick brown fox".to_string(),
        "jumps over the lazy dog".to_string(),
        "the fox is quick".to_string(),
    ];

    let word_counts: Arc<Mutex<HashMap<String, usize>>> =
        Arc::new(Mutex::new(HashMap::new()));
    let mut handles = vec![];

    for line in &lines {
        let line = line.clone();
        let counts = Arc::clone(&word_counts);
        handles.push(thread::spawn(move || {
            for word in line.split_whitespace() {
                let mut map = counts.lock().unwrap();
                *map.entry(word.to_lowercase()).or_insert(0) += 1;
            }
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }

    let counts = word_counts.lock().unwrap();
    let total: usize = counts.values().sum();
    println!("Word frequencies: {counts:#?}");
    println!("Total words: {total}");
}
// Output (order may vary):
// Word frequencies: {
//     "the": 3,
//     "quick": 2,
//     "brown": 1,
//     "fox": 2,
//     "jumps": 1,
//     "over": 1,
//     "lazy": 1,
//     "dog": 1,
//     "is": 1,
// }
// Total words: 13
```