ch16-02-message-passing.md
commit 24e275d624fe85af7b5b6316e78f8bfbbcac23e7
一個(gè)日益流行的確保安全并發(fā)的方式是 消息傳遞(message passing),這里線程或 actor 通過(guò)發(fā)送包含數(shù)據(jù)的消息來(lái)相互溝通。這個(gè)思想來(lái)源于 Go 編程語(yǔ)言文檔中 的口號(hào):“不要通過(guò)共享內(nèi)存來(lái)通訊;而是通過(guò)通訊來(lái)共享內(nèi)存。”(“Do not communicate by sharing memory; instead, share memory by communicating.”)
Rust 中一個(gè)實(shí)現(xiàn)消息傳遞并發(fā)的主要工具是 信道(channel),Rust 標(biāo)準(zhǔn)庫(kù)提供了其實(shí)現(xiàn)的編程概念。你可以將其想象為一個(gè)水流的渠道,比如河流或小溪。如果你將諸如橡皮鴨或小船之類的東西放入其中,它們會(huì)順流而下到達(dá)下游。
編程中的信息渠道(信道)有兩部分組成,一個(gè)發(fā)送者(transmitter)和一個(gè)接收者(receiver)。發(fā)送者位于上游位置,在這里可以將橡皮鴨放入河中,接收者則位于下游,橡皮鴨最終會(huì)漂流至此。代碼中的一部分調(diào)用發(fā)送者的方法以及希望發(fā)送的數(shù)據(jù),另一部分則檢查接收端收到的消息。當(dāng)發(fā)送者或接收者任一被丟棄時(shí)可以認(rèn)為信道被 關(guān)閉(closed)了。
這里,我們將開(kāi)發(fā)一個(gè)程序,它會(huì)在一個(gè)線程生成值向信道發(fā)送,而在另一個(gè)線程會(huì)接收值并打印出來(lái)。這里會(huì)通過(guò)信道在線程間發(fā)送簡(jiǎn)單值來(lái)演示這個(gè)功能。一旦你熟悉了這項(xiàng)技術(shù),就能使用信道來(lái)實(shí)現(xiàn)聊天系統(tǒng),或利用很多線程進(jìn)行分布式計(jì)算并將部分計(jì)算結(jié)果發(fā)送給一個(gè)線程進(jìn)行聚合。
首先,在示例 16-6 中,創(chuàng)建了一個(gè)信道但沒(méi)有做任何事。注意這還不能編譯,因?yàn)?Rust 不知道我們想要在信道中發(fā)送什么類型:
文件名: src/main.rs
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel();
}
示例 16-6: 創(chuàng)建一個(gè)信道,并將其兩端賦值給 tx
和 rx
這里使用 mpsc::channel
函數(shù)創(chuàng)建一個(gè)新的信道;mpsc
是 多個(gè)生產(chǎn)者,單個(gè)消費(fèi)者(multiple producer, single consumer)的縮寫(xiě)。簡(jiǎn)而言之,Rust 標(biāo)準(zhǔn)庫(kù)實(shí)現(xiàn)信道的方式意味著一個(gè)信道可以有多個(gè)產(chǎn)生值的 發(fā)送(sending)端,但只能有一個(gè)消費(fèi)這些值的 接收(receiving)端。想象一下多條小河小溪最終匯聚成大河:所有通過(guò)這些小河發(fā)出的東西最后都會(huì)來(lái)到下游的大河。目前我們以單個(gè)生產(chǎn)者開(kāi)始,但是當(dāng)示例可以工作后會(huì)增加多個(gè)生產(chǎn)者。
mpsc::channel
函數(shù)返回一個(gè)元組:第一個(gè)元素是發(fā)送端,而第二個(gè)元素是接收端。由于歷史原因,tx
和 rx
通常作為 發(fā)送者(transmitter)和 接收者(receiver)的縮寫(xiě),所以這就是我們將用來(lái)綁定這兩端變量的名字。這里使用了一個(gè) let
語(yǔ)句和模式來(lái)解構(gòu)了此元組;第十八章會(huì)討論 let
語(yǔ)句中的模式和解構(gòu)。如此使用 let
語(yǔ)句是一個(gè)方便提取 mpsc::channel
返回的元組中一部分的手段。
讓我們將發(fā)送端移動(dòng)到一個(gè)新建線程中并發(fā)送一個(gè)字符串,這樣新建線程就可以和主線程通訊了,如示例 16-7 所示。這類似于在河的上游扔下一只橡皮鴨或從一個(gè)線程向另一個(gè)線程發(fā)送聊天信息:
文件名: src/main.rs
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
});
}
示例 16-7: 將 tx
移動(dòng)到一個(gè)新建的線程中并發(fā)送 “hi”
這里再次使用 thread::spawn
來(lái)創(chuàng)建一個(gè)新線程并使用 move
將 tx
移動(dòng)到閉包中這樣新建線程就擁有 tx
了。新建線程需要擁有信道的發(fā)送端以便能向信道發(fā)送消息。
信道的發(fā)送端有一個(gè) send
方法用來(lái)獲取需要放入信道的值。send
方法返回一個(gè) Result<T, E>
類型,所以如果接收端已經(jīng)被丟棄了,將沒(méi)有發(fā)送值的目標(biāo),所以發(fā)送操作會(huì)返回錯(cuò)誤。在這個(gè)例子中,出錯(cuò)的時(shí)候調(diào)用 unwrap
產(chǎn)生 panic。不過(guò)對(duì)于一個(gè)真實(shí)程序,需要合理地處理它:回到第九章復(fù)習(xí)正確處理錯(cuò)誤的策略。
在示例 16-8 中,我們?cè)谥骶€程中從信道的接收端獲取值。這類似于在河的下游撈起橡皮鴨或接收聊天信息:
文件名: src/main.rs
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
});
let received = rx.recv().unwrap();
println!("Got: {}", received);
}
示例 16-8: 在主線程中接收并打印內(nèi)容 “hi”
信道的接收端有兩個(gè)有用的方法:recv
和 try_recv
。這里,我們使用了 recv
,它是 receive 的縮寫(xiě)。這個(gè)方法會(huì)阻塞主線程執(zhí)行直到從信道中接收一個(gè)值。一旦發(fā)送了一個(gè)值,recv
會(huì)在一個(gè) Result<T, E>
中返回它。當(dāng)信道發(fā)送端關(guān)閉,recv
會(huì)返回一個(gè)錯(cuò)誤表明不會(huì)再有新的值到來(lái)了。
try_recv
不會(huì)阻塞,相反它立刻返回一個(gè) Result<T, E>
:Ok
值包含可用的信息,而 Err
值代表此時(shí)沒(méi)有任何消息。如果線程在等待消息過(guò)程中還有其他工作時(shí)使用 try_recv
很有用:可以編寫(xiě)一個(gè)循環(huán)來(lái)頻繁調(diào)用 try_recv
,在有可用消息時(shí)進(jìn)行處理,其余時(shí)候則處理一會(huì)其他工作直到再次檢查。
出于簡(jiǎn)單的考慮,這個(gè)例子使用了 recv
;主線程中除了等待消息之外沒(méi)有任何其他工作,所以阻塞主線程是合適的。
如果運(yùn)行示例 16-8 中的代碼,我們將會(huì)看到主線程打印出這個(gè)值:
Got: hi
完美!
所有權(quán)規(guī)則在消息傳遞中扮演了重要角色,其有助于我們編寫(xiě)安全的并發(fā)代碼。防止并發(fā)編程中的錯(cuò)誤是在 Rust 程序中考慮所有權(quán)的一大優(yōu)勢(shì)。現(xiàn)在讓我們做一個(gè)試驗(yàn)來(lái)看看信道與所有權(quán)如何一同協(xié)作以避免產(chǎn)生問(wèn)題:我們將嘗試在新建線程中的信道中發(fā)送完 val
值 之后 再使用它。嘗試編譯示例 16-9 中的代碼并看看為何這是不允許的:
文件名: src/main.rs
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
println!("val is {}", val);
});
let received = rx.recv().unwrap();
println!("Got: {}", received);
}
示例 16-9: 在我們已經(jīng)發(fā)送到信道中后,嘗試使用 val
引用
這里嘗試在通過(guò) tx.send
發(fā)送 val
到信道中之后將其打印出來(lái)。允許這么做是一個(gè)壞主意:一旦將值發(fā)送到另一個(gè)線程后,那個(gè)線程可能會(huì)在我們?cè)俅问褂盟熬蛯⑵湫薷幕蛘邅G棄。其他線程對(duì)值可能的修改會(huì)由于不一致或不存在的數(shù)據(jù)而導(dǎo)致錯(cuò)誤或意外的結(jié)果。然而,嘗試編譯示例 16-9 的代碼時(shí),Rust 會(huì)給出一個(gè)錯(cuò)誤:
$ cargo run
Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
--> src/main.rs:10:31
|
8 | let val = String::from("hi");
| --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
9 | tx.send(val).unwrap();
| --- value moved here
10 | println!("val is {}", val);
| ^^^ value borrowed here after move
For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` due to previous error
我們的并發(fā)錯(cuò)誤會(huì)造成一個(gè)編譯時(shí)錯(cuò)誤。send
函數(shù)獲取其參數(shù)的所有權(quán)并移動(dòng)這個(gè)值歸接收者所有。這可以防止在發(fā)送后再次意外地使用這個(gè)值;所有權(quán)系統(tǒng)檢查一切是否合乎規(guī)則。
示例 16-8 中的代碼可以編譯和運(yùn)行,不過(guò)它并沒(méi)有明確的告訴我們兩個(gè)獨(dú)立的線程通過(guò)信道相互通訊。示例 16-10 則有一些改進(jìn)會(huì)證明示例 16-8 中的代碼是并發(fā)執(zhí)行的:新建線程現(xiàn)在會(huì)發(fā)送多個(gè)消息并在每個(gè)消息之間暫停一秒鐘。
文件名: src/main.rs
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {}", received);
}
}
示例 16-10: 發(fā)送多個(gè)消息,并在每次發(fā)送后暫停一段時(shí)間
這一次,在新建線程中有一個(gè)字符串 vector 希望發(fā)送到主線程。我們遍歷他們,單獨(dú)的發(fā)送每一個(gè)字符串并通過(guò)一個(gè) Duration
值調(diào)用 thread::sleep
函數(shù)來(lái)暫停一秒。
在主線程中,不再顯式調(diào)用 recv
函數(shù):而是將 rx
當(dāng)作一個(gè)迭代器。對(duì)于每一個(gè)接收到的值,我們將其打印出來(lái)。當(dāng)信道被關(guān)閉時(shí),迭代器也將結(jié)束。
當(dāng)運(yùn)行示例 16-10 中的代碼時(shí),將看到如下輸出,每一行都會(huì)暫停一秒:
Got: hi
Got: from
Got: the
Got: thread
因?yàn)橹骶€程中的 for
循環(huán)里并沒(méi)有任何暫停或等待的代碼,所以可以說(shuō)主線程是在等待從新建線程中接收值。
之前我們提到了mpsc
是 multiple producer, single consumer 的縮寫(xiě)。可以運(yùn)用 mpsc
來(lái)擴(kuò)展示例 16-10 中的代碼來(lái)創(chuàng)建向同一接收者發(fā)送值的多個(gè)線程。這可以通過(guò)克隆信道的發(fā)送端來(lái)做到,如示例 16-11 所示:
文件名: src/main.rs
// --snip--
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {}", received);
}
// --snip--
示例 16-11: 從多個(gè)生產(chǎn)者發(fā)送多個(gè)消息
這一次,在創(chuàng)建新線程之前,我們對(duì)信道的發(fā)送端調(diào)用了 clone
方法。這會(huì)給我們一個(gè)可以傳遞給第一個(gè)新建線程的發(fā)送端句柄。我們會(huì)將原始的信道發(fā)送端傳遞給第二個(gè)新建線程。這樣就會(huì)有兩個(gè)線程,每個(gè)線程將向信道的接收端發(fā)送不同的消息。
如果運(yùn)行這些代碼,你 可能 會(huì)看到這樣的輸出:
Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you
雖然你可能會(huì)看到這些值以不同的順序出現(xiàn);這依賴于你的系統(tǒng)。這也就是并發(fā)既有趣又困難的原因。如果通過(guò) thread::sleep
做實(shí)驗(yàn),在不同的線程中提供不同的值,就會(huì)發(fā)現(xiàn)他們的運(yùn)行更加不確定,且每次都會(huì)產(chǎn)生不同的輸出。
現(xiàn)在我們見(jiàn)識(shí)過(guò)了信道如何工作,再看看另一種不同的并發(fā)方式吧。
更多建議: