ch20-02-multithreaded.md
commit 95b5e7c86d33e98eec6f73b268d106621f3d24a1
目前 server 會(huì)依次處理每一個(gè)請(qǐng)求,意味著它在完成第一個(gè)連接的處理之前不會(huì)處理第二個(gè)連接。如果 server 正接收越來越多的請(qǐng)求,這類串行操作會(huì)使性能越來越差。如果一個(gè)請(qǐng)求花費(fèi)很長(zhǎng)時(shí)間來處理,隨后而來的請(qǐng)求則不得不等待這個(gè)長(zhǎng)請(qǐng)求結(jié)束,即便這些新請(qǐng)求可以很快就處理完。我們需要修復(fù)這種情況,不過首先讓我們實(shí)際嘗試一下這個(gè)問題。
讓我們看看一個(gè)慢請(qǐng)求如何影響當(dāng)前 server 實(shí)現(xiàn)中的其他請(qǐng)求。示例 20-10 通過模擬慢響應(yīng)實(shí)現(xiàn)了 /sleep 請(qǐng)求處理,它會(huì)使 server 在響應(yīng)之前休眠五秒。
文件名: src/main.rs
use std::thread;
use std::time::Duration;
// --snip--
fn handle_connection(mut stream: TcpStream) {
// --snip--
let get = b"GET / HTTP/1.1\r\n";
let sleep = b"GET /sleep HTTP/1.1\r\n";
let (status_line, filename) = if buffer.starts_with(get) {
("HTTP/1.1 200 OK", "hello.html")
} else if buffer.starts_with(sleep) {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
} else {
("HTTP/1.1 404 NOT FOUND", "404.html")
};
// --snip--
}
示例 20-10: 通過識(shí)別 /sleep 并休眠五秒來模擬慢請(qǐng)求
這段代碼有些凌亂,不過對(duì)于模擬的目的來說已經(jīng)足夠。這里創(chuàng)建了第二個(gè)請(qǐng)求 sleep
,我們會(huì)識(shí)別其數(shù)據(jù)。在 if
塊之后增加了一個(gè) else if
來檢查 /sleep 請(qǐng)求,當(dāng)接收到這個(gè)請(qǐng)求時(shí),在渲染成功 HTML 頁面之前會(huì)先休眠五秒。
現(xiàn)在就可以真切的看出我們的 server 有多么的原始:真實(shí)的庫將會(huì)以更簡(jiǎn)潔的方式處理多請(qǐng)求識(shí)別問題!使用 cargo run
啟動(dòng) server,并接著打開兩個(gè)瀏覽器窗口:一個(gè)請(qǐng)求 http://127.0.0.1:7878/ 而另一個(gè)請(qǐng)求 http://127.0.0.1:7878/sleep 。如果像之前一樣多次請(qǐng)求 /,會(huì)發(fā)現(xiàn)響應(yīng)的比較快速。不過如果請(qǐng)求 /sleep 之后在請(qǐng)求 /,就會(huì)看到 / 會(huì)等待直到 sleep
休眠完五秒之后才出現(xiàn)。
這里有多種辦法來改變我們的 web server 使其避免所有請(qǐng)求都排在慢請(qǐng)求之后;我們將要實(shí)現(xiàn)的一個(gè)便是線程池。
線程池(thread pool)是一組預(yù)先分配的等待或準(zhǔn)備處理任務(wù)的線程。當(dāng)程序收到一個(gè)新任務(wù),線程池中的一個(gè)線程會(huì)被分配任務(wù),這個(gè)線程會(huì)離開并處理任務(wù)。其余的線程則可用于處理在第一個(gè)線程處理任務(wù)的同時(shí)處理其他接收到的任務(wù)。當(dāng)?shù)谝粋€(gè)線程處理完任務(wù)時(shí),它會(huì)返回空閑線程池中等待處理新任務(wù)。線程池允許我們并發(fā)處理連接,增加 server 的吞吐量。
我們會(huì)將池中線程限制為較少的數(shù)量,以防拒絕服務(wù)(Denial of Service, DoS)攻擊;如果程序?yàn)槊恳粋€(gè)接收的請(qǐng)求都新建一個(gè)線程,某人向 server 發(fā)起千萬級(jí)的請(qǐng)求時(shí)會(huì)耗盡服務(wù)器的資源并導(dǎo)致所有請(qǐng)求的處理都被終止。
不同于分配無限的線程,線程池中將有固定數(shù)量的等待線程。當(dāng)新進(jìn)請(qǐng)求時(shí),將請(qǐng)求發(fā)送到線程池中做處理。線程池會(huì)維護(hù)一個(gè)接收請(qǐng)求的隊(duì)列。每一個(gè)線程會(huì)從隊(duì)列中取出一個(gè)請(qǐng)求,處理請(qǐng)求,接著向?qū)﹃?duì)列索取另一個(gè)請(qǐng)求。通過這種設(shè)計(jì),則可以并發(fā)處理 N
個(gè)請(qǐng)求,其中 N
為線程數(shù)。如果每一個(gè)線程都在響應(yīng)慢請(qǐng)求,之后的請(qǐng)求仍然會(huì)阻塞隊(duì)列,不過相比之前增加了能處理的慢請(qǐng)求的數(shù)量。
這個(gè)設(shè)計(jì)僅僅是多種改善 web server 吞吐量的方法之一。其他可供探索的方法有 fork/join 模型和單線程異步 I/O 模型。如果你對(duì)這個(gè)主題感興趣,則可以閱讀更多關(guān)于其他解決方案的內(nèi)容并嘗試用 Rust 實(shí)現(xiàn)他們;對(duì)于一個(gè)像 Rust 這樣的底層語言,所有這些方法都是可能的。
在開始之前,讓我們討論一下線程池應(yīng)用看起來怎樣。當(dāng)嘗試設(shè)計(jì)代碼時(shí),首先編寫客戶端接口確實(shí)有助于指導(dǎo)代碼設(shè)計(jì)。以期望的調(diào)用方式來構(gòu)建 API 代碼的結(jié)構(gòu),接著在這個(gè)結(jié)構(gòu)之內(nèi)實(shí)現(xiàn)功能,而不是先實(shí)現(xiàn)功能再設(shè)計(jì)公有 API。
類似于第十二章項(xiàng)目中使用的測(cè)試驅(qū)動(dòng)開發(fā)。這里將要使用編譯器驅(qū)動(dòng)開發(fā)(compiler-driven development)。我們將編寫調(diào)用所期望的函數(shù)的代碼,接著觀察編譯器錯(cuò)誤告訴我們接下來需要修改什么使得代碼可以工作。
首先,讓我們探索一下為每一個(gè)連接都創(chuàng)建一個(gè)線程的代碼看起來如何。這并不是最終方案,因?yàn)檎缰爸v到的它會(huì)潛在的分配無限的線程,不過這是一個(gè)開始。示例 20-11 展示了 main
的改變,它在 for
循環(huán)中為每一個(gè)流分配了一個(gè)新線程進(jìn)行處理:
文件名: src/main.rs
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap();
thread::spawn(|| {
handle_connection(stream);
});
}
}
示例 20-11: 為每一個(gè)流新建一個(gè)線程
正如第十六章講到的,thread::spawn
會(huì)創(chuàng)建一個(gè)新線程并在其中運(yùn)行閉包中的代碼。如果運(yùn)行這段代碼并在在瀏覽器中加載 /sleep,接著在另兩個(gè)瀏覽器標(biāo)簽頁中加載 /,確實(shí)會(huì)發(fā)現(xiàn) / 請(qǐng)求不必等待 /sleep 結(jié)束。不過正如之前提到的,這最終會(huì)使系統(tǒng)崩潰因?yàn)槲覀儫o限制的創(chuàng)建新線程。
我們期望線程池以類似且熟悉的方式工作,以便從線程切換到線程池并不會(huì)對(duì)使用該 API 的代碼做出較大的修改。示例 20-12 展示我們希望用來替換 thread::spawn
的 ThreadPool
結(jié)構(gòu)體的假想接口:
文件名: src/main.rs
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
示例 20-12: 假想的 ThreadPool
接口
這里使用 ThreadPool::new
來創(chuàng)建一個(gè)新的線程池,它有一個(gè)可配置的線程數(shù)的參數(shù),在這里是四。這樣在 for
循環(huán)中,pool.execute
有著類似 thread::spawn
的接口,它獲取一個(gè)線程池運(yùn)行于每一個(gè)流的閉包。pool.execute
需要實(shí)現(xiàn)為獲取閉包并傳遞給池中的線程運(yùn)行。這段代碼還不能編譯,不過通過嘗試編譯器會(huì)指導(dǎo)我們?nèi)绾涡迯?fù)它。
繼續(xù)并對(duì)示例 20-12 中的 src/main.rs 做出修改,并利用來自 cargo check
的編譯器錯(cuò)誤來驅(qū)動(dòng)開發(fā)。下面是我們得到的第一個(gè)錯(cuò)誤:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
--> src/main.rs:10:16
|
10 | let pool = ThreadPool::new(4);
| ^^^^^^^^^^ use of undeclared type `ThreadPool`
For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` due to previous error
好的,這告訴我們需要一個(gè) ThreadPool
類型或模塊,所以我們將構(gòu)建一個(gè)。ThreadPool
的實(shí)現(xiàn)會(huì)與 web server 的特定工作相獨(dú)立,所以讓我們從 hello
crate 切換到存放 ThreadPool
實(shí)現(xiàn)的新庫 crate。這也意味著可以在任何工作中使用這個(gè)單獨(dú)的線程池庫,而不僅僅是處理網(wǎng)絡(luò)請(qǐng)求。
創(chuàng)建 src/lib.rs 文件,它包含了目前可用的最簡(jiǎn)單的 ThreadPool
定義:
文件名: src/lib.rs
pub struct ThreadPool;
接著創(chuàng)建一個(gè)新目錄,src/bin,并將二進(jìn)制 crate 根文件從 src/main.rs 移動(dòng)到 src/bin/main.rs。這使得庫 crate 成為 hello 目錄的主要 crate;不過仍然可以使用 cargo run
運(yùn)行 src/bin/main.rs 二進(jìn)制文件。移動(dòng)了 main.rs 文件之后,修改 src/bin/main.rs 文件開頭加入如下代碼來引入庫 crate 并將 ThreadPool
引入作用域:
文件名: src/bin/main.rs
use hello::ThreadPool;
這仍然不能工作,再次嘗試運(yùn)行來得到下一個(gè)需要解決的錯(cuò)誤:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
--> src/bin/main.rs:11:28
|
11 | let pool = ThreadPool::new(4);
| ^^^ function or associated item not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` due to previous error
這告訴我們下一步是為 ThreadPool
創(chuàng)建一個(gè)叫做 new
的關(guān)聯(lián)函數(shù)。我們還知道 new
需要有一個(gè)參數(shù)可以接受 4
,而且 new
應(yīng)該返回 ThreadPool
實(shí)例。讓我們實(shí)現(xiàn)擁有此特征的最小化 new
函數(shù):
文件夾: src/lib.rs
pub struct ThreadPool;
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
}
這里選擇 usize
作為 size
參數(shù)的類型,因?yàn)槲覀冎罏樨?fù)的線程數(shù)沒有意義。我們還知道將使用 4 作為線程集合的元素?cái)?shù)量,這也就是使用 usize
類型的原因,如第三章 “整型” 部分所講。
再次編譯檢查這段代碼:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
--> src/bin/main.rs:16:14
|
16 | pool.execute(|| {
| ^^^^^^^ method not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` due to previous error
現(xiàn)在有了一個(gè)警告和一個(gè)錯(cuò)誤。暫時(shí)先忽略警告,發(fā)生錯(cuò)誤是因?yàn)椴]有 ThreadPool
上的 execute
方法?;貞?nbsp;“為有限數(shù)量的線程創(chuàng)建一個(gè)類似的接口” 部分我們決定線程池應(yīng)該有與 thread::spawn
類似的接口,同時(shí)我們將實(shí)現(xiàn) execute
函數(shù)來獲取傳遞的閉包并將其傳遞給池中的空閑線程執(zhí)行。
我們會(huì)在 ThreadPool
上定義 execute
函數(shù)來獲取一個(gè)閉包參數(shù)。回憶第十三章的 “使用帶有泛型和 Fn trait 的閉包” 部分,閉包作為參數(shù)時(shí)可以使用三個(gè)不同的 trait:Fn
、FnMut
和 FnOnce
。我們需要決定這里應(yīng)該使用哪種閉包。最終需要實(shí)現(xiàn)的類似于標(biāo)準(zhǔn)庫的 thread::spawn
,所以我們可以觀察 thread::spawn
的簽名在其參數(shù)中使用了何種 bound。查看文檔會(huì)發(fā)現(xiàn):
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
F
是這里我們關(guān)心的參數(shù);T
與返回值有關(guān)所以我們并不關(guān)心??紤]到 spawn
使用 FnOnce
作為 F
的 trait bound,這可能也是我們需要的,因?yàn)樽罱K會(huì)將傳遞給 execute
的參數(shù)傳給 spawn
。因?yàn)樘幚碚?qǐng)求的線程只會(huì)執(zhí)行閉包一次,這也進(jìn)一步確認(rèn)了 FnOnce
是我們需要的 trait,這里符合 FnOnce
中 Once
的意思。
F
還有 trait bound Send
和生命周期綁定 'static
,這對(duì)我們的情況也是有意義的:需要 Send
來將閉包從一個(gè)線程轉(zhuǎn)移到另一個(gè)線程,而 'static
是因?yàn)椴⒉恢谰€程會(huì)執(zhí)行多久。讓我們編寫一個(gè)使用帶有這些 bound 的泛型參數(shù) F
的 ThreadPool
的 execute
方法:
文件名: src/lib.rs
impl ThreadPool {
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
FnOnce
trait 仍然需要之后的 ()
,因?yàn)檫@里的 FnOnce
代表一個(gè)沒有參數(shù)也沒有返回值的閉包。正如函數(shù)的定義,返回值類型可以從簽名中省略,不過即便沒有參數(shù)也需要括號(hào)。
這里再一次增加了 execute
方法的最小化實(shí)現(xiàn):它沒有做任何工作,只是嘗試讓代碼能夠編譯。再次進(jìn)行檢查:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
Finished dev [unoptimized + debuginfo] target(s) in 0.24s
現(xiàn)在就只有警告了!這意味著能夠編譯了!注意如果嘗試 cargo run
運(yùn)行程序并在瀏覽器中發(fā)起請(qǐng)求,仍會(huì)在瀏覽器中出現(xiàn)在本章開始時(shí)那樣的錯(cuò)誤。這個(gè)庫實(shí)際上還沒有調(diào)用傳遞給 execute
的閉包!
一個(gè)你可能聽說過的關(guān)于像 Haskell 和 Rust 這樣有嚴(yán)格編譯器的語言的說法是 “如果代碼能夠編譯,它就能工作”。這是一個(gè)提醒大家的好時(shí)機(jī),實(shí)際上這并不是普適的。我們的項(xiàng)目可以編譯,不過它完全沒有做任何工作!如果構(gòu)建一個(gè)真實(shí)且功能完整的項(xiàng)目,則需花費(fèi)大量的時(shí)間來開始編寫單元測(cè)試來檢查代碼能否編譯 并且 擁有期望的行為。
這里仍然存在警告是因?yàn)槠洳]有對(duì) new
和 execute
的參數(shù)做任何操作。讓我們用期望的行為來實(shí)現(xiàn)這些函數(shù)。以考慮 new
作為開始。之前選擇使用無符號(hào)類型作為 size
參數(shù)的類型,因?yàn)榫€程數(shù)為負(fù)的線程池沒有意義。然而,線程數(shù)為零的線程池同樣沒有意義,不過零是一個(gè)完全有效的 u32
值。讓我們?cè)黾釉诜祷?nbsp;ThreadPool
實(shí)例之前檢查 size
是否大于零的代碼,并使用 assert!
宏在得到零時(shí) panic,如示例 20-13 所示:
文件名: src/lib.rs
impl ThreadPool {
/// 創(chuàng)建線程池。
///
/// 線程池中線程的數(shù)量。
///
/// # Panics
///
/// `new` 函數(shù)在 size 為 0 時(shí)會(huì) panic。
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
ThreadPool
}
// --snip--
}
示例 20-13: 實(shí)現(xiàn) ThreadPool::new
在 size
為零時(shí) panic
這里用文檔注釋為 ThreadPool
增加了一些文檔。注意這里遵循了良好的文檔實(shí)踐并增加了一個(gè)部分來提示函數(shù)會(huì) panic 的情況,正如第十四章所討論的。嘗試運(yùn)行 cargo doc --open
并點(diǎn)擊 ThreadPool
結(jié)構(gòu)體來查看生成的 new
的文檔看起來如何!
相比像這里使用 assert!
宏,也可以讓 new
像之前 I/O 項(xiàng)目中示例 12-9 中 Config::new
那樣返回一個(gè) Result
,不過在這里我們選擇創(chuàng)建一個(gè)沒有任何線程的線程池應(yīng)該是不可恢復(fù)的錯(cuò)誤。如果你想做的更好,嘗試編寫一個(gè)采用如下簽名的 new
版本來感受一下兩者的區(qū)別:
pub fn new(size: usize) -> Result<ThreadPool, PoolCreationError> {
現(xiàn)在有了一個(gè)有效的線程池線程數(shù),就可以實(shí)際創(chuàng)建這些線程并在返回之前將他們儲(chǔ)存在 ThreadPool
結(jié)構(gòu)體中。不過如何 “儲(chǔ)存” 一個(gè)線程?讓我們?cè)倏纯?nbsp;thread::spawn
的簽名:
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
spawn
返回 JoinHandle<T>
,其中 T
是閉包返回的類型。嘗試使用 JoinHandle
來看看會(huì)發(fā)生什么。在我們的情況中,傳遞給線程池的閉包會(huì)處理連接并不返回任何值,所以 T
將會(huì)是單元類型 ()
。
示例 20-14 中的代碼可以編譯,不過實(shí)際上還并沒有創(chuàng)建任何線程。我們改變了 ThreadPool
的定義來存放一個(gè) thread::JoinHandle<()>
的 vector 實(shí)例,使用 size
容量來初始化,并設(shè)置一個(gè) for
循環(huán)了來運(yùn)行創(chuàng)建線程的代碼,并返回包含這些線程的 ThreadPool
實(shí)例:
文件名: src/lib.rs
use std::thread;
pub struct ThreadPool {
threads: Vec<thread::JoinHandle<()>>,
}
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut threads = Vec::with_capacity(size);
for _ in 0..size {
// create some threads and store them in the vector
}
ThreadPool { threads }
}
// --snip--
}
示例 20-14: 為 ThreadPool
創(chuàng)建一個(gè) vector 來存放線程
這里將 std::thread
引入庫 crate 的作用域,因?yàn)槭褂昧?nbsp;thread::JoinHandle
作為 ThreadPool
中 vector 元素的類型。
在得到了有效的數(shù)量之后,ThreadPool
新建一個(gè)存放 size
個(gè)元素的 vector。本書還未使用過 with_capacity
,它與 Vec::new
做了同樣的工作,不過有一個(gè)重要的區(qū)別:它為 vector 預(yù)先分配空間。因?yàn)橐呀?jīng)知道了 vector 中需要 size
個(gè)元素,預(yù)先進(jìn)行分配比僅僅 Vec::new
要稍微有效率一些,因?yàn)?nbsp;Vec::new
隨著插入元素而重新改變大小。
如果再次運(yùn)行 cargo check
,會(huì)看到一些警告,不過應(yīng)該可以編譯成功。
示例 20-14 的 for
循環(huán)中留下了一個(gè)關(guān)于創(chuàng)建線程的注釋。如何實(shí)際創(chuàng)建線程呢?這是一個(gè)難題。標(biāo)準(zhǔn)庫提供的創(chuàng)建線程的方法,thread::spawn
,它期望獲取一些一旦創(chuàng)建線程就應(yīng)該執(zhí)行的代碼。然而,我們希望開始線程并使其等待稍后傳遞的代碼。標(biāo)準(zhǔn)庫的線程實(shí)現(xiàn)并沒有包含這么做的方法;我們必須自己實(shí)現(xiàn)。
我們將要實(shí)現(xiàn)的行為是創(chuàng)建線程并稍后發(fā)送代碼,這會(huì)在 ThreadPool
和線程間引入一個(gè)新數(shù)據(jù)類型來管理這種新行為。這個(gè)數(shù)據(jù)結(jié)構(gòu)稱為 Worker
:這是一個(gè)池實(shí)現(xiàn)中的常見概念。想象一下在餐館廚房工作的員工:?jiǎn)T工等待來自客戶的訂單,他們負(fù)責(zé)接受這些訂單并完成它們。
不同于在線程池中儲(chǔ)存一個(gè) JoinHandle<()>
實(shí)例的 vector,我們會(huì)儲(chǔ)存 Worker
結(jié)構(gòu)體的實(shí)例。每一個(gè) Worker
會(huì)儲(chǔ)存一個(gè)單獨(dú)的 JoinHandle<()>
實(shí)例。接著會(huì)在 Worker
上實(shí)現(xiàn)一個(gè)方法,它會(huì)獲取需要允許代碼的閉包并將其發(fā)送給已經(jīng)運(yùn)行的線程執(zhí)行。我們還會(huì)賦予每一個(gè) worker id
,這樣就可以在日志和調(diào)試中區(qū)別線程池中的不同 worker。
首先,讓我們做出如此創(chuàng)建 ThreadPool
時(shí)所需的修改。在通過如下方式設(shè)置完 Worker
之后,我們會(huì)實(shí)現(xiàn)向線程發(fā)送閉包的代碼:
Worker
? 結(jié)構(gòu)體存放 ?id
? 和 ?JoinHandle<()>
?ThreadPool
? 存放一個(gè) ?Worker
? 實(shí)例的 vectorWorker::new
? 函數(shù),它獲取一個(gè) ?id
? 數(shù)字并返回一個(gè)帶有 ?id
? 和用空閉包分配的線程的 ?Worker
? 實(shí)例ThreadPool::new
? 中,使用 ?for
? 循環(huán)計(jì)數(shù)生成 ?id
?,使用這個(gè) ?id
? 新建 ?Worker
?,并儲(chǔ)存進(jìn) vector 中如果你渴望挑戰(zhàn),在查示例 20-15 中的代碼之前嘗試自己實(shí)現(xiàn)這些修改。
準(zhǔn)備好了嗎?示例 20-15 就是一個(gè)做出了這些修改的例子:
文件名: src/lib.rs
use std::thread;
pub struct ThreadPool {
workers: Vec<Worker>,
}
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers }
}
// --snip--
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker { id, thread }
}
}
示例 20-15: 修改 ThreadPool
存放 Worker
實(shí)例而不是直接存放線程
這里將 ThreadPool
中字段名從 threads
改為 workers
,因?yàn)樗F(xiàn)在儲(chǔ)存 Worker
而不是 JoinHandle<()>
。使用 for
循環(huán)中的計(jì)數(shù)作為 Worker::new
的參數(shù),并將每一個(gè)新建的 Worker
儲(chǔ)存在叫做 workers
的 vector 中。
Worker
結(jié)構(gòu)體和其 new
函數(shù)是私有的,因?yàn)橥獠看a(比如 src/bin/main.rs 中的 server)并不需要知道關(guān)于 ThreadPool
中使用 Worker
結(jié)構(gòu)體的實(shí)現(xiàn)細(xì)節(jié)。Worker::new
函數(shù)使用 id
參數(shù)并儲(chǔ)存了使用一個(gè)空閉包創(chuàng)建的 JoinHandle<()>
。
這段代碼能夠編譯并用指定給 ThreadPool::new
的參數(shù)創(chuàng)建儲(chǔ)存了一系列的 Worker
實(shí)例,不過 仍然 沒有處理 execute
中得到的閉包。讓我們聊聊接下來怎么做。
下一個(gè)需要解決的問題是傳遞給 thread::spawn
的閉包完全沒有做任何工作。目前,我們?cè)?nbsp;execute
方法中獲得期望執(zhí)行的閉包,不過在創(chuàng)建 ThreadPool
的過程中創(chuàng)建每一個(gè) Worker
時(shí)需要向 thread::spawn
傳遞一個(gè)閉包。
我們希望剛創(chuàng)建的 Worker
結(jié)構(gòu)體能夠從 ThreadPool
的隊(duì)列中獲取需要執(zhí)行的代碼,并發(fā)送到線程中執(zhí)行他們。
在第十六章,我們學(xué)習(xí)了 信道 —— 一個(gè)溝通兩個(gè)線程的簡(jiǎn)單手段 —— 對(duì)于這個(gè)例子來說則是絕佳的。這里信道將充當(dāng)任務(wù)隊(duì)列的作用,execute
將通過 ThreadPool
向其中線程正在尋找工作的 Worker
實(shí)例發(fā)送任務(wù)。如下是這個(gè)計(jì)劃:
ThreadPool
? 會(huì)創(chuàng)建一個(gè)信道并充當(dāng)發(fā)送端。Worker
? 將會(huì)充當(dāng)信道的接收端。Job
? 結(jié)構(gòu)體來存放用于向信道中發(fā)送的閉包。execute
? 方法會(huì)在信道發(fā)送端發(fā)出期望執(zhí)行的任務(wù)。Worker
? 會(huì)遍歷信道的接收端并執(zhí)行任何接收到的任務(wù)。讓我們以在 ThreadPool::new
中創(chuàng)建信道并讓 ThreadPool
實(shí)例充當(dāng)發(fā)送端開始,如示例 20-16 所示。Job
是將在信道中發(fā)出的類型,目前它是一個(gè)沒有任何內(nèi)容的結(jié)構(gòu)體:
文件名: src/lib.rs
// --snip--
use std::sync::mpsc;
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers, sender }
}
// --snip--
}
示例 20-16: 修改 ThreadPool
來儲(chǔ)存一個(gè)發(fā)送 Job
實(shí)例的信道發(fā)送端
在 ThreadPool::new
中,新建了一個(gè)信道,并接著讓線程池在接收端等待。這段代碼能夠編譯,不過仍有警告。
讓我們嘗試在線程池創(chuàng)建每個(gè) worker 時(shí)將信道的接收端傳遞給他們。須知我們希望在 worker 所分配的線程中使用信道的接收端,所以將在閉包中引用 receiver
參數(shù)。示例 20-17 中展示的代碼還不能編譯:
文件名: src/lib.rs
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, receiver));
}
ThreadPool { workers, sender }
}
// --snip--
}
// --snip--
impl Worker {
fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
示例 20-17: 將信道的接收端傳遞給 worker
這是一些小而直觀的修改:將信道的接收端傳遞進(jìn)了 Worker::new
,并接著在閉包中使用它。
如果嘗試 check 代碼,會(huì)得到這個(gè)錯(cuò)誤:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
--> src/lib.rs:27:42
|
22 | let (sender, receiver) = mpsc::channel();
| -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
27 | workers.push(Worker::new(id, receiver));
| ^^^^^^^^ value moved here, in previous iteration of loop
For more information about this error, try `rustc --explain E0382`.
error: could not compile `hello` due to previous error
這段代碼嘗試將 receiver
傳遞給多個(gè) Worker
實(shí)例。這是不行的,回憶第十六章:Rust 所提供的信道實(shí)現(xiàn)是多 生產(chǎn)者,單 消費(fèi)者 的。這意味著不能簡(jiǎn)單的克隆信道的消費(fèi)端來解決問題。即便可以,那也不是我們希望使用的技術(shù);我們希望通過在所有的 worker 中共享單一 receiver
,在線程間分發(fā)任務(wù)。
另外,從信道隊(duì)列中取出任務(wù)涉及到修改 receiver
,所以這些線程需要一個(gè)能安全的共享和修改 receiver
的方式,否則可能導(dǎo)致競(jìng)爭(zhēng)狀態(tài)(參考第十六章)。
回憶一下第十六章討論的線程安全智能指針,為了在多個(gè)線程間共享所有權(quán)并允許線程修改其值,需要使用 Arc<Mutex<T>>
。Arc
使得多個(gè) worker 擁有接收端,而 Mutex
則確保一次只有一個(gè) worker 能從接收端得到任務(wù)。示例 20-18 展示了所需的修改:
文件名: src/lib.rs
use std::sync::Arc;
use std::sync::Mutex;
// --snip--
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
// --snip--
}
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
// --snip--
}
}
示例 20-18: 使用 Arc
和 Mutex
在 worker 間共享信道的接收端
在 ThreadPool::new
中,將信道的接收端放入一個(gè) Arc
和一個(gè) Mutex
中。對(duì)于每一個(gè)新 worker,克隆 Arc
來增加引用計(jì)數(shù),如此這些 worker 就可以共享接收端的所有權(quán)了。
通過這些修改,代碼可以編譯了!我們做到了!
最后讓我們實(shí)現(xiàn) ThreadPool
上的 execute
方法。同時(shí)也要修改 Job
結(jié)構(gòu)體:它將不再是結(jié)構(gòu)體,Job
將是一個(gè)有著 execute
接收到的閉包類型的 trait 對(duì)象的類型別名。第十九章 “類型別名用來創(chuàng)建類型同義詞” 部分提到過,類型別名允許將長(zhǎng)的類型變短。觀察示例 20-19:
文件名: src/lib.rs
// --snip--
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
// --snip--
示例 20-19: 為存放每一個(gè)閉包的 Box
創(chuàng)建一個(gè) Job
類型別名,接著在信道中發(fā)出任務(wù)
在使用 execute
得到的閉包新建 Job
實(shí)例之后,將這些任務(wù)從信道的發(fā)送端發(fā)出。這里調(diào)用 send
上的 unwrap
,因?yàn)榘l(fā)送可能會(huì)失敗,這可能發(fā)生于例如停止了所有線程執(zhí)行的情況,這意味著接收端停止接收新消息了。不過目前我們無法停止線程執(zhí)行;只要線程池存在他們就會(huì)一直執(zhí)行。使用 unwrap
是因?yàn)槲覀冎朗〔豢赡馨l(fā)生,即便編譯器不這么認(rèn)為。
不過到此事情還沒有結(jié)束!在 worker 中,傳遞給 thread::spawn
的閉包仍然還只是 引用 了信道的接收端。相反我們需要閉包一直循環(huán),向信道的接收端請(qǐng)求任務(wù),并在得到任務(wù)時(shí)執(zhí)行他們。如示例 20-20 對(duì) Worker::new
做出修改:
文件名: src/lib.rs
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {} got a job; executing.", id);
job();
});
Worker { id, thread }
}
}
示例 20-20: 在 worker 線程中接收并執(zhí)行任務(wù)
這里,首先在 receiver
上調(diào)用了 lock
來獲取互斥器,接著 unwrap
在出現(xiàn)任何錯(cuò)誤時(shí) panic。如果互斥器處于一種叫做 被污染(poisoned)的狀態(tài)時(shí)獲取鎖可能會(huì)失敗,這可能發(fā)生于其他線程在持有鎖時(shí) panic 了且沒有釋放鎖。在這種情況下,調(diào)用 unwrap
使其 panic 是正確的行為。請(qǐng)隨意將 unwrap
改為包含有意義錯(cuò)誤信息的 expect
。
如果鎖定了互斥器,接著調(diào)用 recv
從信道中接收 Job
。最后的 unwrap
也繞過了一些錯(cuò)誤,這可能發(fā)生于持有信道發(fā)送端的線程停止的情況,類似于如果接收端關(guān)閉時(shí) send
方法如何返回 Err
一樣。
調(diào)用 recv
會(huì)阻塞當(dāng)前線程,所以如果還沒有任務(wù),其會(huì)等待直到有可用的任務(wù)。Mutex<T>
確保一次只有一個(gè) Worker
線程嘗試請(qǐng)求任務(wù)。
現(xiàn)在線程池處于可以運(yùn)行的狀態(tài)了!執(zhí)行 cargo run
并發(fā)起一些請(qǐng)求:
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
warning: field is never read: `workers`
--> src/lib.rs:7:5
|
7 | workers: Vec<Worker>,
| ^^^^^^^^^^^^^^^^^^^^
|
= note: `#[warn(dead_code)]` on by default
warning: field is never read: `id`
--> src/lib.rs:48:5
|
48 | id: usize,
| ^^^^^^^^^
warning: field is never read: `thread`
--> src/lib.rs:49:5
|
49 | thread: thread::JoinHandle<()>,
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
warning: 3 warnings emitted
Finished dev [unoptimized + debuginfo] target(s) in 1.40s
Running `target/debug/main`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
成功了!現(xiàn)在我們有了一個(gè)可以異步執(zhí)行連接的線程池!它絕不會(huì)創(chuàng)建超過四個(gè)線程,所以當(dāng) server 收到大量請(qǐng)求時(shí)系統(tǒng)也不會(huì)負(fù)擔(dān)過重。如果請(qǐng)求 /sleep,server 也能夠通過另外一個(gè)線程處理其他請(qǐng)求。
注意如果同時(shí)在多個(gè)瀏覽器窗口打開 /sleep,它們可能會(huì)彼此間隔地加載 5 秒,因?yàn)橐恍g覽器處于緩存的原因會(huì)順序執(zhí)行相同請(qǐng)求的多個(gè)實(shí)例。這些限制并不是由于我們的 web server 造成的。
在學(xué)習(xí)了第十八章的 while let
循環(huán)之后,你可能會(huì)好奇為何不能如此編寫 worker 線程,如示例 20-21 所示:
文件名: src/lib.rs
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
while let Ok(job) = receiver.lock().unwrap().recv() {
println!("Worker {} got a job; executing.", id);
job();
}
});
Worker { id, thread }
}
}
示例 20-21: 一個(gè)使用 while let
的 Worker::new
替代實(shí)現(xiàn)
這段代碼可以編譯和運(yùn)行,但是并不會(huì)產(chǎn)生所期望的線程行為:一個(gè)慢請(qǐng)求仍然會(huì)導(dǎo)致其他請(qǐng)求等待執(zhí)行。其原因有些微妙:Mutex
結(jié)構(gòu)體沒有公有 unlock
方法,因?yàn)殒i的所有權(quán)依賴 lock
方法返回的 LockResult<MutexGuard<T>>
中 MutexGuard<T>
的生命周期。這允許借用檢查器在編譯時(shí)確保絕不會(huì)在沒有持有鎖的情況下訪問由 Mutex
守護(hù)的資源,不過如果沒有認(rèn)真的思考 MutexGuard<T>
的生命周期的話,也可能會(huì)導(dǎo)致比預(yù)期更久的持有鎖。
示例 20-20 中的代碼使用的 let job = receiver.lock().unwrap().recv().unwrap();
之所以可以工作是因?yàn)閷?duì)于 let
來說,當(dāng) let
語句結(jié)束時(shí)任何表達(dá)式中等號(hào)右側(cè)使用的臨時(shí)值都會(huì)立即被丟棄。然而 while let
(if let
和 match
)直到相關(guān)的代碼塊結(jié)束都不會(huì)丟棄臨時(shí)值。在示例 20-21 中,job()
調(diào)用期間鎖一直持續(xù),這也意味著其他的 worker 無法接受任務(wù)。
更多建議: