Rust 將單線程 server 變?yōu)槎嗑€程 server

2023-03-22 15:16 更新
ch20-02-multithreaded.md
commit 95b5e7c86d33e98eec6f73b268d106621f3d24a1

目前 server 會(huì)依次處理每一個(gè)請(qǐng)求,意味著它在完成第一個(gè)連接的處理之前不會(huì)處理第二個(gè)連接。如果 server 正接收越來越多的請(qǐng)求,這類串行操作會(huì)使性能越來越差。如果一個(gè)請(qǐng)求花費(fèi)很長(zhǎng)時(shí)間來處理,隨后而來的請(qǐng)求則不得不等待這個(gè)長(zhǎng)請(qǐng)求結(jié)束,即便這些新請(qǐng)求可以很快就處理完。我們需要修復(fù)這種情況,不過首先讓我們實(shí)際嘗試一下這個(gè)問題。

在當(dāng)前 server 實(shí)現(xiàn)中模擬慢請(qǐng)求

讓我們看看一個(gè)慢請(qǐng)求如何影響當(dāng)前 server 實(shí)現(xiàn)中的其他請(qǐng)求。示例 20-10 通過模擬慢響應(yīng)實(shí)現(xiàn)了 /sleep 請(qǐng)求處理,它會(huì)使 server 在響應(yīng)之前休眠五秒。

文件名: src/main.rs

use std::thread;
use std::time::Duration;
// --snip--

fn handle_connection(mut stream: TcpStream) {
    // --snip--

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK", "hello.html")
    } else if buffer.starts_with(sleep) {
        thread::sleep(Duration::from_secs(5));
        ("HTTP/1.1 200 OK", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND", "404.html")
    };

    // --snip--
}

示例 20-10: 通過識(shí)別 /sleep 并休眠五秒來模擬慢請(qǐng)求

這段代碼有些凌亂,不過對(duì)于模擬的目的來說已經(jīng)足夠。這里創(chuàng)建了第二個(gè)請(qǐng)求 sleep,我們會(huì)識(shí)別其數(shù)據(jù)。在 if 塊之后增加了一個(gè) else if 來檢查 /sleep 請(qǐng)求,當(dāng)接收到這個(gè)請(qǐng)求時(shí),在渲染成功 HTML 頁面之前會(huì)先休眠五秒。

現(xiàn)在就可以真切的看出我們的 server 有多么的原始:真實(shí)的庫將會(huì)以更簡(jiǎn)潔的方式處理多請(qǐng)求識(shí)別問題!使用 cargo run 啟動(dòng) server,并接著打開兩個(gè)瀏覽器窗口:一個(gè)請(qǐng)求 http://127.0.0.1:7878/ 而另一個(gè)請(qǐng)求 http://127.0.0.1:7878/sleep 。如果像之前一樣多次請(qǐng)求 /,會(huì)發(fā)現(xiàn)響應(yīng)的比較快速。不過如果請(qǐng)求 /sleep 之后在請(qǐng)求 /,就會(huì)看到 / 會(huì)等待直到 sleep 休眠完五秒之后才出現(xiàn)。

這里有多種辦法來改變我們的 web server 使其避免所有請(qǐng)求都排在慢請(qǐng)求之后;我們將要實(shí)現(xiàn)的一個(gè)便是線程池。

使用線程池改善吞吐量

線程池thread pool)是一組預(yù)先分配的等待或準(zhǔn)備處理任務(wù)的線程。當(dāng)程序收到一個(gè)新任務(wù),線程池中的一個(gè)線程會(huì)被分配任務(wù),這個(gè)線程會(huì)離開并處理任務(wù)。其余的線程則可用于處理在第一個(gè)線程處理任務(wù)的同時(shí)處理其他接收到的任務(wù)。當(dāng)?shù)谝粋€(gè)線程處理完任務(wù)時(shí),它會(huì)返回空閑線程池中等待處理新任務(wù)。線程池允許我們并發(fā)處理連接,增加 server 的吞吐量。

我們會(huì)將池中線程限制為較少的數(shù)量,以防拒絕服務(wù)(Denial of Service, DoS)攻擊;如果程序?yàn)槊恳粋€(gè)接收的請(qǐng)求都新建一個(gè)線程,某人向 server 發(fā)起千萬級(jí)的請(qǐng)求時(shí)會(huì)耗盡服務(wù)器的資源并導(dǎo)致所有請(qǐng)求的處理都被終止。

不同于分配無限的線程,線程池中將有固定數(shù)量的等待線程。當(dāng)新進(jìn)請(qǐng)求時(shí),將請(qǐng)求發(fā)送到線程池中做處理。線程池會(huì)維護(hù)一個(gè)接收請(qǐng)求的隊(duì)列。每一個(gè)線程會(huì)從隊(duì)列中取出一個(gè)請(qǐng)求,處理請(qǐng)求,接著向?qū)﹃?duì)列索取另一個(gè)請(qǐng)求。通過這種設(shè)計(jì),則可以并發(fā)處理 N 個(gè)請(qǐng)求,其中 N 為線程數(shù)。如果每一個(gè)線程都在響應(yīng)慢請(qǐng)求,之后的請(qǐng)求仍然會(huì)阻塞隊(duì)列,不過相比之前增加了能處理的慢請(qǐng)求的數(shù)量。

這個(gè)設(shè)計(jì)僅僅是多種改善 web server 吞吐量的方法之一。其他可供探索的方法有 fork/join 模型和單線程異步 I/O 模型。如果你對(duì)這個(gè)主題感興趣,則可以閱讀更多關(guān)于其他解決方案的內(nèi)容并嘗試用 Rust 實(shí)現(xiàn)他們;對(duì)于一個(gè)像 Rust 這樣的底層語言,所有這些方法都是可能的。

在開始之前,讓我們討論一下線程池應(yīng)用看起來怎樣。當(dāng)嘗試設(shè)計(jì)代碼時(shí),首先編寫客戶端接口確實(shí)有助于指導(dǎo)代碼設(shè)計(jì)。以期望的調(diào)用方式來構(gòu)建 API 代碼的結(jié)構(gòu),接著在這個(gè)結(jié)構(gòu)之內(nèi)實(shí)現(xiàn)功能,而不是先實(shí)現(xiàn)功能再設(shè)計(jì)公有 API。

類似于第十二章項(xiàng)目中使用的測(cè)試驅(qū)動(dòng)開發(fā)。這里將要使用編譯器驅(qū)動(dòng)開發(fā)(compiler-driven development)。我們將編寫調(diào)用所期望的函數(shù)的代碼,接著觀察編譯器錯(cuò)誤告訴我們接下來需要修改什么使得代碼可以工作。

為每一個(gè)請(qǐng)求分配線程的代碼結(jié)構(gòu)

首先,讓我們探索一下為每一個(gè)連接都創(chuàng)建一個(gè)線程的代碼看起來如何。這并不是最終方案,因?yàn)檎缰爸v到的它會(huì)潛在的分配無限的線程,不過這是一個(gè)開始。示例 20-11 展示了 main 的改變,它在 for 循環(huán)中為每一個(gè)流分配了一個(gè)新線程進(jìn)行處理:

文件名: src/main.rs

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}

示例 20-11: 為每一個(gè)流新建一個(gè)線程

正如第十六章講到的,thread::spawn 會(huì)創(chuàng)建一個(gè)新線程并在其中運(yùn)行閉包中的代碼。如果運(yùn)行這段代碼并在在瀏覽器中加載 /sleep,接著在另兩個(gè)瀏覽器標(biāo)簽頁中加載 /,確實(shí)會(huì)發(fā)現(xiàn) / 請(qǐng)求不必等待 /sleep 結(jié)束。不過正如之前提到的,這最終會(huì)使系統(tǒng)崩潰因?yàn)槲覀儫o限制的創(chuàng)建新線程。

為有限數(shù)量的線程創(chuàng)建一個(gè)類似的接口

我們期望線程池以類似且熟悉的方式工作,以便從線程切換到線程池并不會(huì)對(duì)使用該 API 的代碼做出較大的修改。示例 20-12 展示我們希望用來替換 thread::spawn 的 ThreadPool 結(jié)構(gòu)體的假想接口:

文件名: src/main.rs

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

示例 20-12: 假想的 ThreadPool 接口

這里使用 ThreadPool::new 來創(chuàng)建一個(gè)新的線程池,它有一個(gè)可配置的線程數(shù)的參數(shù),在這里是四。這樣在 for 循環(huán)中,pool.execute 有著類似 thread::spawn 的接口,它獲取一個(gè)線程池運(yùn)行于每一個(gè)流的閉包。pool.execute 需要實(shí)現(xiàn)為獲取閉包并傳遞給池中的線程運(yùn)行。這段代碼還不能編譯,不過通過嘗試編譯器會(huì)指導(dǎo)我們?nèi)绾涡迯?fù)它。

采用編譯器驅(qū)動(dòng)構(gòu)建 ThreadPool 結(jié)構(gòu)體

繼續(xù)并對(duì)示例 20-12 中的 src/main.rs 做出修改,并利用來自 cargo check 的編譯器錯(cuò)誤來驅(qū)動(dòng)開發(fā)。下面是我們得到的第一個(gè)錯(cuò)誤:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
  --> src/main.rs:10:16
   |
10 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^ use of undeclared type `ThreadPool`

For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` due to previous error

好的,這告訴我們需要一個(gè) ThreadPool 類型或模塊,所以我們將構(gòu)建一個(gè)。ThreadPool 的實(shí)現(xiàn)會(huì)與 web server 的特定工作相獨(dú)立,所以讓我們從 hello crate 切換到存放 ThreadPool 實(shí)現(xiàn)的新庫 crate。這也意味著可以在任何工作中使用這個(gè)單獨(dú)的線程池庫,而不僅僅是處理網(wǎng)絡(luò)請(qǐng)求。

創(chuàng)建 src/lib.rs 文件,它包含了目前可用的最簡(jiǎn)單的 ThreadPool 定義:

文件名: src/lib.rs

pub struct ThreadPool;

接著創(chuàng)建一個(gè)新目錄,src/bin,并將二進(jìn)制 crate 根文件從 src/main.rs 移動(dòng)到 src/bin/main.rs。這使得庫 crate 成為 hello 目錄的主要 crate;不過仍然可以使用 cargo run 運(yùn)行 src/bin/main.rs 二進(jìn)制文件。移動(dòng)了 main.rs 文件之后,修改 src/bin/main.rs 文件開頭加入如下代碼來引入庫 crate 并將 ThreadPool 引入作用域:

文件名: src/bin/main.rs

use hello::ThreadPool;

這仍然不能工作,再次嘗試運(yùn)行來得到下一個(gè)需要解決的錯(cuò)誤:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
  --> src/bin/main.rs:11:28
   |
11 |     let pool = ThreadPool::new(4);
   |                            ^^^ function or associated item not found in `ThreadPool`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` due to previous error

這告訴我們下一步是為 ThreadPool 創(chuàng)建一個(gè)叫做 new 的關(guān)聯(lián)函數(shù)。我們還知道 new 需要有一個(gè)參數(shù)可以接受 4,而且 new 應(yīng)該返回 ThreadPool 實(shí)例。讓我們實(shí)現(xiàn)擁有此特征的最小化 new 函數(shù):

文件夾: src/lib.rs

pub struct ThreadPool;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }
}

這里選擇 usize 作為 size 參數(shù)的類型,因?yàn)槲覀冎罏樨?fù)的線程數(shù)沒有意義。我們還知道將使用 4 作為線程集合的元素?cái)?shù)量,這也就是使用 usize 類型的原因,如第三章 “整型” 部分所講。

再次編譯檢查這段代碼:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
  --> src/bin/main.rs:16:14
   |
16 |         pool.execute(|| {
   |              ^^^^^^^ method not found in `ThreadPool`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` due to previous error

現(xiàn)在有了一個(gè)警告和一個(gè)錯(cuò)誤。暫時(shí)先忽略警告,發(fā)生錯(cuò)誤是因?yàn)椴]有 ThreadPool 上的 execute 方法?;貞?nbsp;“為有限數(shù)量的線程創(chuàng)建一個(gè)類似的接口” 部分我們決定線程池應(yīng)該有與 thread::spawn 類似的接口,同時(shí)我們將實(shí)現(xiàn) execute 函數(shù)來獲取傳遞的閉包并將其傳遞給池中的空閑線程執(zhí)行。

我們會(huì)在 ThreadPool 上定義 execute 函數(shù)來獲取一個(gè)閉包參數(shù)。回憶第十三章的 “使用帶有泛型和 Fn trait 的閉包” 部分,閉包作為參數(shù)時(shí)可以使用三個(gè)不同的 trait:FnFnMut 和 FnOnce。我們需要決定這里應(yīng)該使用哪種閉包。最終需要實(shí)現(xiàn)的類似于標(biāo)準(zhǔn)庫的 thread::spawn,所以我們可以觀察 thread::spawn 的簽名在其參數(shù)中使用了何種 bound。查看文檔會(huì)發(fā)現(xiàn):

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

F 是這里我們關(guān)心的參數(shù);T 與返回值有關(guān)所以我們并不關(guān)心??紤]到 spawn 使用 FnOnce 作為 F 的 trait bound,這可能也是我們需要的,因?yàn)樽罱K會(huì)將傳遞給 execute 的參數(shù)傳給 spawn。因?yàn)樘幚碚?qǐng)求的線程只會(huì)執(zhí)行閉包一次,這也進(jìn)一步確認(rèn)了 FnOnce 是我們需要的 trait,這里符合 FnOnce 中 Once 的意思。

F 還有 trait bound Send 和生命周期綁定 'static,這對(duì)我們的情況也是有意義的:需要 Send 來將閉包從一個(gè)線程轉(zhuǎn)移到另一個(gè)線程,而 'static 是因?yàn)椴⒉恢谰€程會(huì)執(zhí)行多久。讓我們編寫一個(gè)使用帶有這些 bound 的泛型參數(shù) F 的 ThreadPool 的 execute 方法:

文件名: src/lib.rs

impl ThreadPool {
    // --snip--
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

FnOnce trait 仍然需要之后的 (),因?yàn)檫@里的 FnOnce 代表一個(gè)沒有參數(shù)也沒有返回值的閉包。正如函數(shù)的定義,返回值類型可以從簽名中省略,不過即便沒有參數(shù)也需要括號(hào)。

這里再一次增加了 execute 方法的最小化實(shí)現(xiàn):它沒有做任何工作,只是嘗試讓代碼能夠編譯。再次進(jìn)行檢查:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
    Finished dev [unoptimized + debuginfo] target(s) in 0.24s

現(xiàn)在就只有警告了!這意味著能夠編譯了!注意如果嘗試 cargo run 運(yùn)行程序并在瀏覽器中發(fā)起請(qǐng)求,仍會(huì)在瀏覽器中出現(xiàn)在本章開始時(shí)那樣的錯(cuò)誤。這個(gè)庫實(shí)際上還沒有調(diào)用傳遞給 execute 的閉包!

一個(gè)你可能聽說過的關(guān)于像 Haskell 和 Rust 這樣有嚴(yán)格編譯器的語言的說法是 “如果代碼能夠編譯,它就能工作”。這是一個(gè)提醒大家的好時(shí)機(jī),實(shí)際上這并不是普適的。我們的項(xiàng)目可以編譯,不過它完全沒有做任何工作!如果構(gòu)建一個(gè)真實(shí)且功能完整的項(xiàng)目,則需花費(fèi)大量的時(shí)間來開始編寫單元測(cè)試來檢查代碼能否編譯 并且 擁有期望的行為。

在 new 中驗(yàn)證池中線程數(shù)量

這里仍然存在警告是因?yàn)槠洳]有對(duì) new 和 execute 的參數(shù)做任何操作。讓我們用期望的行為來實(shí)現(xiàn)這些函數(shù)。以考慮 new 作為開始。之前選擇使用無符號(hào)類型作為 size 參數(shù)的類型,因?yàn)榫€程數(shù)為負(fù)的線程池沒有意義。然而,線程數(shù)為零的線程池同樣沒有意義,不過零是一個(gè)完全有效的 u32 值。讓我們?cè)黾釉诜祷?nbsp;ThreadPool 實(shí)例之前檢查 size 是否大于零的代碼,并使用 assert! 宏在得到零時(shí) panic,如示例 20-13 所示:

文件名: src/lib.rs

impl ThreadPool {
    /// 創(chuàng)建線程池。
    ///
    /// 線程池中線程的數(shù)量。
    ///
    /// # Panics
    ///
    /// `new` 函數(shù)在 size 為 0 時(shí)會(huì) panic。
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        ThreadPool
    }

    // --snip--
}

示例 20-13: 實(shí)現(xiàn) ThreadPool::new 在 size 為零時(shí) panic

這里用文檔注釋為 ThreadPool 增加了一些文檔。注意這里遵循了良好的文檔實(shí)踐并增加了一個(gè)部分來提示函數(shù)會(huì) panic 的情況,正如第十四章所討論的。嘗試運(yùn)行 cargo doc --open 并點(diǎn)擊 ThreadPool 結(jié)構(gòu)體來查看生成的 new 的文檔看起來如何!

相比像這里使用 assert! 宏,也可以讓 new 像之前 I/O 項(xiàng)目中示例 12-9 中 Config::new 那樣返回一個(gè) Result,不過在這里我們選擇創(chuàng)建一個(gè)沒有任何線程的線程池應(yīng)該是不可恢復(fù)的錯(cuò)誤。如果你想做的更好,嘗試編寫一個(gè)采用如下簽名的 new 版本來感受一下兩者的區(qū)別:

pub fn new(size: usize) -> Result<ThreadPool, PoolCreationError> {

分配空間以儲(chǔ)存線程

現(xiàn)在有了一個(gè)有效的線程池線程數(shù),就可以實(shí)際創(chuàng)建這些線程并在返回之前將他們儲(chǔ)存在 ThreadPool 結(jié)構(gòu)體中。不過如何 “儲(chǔ)存” 一個(gè)線程?讓我們?cè)倏纯?nbsp;thread::spawn 的簽名:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

spawn 返回 JoinHandle<T>,其中 T 是閉包返回的類型。嘗試使用 JoinHandle 來看看會(huì)發(fā)生什么。在我們的情況中,傳遞給線程池的閉包會(huì)處理連接并不返回任何值,所以 T 將會(huì)是單元類型 ()。

示例 20-14 中的代碼可以編譯,不過實(shí)際上還并沒有創(chuàng)建任何線程。我們改變了 ThreadPool 的定義來存放一個(gè) thread::JoinHandle<()> 的 vector 實(shí)例,使用 size 容量來初始化,并設(shè)置一個(gè) for 循環(huán)了來運(yùn)行創(chuàng)建線程的代碼,并返回包含這些線程的 ThreadPool 實(shí)例:

文件名: src/lib.rs

use std::thread;

pub struct ThreadPool {
    threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut threads = Vec::with_capacity(size);

        for _ in 0..size {
            // create some threads and store them in the vector
        }

        ThreadPool { threads }
    }

    // --snip--
}

示例 20-14: 為 ThreadPool 創(chuàng)建一個(gè) vector 來存放線程

這里將 std::thread 引入庫 crate 的作用域,因?yàn)槭褂昧?nbsp;thread::JoinHandle 作為 ThreadPool 中 vector 元素的類型。

在得到了有效的數(shù)量之后,ThreadPool 新建一個(gè)存放 size 個(gè)元素的 vector。本書還未使用過 with_capacity,它與 Vec::new 做了同樣的工作,不過有一個(gè)重要的區(qū)別:它為 vector 預(yù)先分配空間。因?yàn)橐呀?jīng)知道了 vector 中需要 size 個(gè)元素,預(yù)先進(jìn)行分配比僅僅 Vec::new 要稍微有效率一些,因?yàn)?nbsp;Vec::new 隨著插入元素而重新改變大小。

如果再次運(yùn)行 cargo check,會(huì)看到一些警告,不過應(yīng)該可以編譯成功。

Worker 結(jié)構(gòu)體負(fù)責(zé)從 ThreadPool 中將代碼傳遞給線程

示例 20-14 的 for 循環(huán)中留下了一個(gè)關(guān)于創(chuàng)建線程的注釋。如何實(shí)際創(chuàng)建線程呢?這是一個(gè)難題。標(biāo)準(zhǔn)庫提供的創(chuàng)建線程的方法,thread::spawn,它期望獲取一些一旦創(chuàng)建線程就應(yīng)該執(zhí)行的代碼。然而,我們希望開始線程并使其等待稍后傳遞的代碼。標(biāo)準(zhǔn)庫的線程實(shí)現(xiàn)并沒有包含這么做的方法;我們必須自己實(shí)現(xiàn)。

我們將要實(shí)現(xiàn)的行為是創(chuàng)建線程并稍后發(fā)送代碼,這會(huì)在 ThreadPool 和線程間引入一個(gè)新數(shù)據(jù)類型來管理這種新行為。這個(gè)數(shù)據(jù)結(jié)構(gòu)稱為 Worker:這是一個(gè)池實(shí)現(xiàn)中的常見概念。想象一下在餐館廚房工作的員工:?jiǎn)T工等待來自客戶的訂單,他們負(fù)責(zé)接受這些訂單并完成它們。

不同于在線程池中儲(chǔ)存一個(gè) JoinHandle<()> 實(shí)例的 vector,我們會(huì)儲(chǔ)存 Worker 結(jié)構(gòu)體的實(shí)例。每一個(gè) Worker 會(huì)儲(chǔ)存一個(gè)單獨(dú)的 JoinHandle<()> 實(shí)例。接著會(huì)在 Worker 上實(shí)現(xiàn)一個(gè)方法,它會(huì)獲取需要允許代碼的閉包并將其發(fā)送給已經(jīng)運(yùn)行的線程執(zhí)行。我們還會(huì)賦予每一個(gè) worker id,這樣就可以在日志和調(diào)試中區(qū)別線程池中的不同 worker。

首先,讓我們做出如此創(chuàng)建 ThreadPool 時(shí)所需的修改。在通過如下方式設(shè)置完 Worker 之后,我們會(huì)實(shí)現(xiàn)向線程發(fā)送閉包的代碼:

  1. 定義 ?Worker? 結(jié)構(gòu)體存放 ?id? 和 ?JoinHandle<()>?
  2. 修改 ?ThreadPool? 存放一個(gè) ?Worker? 實(shí)例的 vector
  3. 定義 ?Worker::new? 函數(shù),它獲取一個(gè) ?id? 數(shù)字并返回一個(gè)帶有 ?id? 和用空閉包分配的線程的 ?Worker? 實(shí)例
  4. 在 ?ThreadPool::new? 中,使用 ?for? 循環(huán)計(jì)數(shù)生成 ?id?,使用這個(gè) ?id? 新建 ?Worker?,并儲(chǔ)存進(jìn) vector 中

如果你渴望挑戰(zhàn),在查示例 20-15 中的代碼之前嘗試自己實(shí)現(xiàn)這些修改。

準(zhǔn)備好了嗎?示例 20-15 就是一個(gè)做出了這些修改的例子:

文件名: src/lib.rs

use std::thread;

pub struct ThreadPool {
    workers: Vec<Worker>,
}

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers }
    }
    // --snip--
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}

示例 20-15: 修改 ThreadPool 存放 Worker 實(shí)例而不是直接存放線程

這里將 ThreadPool 中字段名從 threads 改為 workers,因?yàn)樗F(xiàn)在儲(chǔ)存 Worker 而不是 JoinHandle<()>。使用 for 循環(huán)中的計(jì)數(shù)作為 Worker::new 的參數(shù),并將每一個(gè)新建的 Worker 儲(chǔ)存在叫做 workers 的 vector 中。

Worker 結(jié)構(gòu)體和其 new 函數(shù)是私有的,因?yàn)橥獠看a(比如 src/bin/main.rs 中的 server)并不需要知道關(guān)于 ThreadPool 中使用 Worker 結(jié)構(gòu)體的實(shí)現(xiàn)細(xì)節(jié)。Worker::new 函數(shù)使用 id 參數(shù)并儲(chǔ)存了使用一個(gè)空閉包創(chuàng)建的 JoinHandle<()>

這段代碼能夠編譯并用指定給 ThreadPool::new 的參數(shù)創(chuàng)建儲(chǔ)存了一系列的 Worker 實(shí)例,不過 仍然 沒有處理 execute 中得到的閉包。讓我們聊聊接下來怎么做。

使用信道向線程發(fā)送請(qǐng)求

下一個(gè)需要解決的問題是傳遞給 thread::spawn 的閉包完全沒有做任何工作。目前,我們?cè)?nbsp;execute 方法中獲得期望執(zhí)行的閉包,不過在創(chuàng)建 ThreadPool 的過程中創(chuàng)建每一個(gè) Worker 時(shí)需要向 thread::spawn 傳遞一個(gè)閉包。

我們希望剛創(chuàng)建的 Worker 結(jié)構(gòu)體能夠從 ThreadPool 的隊(duì)列中獲取需要執(zhí)行的代碼,并發(fā)送到線程中執(zhí)行他們。

在第十六章,我們學(xué)習(xí)了 信道 —— 一個(gè)溝通兩個(gè)線程的簡(jiǎn)單手段 —— 對(duì)于這個(gè)例子來說則是絕佳的。這里信道將充當(dāng)任務(wù)隊(duì)列的作用,execute 將通過 ThreadPool 向其中線程正在尋找工作的 Worker 實(shí)例發(fā)送任務(wù)。如下是這個(gè)計(jì)劃:

  1. ?ThreadPool? 會(huì)創(chuàng)建一個(gè)信道并充當(dāng)發(fā)送端。
  2. 每個(gè) ?Worker? 將會(huì)充當(dāng)信道的接收端。
  3. 新建一個(gè) ?Job? 結(jié)構(gòu)體來存放用于向信道中發(fā)送的閉包。
  4. ?execute? 方法會(huì)在信道發(fā)送端發(fā)出期望執(zhí)行的任務(wù)。
  5. 在線程中,?Worker? 會(huì)遍歷信道的接收端并執(zhí)行任何接收到的任務(wù)。

讓我們以在 ThreadPool::new 中創(chuàng)建信道并讓 ThreadPool 實(shí)例充當(dāng)發(fā)送端開始,如示例 20-16 所示。Job 是將在信道中發(fā)出的類型,目前它是一個(gè)沒有任何內(nèi)容的結(jié)構(gòu)體:

文件名: src/lib.rs

// --snip--
use std::sync::mpsc;

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers, sender }
    }
    // --snip--
}

示例 20-16: 修改 ThreadPool 來儲(chǔ)存一個(gè)發(fā)送 Job 實(shí)例的信道發(fā)送端

在 ThreadPool::new 中,新建了一個(gè)信道,并接著讓線程池在接收端等待。這段代碼能夠編譯,不過仍有警告。

讓我們嘗試在線程池創(chuàng)建每個(gè) worker 時(shí)將信道的接收端傳遞給他們。須知我們希望在 worker 所分配的線程中使用信道的接收端,所以將在閉包中引用 receiver 參數(shù)。示例 20-17 中展示的代碼還不能編譯:

文件名: src/lib.rs

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, receiver));
        }

        ThreadPool { workers, sender }
    }
    // --snip--
}

// --snip--

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}

示例 20-17: 將信道的接收端傳遞給 worker

這是一些小而直觀的修改:將信道的接收端傳遞進(jìn)了 Worker::new,并接著在閉包中使用它。

如果嘗試 check 代碼,會(huì)得到這個(gè)錯(cuò)誤:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
  --> src/lib.rs:27:42
   |
22 |         let (sender, receiver) = mpsc::channel();
   |                      -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
27 |             workers.push(Worker::new(id, receiver));
   |                                          ^^^^^^^^ value moved here, in previous iteration of loop

For more information about this error, try `rustc --explain E0382`.
error: could not compile `hello` due to previous error

這段代碼嘗試將 receiver 傳遞給多個(gè) Worker 實(shí)例。這是不行的,回憶第十六章:Rust 所提供的信道實(shí)現(xiàn)是多 生產(chǎn)者,單 消費(fèi)者 的。這意味著不能簡(jiǎn)單的克隆信道的消費(fèi)端來解決問題。即便可以,那也不是我們希望使用的技術(shù);我們希望通過在所有的 worker 中共享單一 receiver,在線程間分發(fā)任務(wù)。

另外,從信道隊(duì)列中取出任務(wù)涉及到修改 receiver,所以這些線程需要一個(gè)能安全的共享和修改 receiver 的方式,否則可能導(dǎo)致競(jìng)爭(zhēng)狀態(tài)(參考第十六章)。

回憶一下第十六章討論的線程安全智能指針,為了在多個(gè)線程間共享所有權(quán)并允許線程修改其值,需要使用 Arc<Mutex<T>>。Arc 使得多個(gè) worker 擁有接收端,而 Mutex 則確保一次只有一個(gè) worker 能從接收端得到任務(wù)。示例 20-18 展示了所需的修改:

文件名: src/lib.rs

use std::sync::Arc;
use std::sync::Mutex;
// --snip--

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    // --snip--
}

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --snip--
    }
}

示例 20-18: 使用 Arc 和 Mutex 在 worker 間共享信道的接收端

在 ThreadPool::new 中,將信道的接收端放入一個(gè) Arc 和一個(gè) Mutex 中。對(duì)于每一個(gè)新 worker,克隆 Arc 來增加引用計(jì)數(shù),如此這些 worker 就可以共享接收端的所有權(quán)了。

通過這些修改,代碼可以編譯了!我們做到了!

實(shí)現(xiàn) execute 方法

最后讓我們實(shí)現(xiàn) ThreadPool 上的 execute 方法。同時(shí)也要修改 Job 結(jié)構(gòu)體:它將不再是結(jié)構(gòu)體,Job 將是一個(gè)有著 execute 接收到的閉包類型的 trait 對(duì)象的類型別名。第十九章 “類型別名用來創(chuàng)建類型同義詞” 部分提到過,類型別名允許將長(zhǎng)的類型變短。觀察示例 20-19:

文件名: src/lib.rs

// --snip--

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

// --snip--

示例 20-19: 為存放每一個(gè)閉包的 Box 創(chuàng)建一個(gè) Job 類型別名,接著在信道中發(fā)出任務(wù)

在使用 execute 得到的閉包新建 Job 實(shí)例之后,將這些任務(wù)從信道的發(fā)送端發(fā)出。這里調(diào)用 send 上的 unwrap,因?yàn)榘l(fā)送可能會(huì)失敗,這可能發(fā)生于例如停止了所有線程執(zhí)行的情況,這意味著接收端停止接收新消息了。不過目前我們無法停止線程執(zhí)行;只要線程池存在他們就會(huì)一直執(zhí)行。使用 unwrap 是因?yàn)槲覀冎朗〔豢赡馨l(fā)生,即便編譯器不這么認(rèn)為。

不過到此事情還沒有結(jié)束!在 worker 中,傳遞給 thread::spawn 的閉包仍然還只是 引用 了信道的接收端。相反我們需要閉包一直循環(huán),向信道的接收端請(qǐng)求任務(wù),并在得到任務(wù)時(shí)執(zhí)行他們。如示例 20-20 對(duì) Worker::new 做出修改:

文件名: src/lib.rs

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver.lock().unwrap().recv().unwrap();

            println!("Worker {} got a job; executing.", id);

            job();
        });

        Worker { id, thread }
    }
}

示例 20-20: 在 worker 線程中接收并執(zhí)行任務(wù)

這里,首先在 receiver 上調(diào)用了 lock 來獲取互斥器,接著 unwrap 在出現(xiàn)任何錯(cuò)誤時(shí) panic。如果互斥器處于一種叫做 被污染poisoned)的狀態(tài)時(shí)獲取鎖可能會(huì)失敗,這可能發(fā)生于其他線程在持有鎖時(shí) panic 了且沒有釋放鎖。在這種情況下,調(diào)用 unwrap 使其 panic 是正確的行為。請(qǐng)隨意將 unwrap 改為包含有意義錯(cuò)誤信息的 expect

如果鎖定了互斥器,接著調(diào)用 recv 從信道中接收 Job。最后的 unwrap 也繞過了一些錯(cuò)誤,這可能發(fā)生于持有信道發(fā)送端的線程停止的情況,類似于如果接收端關(guān)閉時(shí) send 方法如何返回 Err 一樣。

調(diào)用 recv 會(huì)阻塞當(dāng)前線程,所以如果還沒有任務(wù),其會(huì)等待直到有可用的任務(wù)。Mutex<T> 確保一次只有一個(gè) Worker 線程嘗試請(qǐng)求任務(wù)。

現(xiàn)在線程池處于可以運(yùn)行的狀態(tài)了!執(zhí)行 cargo run 并發(fā)起一些請(qǐng)求:

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
warning: field is never read: `workers`
 --> src/lib.rs:7:5
  |
7 |     workers: Vec<Worker>,
  |     ^^^^^^^^^^^^^^^^^^^^
  |
  = note: `#[warn(dead_code)]` on by default

warning: field is never read: `id`
  --> src/lib.rs:48:5
   |
48 |     id: usize,
   |     ^^^^^^^^^

warning: field is never read: `thread`
  --> src/lib.rs:49:5
   |
49 |     thread: thread::JoinHandle<()>,
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

warning: 3 warnings emitted

    Finished dev [unoptimized + debuginfo] target(s) in 1.40s
     Running `target/debug/main`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.

成功了!現(xiàn)在我們有了一個(gè)可以異步執(zhí)行連接的線程池!它絕不會(huì)創(chuàng)建超過四個(gè)線程,所以當(dāng) server 收到大量請(qǐng)求時(shí)系統(tǒng)也不會(huì)負(fù)擔(dān)過重。如果請(qǐng)求 /sleep,server 也能夠通過另外一個(gè)線程處理其他請(qǐng)求。

注意如果同時(shí)在多個(gè)瀏覽器窗口打開 /sleep,它們可能會(huì)彼此間隔地加載 5 秒,因?yàn)橐恍g覽器處于緩存的原因會(huì)順序執(zhí)行相同請(qǐng)求的多個(gè)實(shí)例。這些限制并不是由于我們的 web server 造成的。

在學(xué)習(xí)了第十八章的 while let 循環(huán)之后,你可能會(huì)好奇為何不能如此編寫 worker 線程,如示例 20-21 所示:

文件名: src/lib.rs

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            while let Ok(job) = receiver.lock().unwrap().recv() {
                println!("Worker {} got a job; executing.", id);

                job();
            }
        });

        Worker { id, thread }
    }
}

示例 20-21: 一個(gè)使用 while let 的 Worker::new 替代實(shí)現(xiàn)

這段代碼可以編譯和運(yùn)行,但是并不會(huì)產(chǎn)生所期望的線程行為:一個(gè)慢請(qǐng)求仍然會(huì)導(dǎo)致其他請(qǐng)求等待執(zhí)行。其原因有些微妙:Mutex 結(jié)構(gòu)體沒有公有 unlock 方法,因?yàn)殒i的所有權(quán)依賴 lock 方法返回的 LockResult<MutexGuard<T>> 中 MutexGuard<T> 的生命周期。這允許借用檢查器在編譯時(shí)確保絕不會(huì)在沒有持有鎖的情況下訪問由 Mutex 守護(hù)的資源,不過如果沒有認(rèn)真的思考 MutexGuard<T> 的生命周期的話,也可能會(huì)導(dǎo)致比預(yù)期更久的持有鎖。

示例 20-20 中的代碼使用的 let job = receiver.lock().unwrap().recv().unwrap(); 之所以可以工作是因?yàn)閷?duì)于 let 來說,當(dāng) let 語句結(jié)束時(shí)任何表達(dá)式中等號(hào)右側(cè)使用的臨時(shí)值都會(huì)立即被丟棄。然而 while letif let 和 match)直到相關(guān)的代碼塊結(jié)束都不會(huì)丟棄臨時(shí)值。在示例 20-21 中,job() 調(diào)用期間鎖一直持續(xù),這也意味著其他的 worker 無法接受任務(wù)。


以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)