Rust 使用消息傳遞在線程間傳送數(shù)據(jù)

2023-03-22 15:15 更新
ch16-02-message-passing.md
commit 24e275d624fe85af7b5b6316e78f8bfbbcac23e7

一個(gè)日益流行的確保安全并發(fā)的方式是 消息傳遞message passing),這里線程或 actor 通過(guò)發(fā)送包含數(shù)據(jù)的消息來(lái)相互溝通。這個(gè)思想來(lái)源于 Go 編程語(yǔ)言文檔中 的口號(hào):“不要通過(guò)共享內(nèi)存來(lái)通訊;而是通過(guò)通訊來(lái)共享內(nèi)存。”(“Do not communicate by sharing memory; instead, share memory by communicating.”)

Rust 中一個(gè)實(shí)現(xiàn)消息傳遞并發(fā)的主要工具是 信道channel),Rust 標(biāo)準(zhǔn)庫(kù)提供了其實(shí)現(xiàn)的編程概念。你可以將其想象為一個(gè)水流的渠道,比如河流或小溪。如果你將諸如橡皮鴨或小船之類的東西放入其中,它們會(huì)順流而下到達(dá)下游。

編程中的信息渠道(信道)有兩部分組成,一個(gè)發(fā)送者(transmitter)和一個(gè)接收者(receiver)。發(fā)送者位于上游位置,在這里可以將橡皮鴨放入河中,接收者則位于下游,橡皮鴨最終會(huì)漂流至此。代碼中的一部分調(diào)用發(fā)送者的方法以及希望發(fā)送的數(shù)據(jù),另一部分則檢查接收端收到的消息。當(dāng)發(fā)送者或接收者任一被丟棄時(shí)可以認(rèn)為信道被 關(guān)閉closed)了。

這里,我們將開(kāi)發(fā)一個(gè)程序,它會(huì)在一個(gè)線程生成值向信道發(fā)送,而在另一個(gè)線程會(huì)接收值并打印出來(lái)。這里會(huì)通過(guò)信道在線程間發(fā)送簡(jiǎn)單值來(lái)演示這個(gè)功能。一旦你熟悉了這項(xiàng)技術(shù),就能使用信道來(lái)實(shí)現(xiàn)聊天系統(tǒng),或利用很多線程進(jìn)行分布式計(jì)算并將部分計(jì)算結(jié)果發(fā)送給一個(gè)線程進(jìn)行聚合。

首先,在示例 16-6 中,創(chuàng)建了一個(gè)信道但沒(méi)有做任何事。注意這還不能編譯,因?yàn)?Rust 不知道我們想要在信道中發(fā)送什么類型:

文件名: src/main.rs

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
}

示例 16-6: 創(chuàng)建一個(gè)信道,并將其兩端賦值給 tx 和 rx

這里使用 mpsc::channel 函數(shù)創(chuàng)建一個(gè)新的信道;mpsc 是 多個(gè)生產(chǎn)者,單個(gè)消費(fèi)者multiple producer, single consumer)的縮寫(xiě)。簡(jiǎn)而言之,Rust 標(biāo)準(zhǔn)庫(kù)實(shí)現(xiàn)信道的方式意味著一個(gè)信道可以有多個(gè)產(chǎn)生值的 發(fā)送sending)端,但只能有一個(gè)消費(fèi)這些值的 接收receiving)端。想象一下多條小河小溪最終匯聚成大河:所有通過(guò)這些小河發(fā)出的東西最后都會(huì)來(lái)到下游的大河。目前我們以單個(gè)生產(chǎn)者開(kāi)始,但是當(dāng)示例可以工作后會(huì)增加多個(gè)生產(chǎn)者。

mpsc::channel 函數(shù)返回一個(gè)元組:第一個(gè)元素是發(fā)送端,而第二個(gè)元素是接收端。由于歷史原因,tx 和 rx 通常作為 發(fā)送者transmitter)和 接收者receiver)的縮寫(xiě),所以這就是我們將用來(lái)綁定這兩端變量的名字。這里使用了一個(gè) let 語(yǔ)句和模式來(lái)解構(gòu)了此元組;第十八章會(huì)討論 let 語(yǔ)句中的模式和解構(gòu)。如此使用 let 語(yǔ)句是一個(gè)方便提取 mpsc::channel 返回的元組中一部分的手段。

讓我們將發(fā)送端移動(dòng)到一個(gè)新建線程中并發(fā)送一個(gè)字符串,這樣新建線程就可以和主線程通訊了,如示例 16-7 所示。這類似于在河的上游扔下一只橡皮鴨或從一個(gè)線程向另一個(gè)線程發(fā)送聊天信息:

文件名: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
}

示例 16-7: 將 tx 移動(dòng)到一個(gè)新建的線程中并發(fā)送 “hi”

這里再次使用 thread::spawn 來(lái)創(chuàng)建一個(gè)新線程并使用 move 將 tx 移動(dòng)到閉包中這樣新建線程就擁有 tx 了。新建線程需要擁有信道的發(fā)送端以便能向信道發(fā)送消息。

信道的發(fā)送端有一個(gè) send 方法用來(lái)獲取需要放入信道的值。send 方法返回一個(gè) Result<T, E> 類型,所以如果接收端已經(jīng)被丟棄了,將沒(méi)有發(fā)送值的目標(biāo),所以發(fā)送操作會(huì)返回錯(cuò)誤。在這個(gè)例子中,出錯(cuò)的時(shí)候調(diào)用 unwrap 產(chǎn)生 panic。不過(guò)對(duì)于一個(gè)真實(shí)程序,需要合理地處理它:回到第九章復(fù)習(xí)正確處理錯(cuò)誤的策略。

在示例 16-8 中,我們?cè)谥骶€程中從信道的接收端獲取值。這類似于在河的下游撈起橡皮鴨或接收聊天信息:

文件名: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

示例 16-8: 在主線程中接收并打印內(nèi)容 “hi”

信道的接收端有兩個(gè)有用的方法:recv 和 try_recv。這里,我們使用了 recv,它是 receive 的縮寫(xiě)。這個(gè)方法會(huì)阻塞主線程執(zhí)行直到從信道中接收一個(gè)值。一旦發(fā)送了一個(gè)值,recv 會(huì)在一個(gè) Result<T, E> 中返回它。當(dāng)信道發(fā)送端關(guān)閉,recv 會(huì)返回一個(gè)錯(cuò)誤表明不會(huì)再有新的值到來(lái)了。

try_recv 不會(huì)阻塞,相反它立刻返回一個(gè) Result<T, E>Ok 值包含可用的信息,而 Err 值代表此時(shí)沒(méi)有任何消息。如果線程在等待消息過(guò)程中還有其他工作時(shí)使用 try_recv 很有用:可以編寫(xiě)一個(gè)循環(huán)來(lái)頻繁調(diào)用 try_recv,在有可用消息時(shí)進(jìn)行處理,其余時(shí)候則處理一會(huì)其他工作直到再次檢查。

出于簡(jiǎn)單的考慮,這個(gè)例子使用了 recv;主線程中除了等待消息之外沒(méi)有任何其他工作,所以阻塞主線程是合適的。

如果運(yùn)行示例 16-8 中的代碼,我們將會(huì)看到主線程打印出這個(gè)值:

Got: hi

完美!

信道與所有權(quán)轉(zhuǎn)移

所有權(quán)規(guī)則在消息傳遞中扮演了重要角色,其有助于我們編寫(xiě)安全的并發(fā)代碼。防止并發(fā)編程中的錯(cuò)誤是在 Rust 程序中考慮所有權(quán)的一大優(yōu)勢(shì)。現(xiàn)在讓我們做一個(gè)試驗(yàn)來(lái)看看信道與所有權(quán)如何一同協(xié)作以避免產(chǎn)生問(wèn)題:我們將嘗試在新建線程中的信道中發(fā)送完 val 值 之后 再使用它。嘗試編譯示例 16-9 中的代碼并看看為何這是不允許的:

文件名: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {}", val);
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

示例 16-9: 在我們已經(jīng)發(fā)送到信道中后,嘗試使用 val 引用

這里嘗試在通過(guò) tx.send 發(fā)送 val 到信道中之后將其打印出來(lái)。允許這么做是一個(gè)壞主意:一旦將值發(fā)送到另一個(gè)線程后,那個(gè)線程可能會(huì)在我們?cè)俅问褂盟熬蛯⑵湫薷幕蛘邅G棄。其他線程對(duì)值可能的修改會(huì)由于不一致或不存在的數(shù)據(jù)而導(dǎo)致錯(cuò)誤或意外的結(jié)果。然而,嘗試編譯示例 16-9 的代碼時(shí),Rust 會(huì)給出一個(gè)錯(cuò)誤:

$ cargo run
   Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
  --> src/main.rs:10:31
   |
8  |         let val = String::from("hi");
   |             --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
9  |         tx.send(val).unwrap();
   |                 --- value moved here
10 |         println!("val is {}", val);
   |                               ^^^ value borrowed here after move

For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` due to previous error

我們的并發(fā)錯(cuò)誤會(huì)造成一個(gè)編譯時(shí)錯(cuò)誤。send 函數(shù)獲取其參數(shù)的所有權(quán)并移動(dòng)這個(gè)值歸接收者所有。這可以防止在發(fā)送后再次意外地使用這個(gè)值;所有權(quán)系統(tǒng)檢查一切是否合乎規(guī)則。

發(fā)送多個(gè)值并觀察接收者的等待

示例 16-8 中的代碼可以編譯和運(yùn)行,不過(guò)它并沒(méi)有明確的告訴我們兩個(gè)獨(dú)立的線程通過(guò)信道相互通訊。示例 16-10 則有一些改進(jìn)會(huì)證明示例 16-8 中的代碼是并發(fā)執(zhí)行的:新建線程現(xiàn)在會(huì)發(fā)送多個(gè)消息并在每個(gè)消息之間暫停一秒鐘。

文件名: src/main.rs

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }
}

示例 16-10: 發(fā)送多個(gè)消息,并在每次發(fā)送后暫停一段時(shí)間

這一次,在新建線程中有一個(gè)字符串 vector 希望發(fā)送到主線程。我們遍歷他們,單獨(dú)的發(fā)送每一個(gè)字符串并通過(guò)一個(gè) Duration 值調(diào)用 thread::sleep 函數(shù)來(lái)暫停一秒。

在主線程中,不再顯式調(diào)用 recv 函數(shù):而是將 rx 當(dāng)作一個(gè)迭代器。對(duì)于每一個(gè)接收到的值,我們將其打印出來(lái)。當(dāng)信道被關(guān)閉時(shí),迭代器也將結(jié)束。

當(dāng)運(yùn)行示例 16-10 中的代碼時(shí),將看到如下輸出,每一行都會(huì)暫停一秒:

Got: hi
Got: from
Got: the
Got: thread

因?yàn)橹骶€程中的 for 循環(huán)里并沒(méi)有任何暫停或等待的代碼,所以可以說(shuō)主線程是在等待從新建線程中接收值。

通過(guò)克隆發(fā)送者來(lái)創(chuàng)建多個(gè)生產(chǎn)者

之前我們提到了mpsc是 multiple producer, single consumer 的縮寫(xiě)。可以運(yùn)用 mpsc 來(lái)擴(kuò)展示例 16-10 中的代碼來(lái)創(chuàng)建向同一接收者發(fā)送值的多個(gè)線程。這可以通過(guò)克隆信道的發(fā)送端來(lái)做到,如示例 16-11 所示:

文件名: src/main.rs

    // --snip--

    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }

    // --snip--

示例 16-11: 從多個(gè)生產(chǎn)者發(fā)送多個(gè)消息

這一次,在創(chuàng)建新線程之前,我們對(duì)信道的發(fā)送端調(diào)用了 clone 方法。這會(huì)給我們一個(gè)可以傳遞給第一個(gè)新建線程的發(fā)送端句柄。我們會(huì)將原始的信道發(fā)送端傳遞給第二個(gè)新建線程。這樣就會(huì)有兩個(gè)線程,每個(gè)線程將向信道的接收端發(fā)送不同的消息。

如果運(yùn)行這些代碼,你 可能 會(huì)看到這樣的輸出:

Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you

雖然你可能會(huì)看到這些值以不同的順序出現(xiàn);這依賴于你的系統(tǒng)。這也就是并發(fā)既有趣又困難的原因。如果通過(guò) thread::sleep 做實(shí)驗(yàn),在不同的線程中提供不同的值,就會(huì)發(fā)現(xiàn)他們的運(yùn)行更加不確定,且每次都會(huì)產(chǎn)生不同的輸出。

現(xiàn)在我們見(jiàn)識(shí)過(guò)了信道如何工作,再看看另一種不同的并發(fā)方式吧。


以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)