[譯]同步VS異步

2018-06-19 15:35 更新

同步VS異步

Boost.Asio的作者做了一個很驚艷的工作:它可以讓你在同步和異步中自由選擇,從而更好地適應(yīng)你的應(yīng)用。

在之前的章節(jié)中,我們已經(jīng)學(xué)習(xí)了各種類型應(yīng)用的框架,比如同步客戶端,同步服務(wù)端,異步客戶端,異步服務(wù)端。它們中的每一個都可以作為你應(yīng)用的基礎(chǔ)。如果要更加深入地學(xué)習(xí)各種類型應(yīng)用的細(xì)節(jié),請繼續(xù)。

混合同步異步編程

Boost.Asio庫允許你進(jìn)行同步和異步的混合編程。我個人認(rèn)為這是一個壞主意,但是Boost.Asio(就像C++一樣)在你需要的時候允許你深入底層。

通常來說,當(dāng)你寫一個異步應(yīng)用時,你會很容易掉入這個陷阱。比如在響應(yīng)一個異步write操作時,你做了一個同步read操作:

io_service service;
ip::tcp::socket sock(service);
ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 8001);
void on_write(boost::system::error_code err, size_t bytes) {
    char read_buff[512];
    read(sock, buffer(read_buff));
}
async_write(sock, buffer("echo"), on_write);

毫無疑問,同步read操作會阻塞當(dāng)前的線程,從而導(dǎo)致其他任何正在等待的異步操作變成掛起狀態(tài)(對這個線程)。這是一段糟糕的代碼,因?yàn)樗鼤?dǎo)致整個應(yīng)用變得無響應(yīng)或者整個被阻塞掉(所有異步運(yùn)行的端點(diǎn)都必須避免阻塞,而執(zhí)行一個同步的操作違反了這個原則)。

當(dāng)你寫一個同步應(yīng)用時,你不大可能執(zhí)行異步的read或者write操作,因?yàn)橥降厮伎家呀?jīng)意味著用一種線性的方式思考(執(zhí)行A,然后執(zhí)行B,再執(zhí)行C,等等)。

我唯一能想到的同步和異步同時工作的場景就是同步操作和異步操作是完全隔離的,比如,同步和異步從一個數(shù)據(jù)庫進(jìn)行讀寫。

從客戶端傳遞信息到服務(wù)端VS從服務(wù)端傳遞信息到客戶端

成功的客戶端/服務(wù)端應(yīng)用一個很重要的部分就是來回傳遞消息(服務(wù)端到客戶端和客戶端到服務(wù)端)。你需要指定用什么來標(biāo)記一個消息。換句話說,當(dāng)讀取一個輸入的消息時,你怎么判斷它被完整讀取了?

標(biāo)記消息結(jié)尾的方式完全取決于你(標(biāo)記消息的開始很簡單,因?yàn)樗褪乔耙粋€消息之后傳遞過來的第一個字節(jié)),但是要保證消息是簡單且連續(xù)的。

你可以:

  • 消息大小固定(這不是一個很好的主意,如果我們需要發(fā)送更多的數(shù)據(jù)怎么辦?)
  • 通過一個特殊的字符標(biāo)記消息的結(jié)尾,比如’\n’或者’\0’
  • 在消息的頭部指定消息的大小

我在整本書中間采用的方式都是“使用’\n’標(biāo)記消息的結(jié)尾”。所以,每次讀取一條消息都會如下:

char buff_[512];
// 同步讀取
read(sock_, buffer(buff_), boost::bind(&read_complete, this, _1, _2));
// 異步讀取
async_read(sock_, buffer(buff_),MEM_FN2(read_complete,_1,_2), MEM_FN2(on_read,_1,_2));
size_t read_complete(const boost::system::error_code & err, size_t bytes) {
    if ( err) return 0;
    already_read_ = bytes;
    bool found = std::find(buff_, buff_ + bytes, '\n') < buff_ + bytes;
    // 一個一個讀,直到讀到回車,無緩存
    return found ? 0 : 1;
} 

我把在消息頭部指定消息長度這種方式作為一個練習(xí)留給讀者;這非常簡單。

客戶端應(yīng)用中的同步I/O

同步客戶端一般都能歸類到如下兩種情況中的一種:

  • 它向服務(wù)端請求一些東西,讀取結(jié)果,然后處理它們。然后請求一些其他的東西,然后一直持續(xù)下去。事實(shí)上,這很像之前章節(jié)里說到的同步客戶端。
  • 從服務(wù)端讀取消息,處理它,然后寫回結(jié)果。然后讀取另外一條消息,然后一直持續(xù)下去。

這里寫圖片描述

兩種情況都使用“發(fā)送請求-讀取結(jié)果”的策略。換句話說,一個部分發(fā)送一個請求到另外一個部分然后另外一個部分返回結(jié)果。這是實(shí)現(xiàn)客戶端/服務(wù)端應(yīng)用非常簡單的一種方式,同時這也是我非常推薦的一種方式。

你可以創(chuàng)建一個Mambo Jambo類型的客戶端服務(wù)端應(yīng)用,你可以隨心所欲地寫它們中間的任何一個部分,但是這會導(dǎo)致一場災(zāi)難。(你怎么知道當(dāng)客戶端或者服務(wù)端阻塞的時候會發(fā)生什么?)。

上面的情況看上去會比較相似,但是它們非常不同:

  • 前者,服務(wù)端響應(yīng)請求(服務(wù)端等待來自客戶端的請求然后回應(yīng))。這是一個請求式連接,客戶端從服務(wù)端拉取它需要的東西。
  • 后者,服務(wù)端發(fā)送事件到客戶端然后由客戶端響應(yīng)。這是一個推式連接,服務(wù)端推送通知/事件到客戶端。

你大部分時間都在做請求式客戶端/服務(wù)端應(yīng)用,這也是比較簡單,同時也是比較常見的。

你可以把拉取請求(客戶端到服務(wù)端)和推送請求(服務(wù)端到客戶端)結(jié)合起來,但是,這是非常復(fù)雜的,所以你最好避免這種情況 。把這兩種方式結(jié)合的問題在于:如果你使用“發(fā)送請求-讀取結(jié)果”策略。就會發(fā)生下面一系列事情:

  • 客戶端寫入(發(fā)送請求)
  • 服務(wù)端寫入(發(fā)送通知到客戶端)
  • 客戶端讀取服務(wù)端寫入的內(nèi)容,然后將其作為請求的結(jié)果進(jìn)行解析
  • 服務(wù)端阻塞以等待客戶端的返回的結(jié)果,這會在客戶端發(fā)送新請求的時候發(fā)生
  • 服務(wù)端把發(fā)送過來的請求當(dāng)作它等待的結(jié)果進(jìn)行解析
  • 客戶端會阻塞(服務(wù)端不會返回任何結(jié)果,因?yàn)樗芽蛻舳说恼埱螽?dāng)作它通知返回的結(jié)果)

在一個請求式客戶端/服務(wù)端應(yīng)用中,避免上面的情況是非常簡單的。你可以通過實(shí)現(xiàn)一個ping操作的方式來模擬一個推送式請求,我們假設(shè)每5秒鐘客戶端ping一次服務(wù)端。如果沒有事情需要通知,服務(wù)端返回一個類似ping ok的結(jié)果,如果有事情需要通知,服務(wù)端返回一個ping [event_name]。然后客戶端就可以初始化一個新的請求去處理這個事件。

復(fù)習(xí)一下,第一種情況就是之前章節(jié)中的同步客戶端應(yīng)用,它的主循環(huán)如下:

void loop() {
    // 對于我們登錄操作的結(jié)果
    write("login " + username_ + "\n");
    read_answer();
    while ( started_) {
        write_request();
        read_answer();
        ...
    } 
} 

我們對其進(jìn)行修改以適應(yīng)第二種情況:

void loop() {
    while ( started_) {
        read_notification();
        write_answer();
    }
}
void read_notification() {
    already_read_ = 0;
    read(sock_, buffer(buff_), boost::bind(&talk_to_svr::read_complete, this, _1, _2));
    process_notification();
}
void process_notification() {
    // ... 看通知是什么,然后準(zhǔn)備回復(fù)
}

服務(wù)端應(yīng)用中的同步I/O

類似客戶端,服務(wù)端也被分為兩種情況用來匹配之前章節(jié)中的情況1和情況2。同樣,兩種情況都采用“發(fā)送請求-讀取結(jié)果”的策略。

這里寫圖片描述

第一種情況是我們在之前章節(jié)實(shí)現(xiàn)過的同步服務(wù)端。當(dāng)你是同步時讀取一個完整的請求不是很簡單,因?yàn)槟阈枰苊庾枞ㄍǔ碚f是能讀多少就讀多少):

void read_request() {
    if ( sock_.available())
}
already_read_ += sock_.read_some(buffer(buff_ + already_read_, max_msg - already_read_));

只要一個消息被完整讀到,就對它進(jìn)行處理然后回復(fù)給客戶端:

void process_request() {
    bool found_enter = std::find(buff_, buff_ + already_read_, '\n') < buff_ + already_read_;
    if ( !found_enter)
        return; // 消息不完整
    size_t pos = std::find(buff_, buff_ + already_read_, '\n') - buff_;
    std::string msg(buff_, pos);
    ...
    if ( msg.find("login ") == 0) on_login(msg);
    else if ( msg.find("ping") == 0) on_ping();
    else ...
} 

如果我們想讓服務(wù)端變成一個推送服務(wù)端,我們通過如下的方式修改:

typedef std::vector<client_ptr> array;
array clients;
array notify;
std::string notify_msg;
void on_new_client() {
    // 新客戶端連接時,我們通知所有客戶端這個事件
    notify = clients;
    std::ostringstream msg;
    msg << "client count " << clients.size();
    notify_msg = msg.str();
    notify_clients();
}
void notify_clients() {
    for ( array::const_iterator b = notify.begin(), e = notify.end(); b != e; ++b) {
        (*b)->sock_.write_some(notify_msg);
    }
} 

on_new_client()方法是事件之一,這個事件我們需要通知所有的客戶端。notify_clients是通知所有對一個事件感興趣客戶端的方法。它發(fā)送消息但是不等待每個客戶端返回的結(jié)果,因?yàn)槟菢拥脑捑蜁?dǎo)致阻塞。當(dāng)客戶端返回一個結(jié)果時,客戶端會告訴我們它為什么回復(fù)(然后我們就可以正確地處理它)。

同步服務(wù)端中的線程

這是一個非常重要的關(guān)注點(diǎn):我們開辟多少線程去處理服務(wù)端請求? 對于一個同步服務(wù)端,我們至少需要一個處理新連接的線程:

void accept_thread() {
    ip::tcp::acceptor acceptor(service, ip::tcp::endpoint(ip::tcp::v4(),8001));
    while ( true) {
        client_ptr new_( new talk_to_client);
        acceptor.accept(new_->sock());
        boost::recursive_mutex::scoped_lock lk(cs);
        clients.push_back(new_);
    } 
} 

對于已經(jīng)存在的客戶端:

  • 我們可以是單線程。這是最簡單的,同時也是我在第四章 同步服務(wù)端中采用的實(shí)現(xiàn)方式。它可以很輕松地處理100-200并發(fā)的客戶端而且有時候會更多,對于大多數(shù)情況來說這已經(jīng)足夠用了。
  • 我們可以對每個客戶端開一個線程。這不是一個很好的選擇;他會浪費(fèi)很多線程而且有時候會導(dǎo)致調(diào)試?yán)щy,而且當(dāng)它需要處理200以上并發(fā)的客戶端的時候,它可能馬上會到達(dá)它的瓶頸。
  • 我們可以用一些固定數(shù)量的線程去處理已經(jīng)存在的客戶端

第三種選擇是同步服務(wù)端中最難實(shí)現(xiàn)的;整個talk_to_client類需要是線程安全的。然后,你需要一個機(jī)制來確定哪個線程處理哪個客戶端。對于這個問題,你有兩個選擇:

  • 將特定的客戶端分配給特定的線程;比如,線程1處理前面20個客戶端,線程2處理21到40個線程,等等。當(dāng)一個線程在使用時(我們在等待被客戶端阻塞的一些東西),我們從已存在客戶端列表中將其取出來。等我們處理完之后,再把它放回到列表中。每個線程都會循環(huán)遍歷已經(jīng)存在的客戶端列表,然后把擁有完整請求的第一個客戶端提出來(我們已經(jīng)從客戶端讀取了一條完整的消息),然后回復(fù)它。
  • 服務(wù)端可能會變得無響應(yīng)
    • 第一種情況,被同一個線程處理的幾個客戶端同時發(fā)送請求,因?yàn)橐粋€線程在同一時刻只能處理一個請求。所以這種情況我們什么也不能做。
    • 第二種情況,如果我們發(fā)現(xiàn)并發(fā)請求大于當(dāng)前線程個數(shù)的時候。我們可以簡單地創(chuàng)建新線程來處理當(dāng)前的壓力。

下面的代碼片段有點(diǎn)類似之前的answer_to_client方法,它向我們展示了第二種方法的實(shí)現(xiàn)方式:

struct talk_to_client : boost::enable_shared_from_this<talk_to_client>
{
    ...
    void answer_to_client() {
        try {
            read_request();
            process_request();
        } catch ( boost::system::system_error&) { stop(); }
    } 
}; 

我們需要對它進(jìn)行修改使它變成下面代碼片段的樣子:

struct talk_to_client : boost::enable_shared_from_this<talk_to_client>
{
    boost::recursive_mutex cs;
    boost::recursive_mutex cs_ask;
    bool in_process;
    void answer_to_client() {
        { boost::recursive_mutex::scoped_lock lk(cs_ask);
            if ( in_process)
                return;
            in_process = true;
        }
        { boost::recursive_mutex::scoped_lock lk(cs);
            try {
                read_request();
                process_request();
            }catch ( boost::system::system_error&) {
                stop();
            }
        }
        { boost::recursive_mutex::scoped_lock lk(cs_ask);
            in_process = false;
        }
    } 
}; 

當(dāng)我們在處理一個客戶端請求的時候,它的in_process變量被設(shè)置成true,其他的線程就會忽略這個客戶端。額外的福利就是handle_clients_thread()方法不需要做任何修改;你可以隨心所欲地創(chuàng)建你想要數(shù)量的handle_clients_thread()方法。

客戶端應(yīng)用中的異步I/O

主流程和同步客戶端應(yīng)用有點(diǎn)類似,不同的是Boost.Asio每次都位于async_read和async_write請求中間。

這里寫圖片描述

第一種情況是我在第四章 客戶端和服務(wù)端 中實(shí)現(xiàn)過的。你應(yīng)該還記得在每個異步操作結(jié)束的時候,我都啟動另外一個異步操作,這樣service.run()方法才不會結(jié)束。

為了適應(yīng)第二種情況,你需要使用下面的代碼片段:

void on_connect() {
    do_read();
}
void do_read() {
    async_read(sock_, buffer(read_buffer_), MEM_FN2(read_complete,_1,_2), MEM_FN2(on_read,_1,_2));
}
void on_read(const error_code & err, size_t bytes) {
    if ( err) stop();
    if ( !started() ) return;
    std::string msg(read_buffer_, bytes);
    if ( msg.find("clients") == 0) on_clients(msg);
    else ...
}
void on_clients(const std::string & msg) {
    std::string clients = msg.substr(8);
    std::cout << username_ << ", new client list:" << clients ;
    do_write("clients ok\n");
} 

注意只要我們成功連接上,我們就開始從服務(wù)端讀取。每個on_[event]方法都會通過寫一個回復(fù)給服務(wù)端的方式來結(jié)束我們。

使用異步的美好在于你可以使用Boost.Asio進(jìn)行管理,從而把I/O網(wǎng)絡(luò)操作和其他異步操作結(jié)合起來。盡管它的流程不像同步的流程那么清晰,你仍然可以用同步的方式來想象它。

假設(shè),你從一個web服務(wù)器讀取文件然后把它們保存到一個數(shù)據(jù)庫中(異步地)。你可以把這個過程想象成下面的流程圖:

這里寫圖片描述

服務(wù)端應(yīng)用的異步I/O

現(xiàn)在要展示的是兩個普遍的情況,情況1(拉?。┖颓闆r2(推送)

這里寫圖片描述

第一種情況同樣是我在第4章 客戶端和服務(wù)端 中實(shí)現(xiàn)的異步服務(wù)端。在每一個異步操作最后,我都會啟動另外一個異步操作,這樣的話service.run()就不會結(jié)束。 現(xiàn)在要展示的是被剪裁過的框架代碼。下面是talk_to_client類所有的成員:

void start() {
    ...
    do_read(); // first, we wait for client to login
}
void on_read(const error_code & err, size_t bytes) {
    std::string msg(read_buffer_, bytes);
    if ( msg.find("login ") == 0) on_login(msg);
    else if ( msg.find("ping") == 0) on_ping();
    else
    ...
}
void on_login(const std::string & msg) {
    std::istringstream in(msg);
    in >> username_ >> username_;
    do_write("login ok\n");
}
void do_write(const std::string & msg) {
    std::copy(msg.begin(), msg.end(), write_buffer_);
    sock_.async_write_some( buffer(write_buffer_, msg.size()), MEM_FN2(on_write,_1,_2));
}
void on_write(const error_code & err, size_t bytes) { do_read(); } 

簡單來說,我們始終等待一個read操作,而且只要一發(fā)生,我們就處理然后將結(jié)果返回給客戶端。

我們把上述代碼進(jìn)行修改就可以完成一個推送服務(wù)端

void start() {
    ...
    on_new_client_event();
}
void on_new_client_event() {
    std::ostringstream msg;
    msg << "client count " << clients.size();
    for ( array::const_iterator b = clients.begin(), e = clients.end(); (*b)->do_write(msg.str());
} 
void on_read(const error_code & err, size_t bytes) {
    std::string msg(read_buffer_, bytes);
    // 在這里我們基本上只知道我們的客戶端接收到我們的通知
}
void do_write(const std::string & msg) {
    std::copy(msg.begin(), msg.end(), write_buffer_);
    sock_.async_write_some( buffer(write_buffer_, msg.size()), MEM_FN2(on_write,_1,_2));
}
void on_write(const error_code & err, size_t bytes) { do_read(); } 

只要有一個事件發(fā)生,我們假設(shè)是on_new_client_event,所有需要被通知到的客戶端就都收到一條信息。當(dāng)它們回復(fù)時,我們簡單認(rèn)為他們已經(jīng)確認(rèn)收到事件。注意我們永遠(yuǎn)不會把正在等待的異步操作用盡(所以,service.run()不會結(jié)束),因?yàn)槲覀円恢痹诘却粋€新的客戶端:

ip::tcp::acceptor acc(service, ip::tcp::endpoint(ip::tcp::v4(), 8001));
void handle_accept(talk_to_client::ptr client, const error_code & err)
{
    client->start();
    talk_to_client::ptr new_client = talk_to_client::new_();
    acc.async_accept(new_client->sock(), bind(handle_accept,new_client,_1));
}

異步服務(wù)端中的多線程

我在第4章 客戶端和服務(wù)端 展示的異步服務(wù)端是單線程的,所有的事情都發(fā)生在main()中:

int main() {
    talk_to_client::ptr client = talk_to_client::new_();
    acc.async_accept(client->sock(), boost::bind(handle_
accept,client,_1));
    service.run();
} 

異步的美妙之處就在于可以非常簡單地把單線程變?yōu)槎嗑€程。你可以一直保持單線程直到你的并發(fā)客戶端超過200。然后,你可以使用如下的代碼片段把單線程變成100個線程:

boost::thread_group threads;
void listen_thread() {
    service.run();
}
void start_listen(int thread_count) {
    for ( int i = 0; i < thread_count; ++i)
        threads.create_thread( listen_thread);
}
int main(int argc, char* argv[]) {
    talk_to_client::ptr client = talk_to_client::new_();
    acc.async_accept(client->sock(), boost::bind(handle_accept,client,_1));
    start_listen(100);
    threads.join_all();
}

當(dāng)然,一旦你選擇了多線程,你需要考慮線程安全。盡管你在線程A中調(diào)用了*async_*,但是它的完成處理流程可以在線程B中被調(diào)用(因?yàn)榫€程B也調(diào)用了service.run())。對于它本身而言這不是問題。只要你遵循邏輯流程,也就是從async_read()on_read(),從on_read()到process_request,從process_requestasync_write(),從async_write()on_write(),從on_write()到async_read(),然后在你的talk_to_client*類中也沒有被調(diào)用的公有方法,這樣的話盡管不同的方法可以在不同的線程中被調(diào)用,它們還是會被有序地調(diào)用。從而不需要互斥量。

這也意味著對于一個客戶端,只會有一個異步操作在等待。假如在某些情況,一個客戶端有兩個異步方法在等待,你就需要互斥量了。這是因?yàn)閮蓚€等待的操作可能正好在同一個時間完成,然后我們就會在兩個不同的線程中間同時調(diào)用他們的完成處理函數(shù)。所以,這里需要線程安全,也就是需要使用互斥量。 在我們的異步服務(wù)端中,我們確實(shí)同時有兩個等待的操作:

void do_read() {
    async_read(sock_, buffer(read_buffer_),MEM_FN2(read_complete,_1,_2), MEM_FN2(on_read,_1,_2));
    post_check_ping();
}
void post_check_ping() {
    timer_.expires_from_now(boost::posix_time::millisec(5000));
    timer_.async_wait( MEM_FN(on_check_ping));
}

當(dāng)在做一個read操作時,我們會異步等待read操作完成和超時。所以,這里需要線程安全。

我的建議是,如果你準(zhǔn)備使用多線程,從開始就保證你的類是線程安全的。通常這不會影響它的性能(當(dāng)然你也可以在配置中設(shè)置開關(guān))。同時,如果你準(zhǔn)備使用多線程,從一個開始就使用。這樣的話你能盡早地發(fā)現(xiàn)可能存在的問題。一旦你發(fā)現(xiàn)一個問題,你首先需要檢查的事情就是:單線程運(yùn)行的時候是否會發(fā)生?如果是,它很簡單;只要調(diào)試它就可以了。否則,你可能忘了對一些方法加鎖(互斥量)。

因?yàn)槲覀兊睦有枰蔷€程安全的,我已經(jīng)把talk_to_client修改成使用互斥量的了。同時,我們也有一個客戶端連接的列表,它也需要自己的互斥量,因?yàn)槲覀冇袝r需要訪問它。

避免死鎖和內(nèi)存沖突不是那么容易。下面是我需要對update_client_changed()方法進(jìn)行修改的地方:

void update_clients_changed() {
    array copy;
    { boost::recursive_mutex::scoped_lock lk(clients_cs); copy = clients; }
    for( array::iterator b = copy.begin(), e = copy.end(); b != e; ++b)
        (*b)->set_clients_changed();
} 

你需要避免的是同時有兩個互斥量被鎖定(這會導(dǎo)致死鎖)。在我們的例子中,我們不想clients_cs和一個客戶端的cs_互斥量同時被鎖住

異步操作

Boost.Asio同樣允許你異步地運(yùn)行你任何一個方法。僅僅需要使用下面的代碼片段:

void my_func() {
    ...
}
service.post(my_func);

這樣就可以保證my_func在調(diào)用了service.run()方法的某個線程中間被調(diào)用。你同樣可以異步地調(diào)用一個有完成處理handler的方法,方法的handler會在方法結(jié)束的時候通知你。偽代碼如下:

void on_complete() {
    ...
}
void my_func() {
    ...
    service.post(on_complete);
}
async_call(my_func);

沒有現(xiàn)成的async_call方法,因此,你需要自己創(chuàng)建。幸運(yùn)的是,它不是很復(fù)雜,參考下面的代碼片段:

struct async_op : boost::enable_shared_from_this<async_op>, ... {
    typedef boost::function<void(boost::system::error_code)>completion_func;
    typedef boost::function<boost::system::error_code ()> op_func;
    struct operation { ... };
    void start() {
        { boost::recursive_mutex::scoped_lock lk(cs_);
            if ( started_) return; started_ = true; }
        boost::thread t(boost::bind(&async_op::run,this));
    }
    void add(op_func op, completion_func completion, io_service &service) {
        self_ = shared_from_this();
        boost::recursive_mutex::scoped_lock lk(cs_);
        ops_.push_back( operation(service, op, completion));
        if ( !started_) start();
    } 
    void stop() {
        boost::recursive_mutex::scoped_lock lk(cs_);
        started_ = false; ops_.clear();
    } 
private:
    boost::recursive_mutex cs_;
    std::vector<operation> ops_;
    bool started_;
    ptr self_;
};

async_op方法創(chuàng)建了一個后臺線程,這個線程會運(yùn)行(run())你添加(add())到它里面的所有的異步操作。為了讓事情簡單一些,每個操作都包含下面的內(nèi)容:

  • 一個異步調(diào)用的方法
  • 當(dāng)?shù)谝粋€方法結(jié)束時被調(diào)用的一個完成處理handler
  • 會運(yùn)行完成處理handler的io_service實(shí)例。這也是完成時通知你的地方。參考下面的代碼:

struct async_op : boost::enable_shared_from_this<async_op>, private boost::noncopyable {
    struct operation {
        operation(io_service & service, op_func op, completion_func completion) : service(&service), op(op), completion(completion) , work(new io_service::work(service)) {}
        operation() : service(0) {}
        io_service * service;
        op_func op;
        completion_func completion;
        typedef boost::shared_ptr<io_service::work> work_ptr;
        work_ptr work;
    };
    ... 
}; 

它們被operation結(jié)構(gòu)體包含在內(nèi)部。注意當(dāng)有一個操作在等待時,我們在操作的構(gòu)造方法中構(gòu)造一個io_service::work實(shí)例,從而保證直到我們完成異步調(diào)用之前service.run()都不會結(jié)束(當(dāng)io_service::work實(shí)例保持活動時,service.run()就會認(rèn)為它有工作需要做)。參考下面的代碼片段:

struct async_op : ... {
    typedef boost::shared_ptr<async_op> ptr;
    static ptr new_() { return ptr(new async_op); }
    ...
    void run() {
        while ( true) {
            { boost::recursive_mutex::scoped_lock lk(cs_);
                if ( !started_) break; }
            boost::this_thread::sleep(boost::posix_time::millisec(10));
            operation cur;
            { boost::recursive_mutex::scoped_lock lk(cs_);
                if ( !ops_.empty()) {
                    cur = ops_[0]; 
                    ops_.erase(ops_.begin());
                }
            }
            if ( cur.service)
                cur.service->post(boost::bind(cur.completion, cur.op()));        
        }
        self_.reset();
    }
}; 

run()方法就是后臺線程;它僅僅觀察是否有工作需要做,如果有,就一個一個地運(yùn)行這些異步方法。在每個調(diào)用結(jié)束的時候,它會調(diào)用相關(guān)的完成處理方法。

為了測試,我們創(chuàng)建一個會被異步執(zhí)行的compute_file-checksum方法

size_t checksum = 0;
boost::system::error_code compute_file_checksum(std::string file_name)
{
    HANDLE file = ::CreateFile(file_name.c_str(),GENERIC_READ, 0, 0,OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, 0);
    windows::random_access_handle h(service, file);
    long buff[1024];
    checksum = 0;
    size_t bytes = 0, at = 0;
    boost::system::error_code ec;
    while ( (bytes = read_at(h, at, buffer(buff), ec)) > 0) {
        at += bytes; bytes /= sizeof(long);
        for ( size_t i = 0; i < bytes; ++i)
            checksum += buff[i];
    }
    return boost::system::error_code(0,boost::system::generic_category());
}
void on_checksum(std::string file_name, boost::system::error_code) {
    std::cout << "checksum for " << file_name << "=" << checksum << std::endl;
}
int main(int argc, char* argv[]) {
    std::string fn = "readme.txt";
    async_op::new_()->add( service, boost::bind(compute_file_checksum,fn),boost::bind(on_checksum,fn,_1));
    service.run();
}

注意我展示給你的只是實(shí)現(xiàn)異步調(diào)用一個方法的一種可能。除了像我這樣實(shí)現(xiàn)一個后臺線程,你可以使用一個內(nèi)部io_service實(shí)例,然后推送(post())異步方法給這個實(shí)例調(diào)用。這個作為一個練習(xí)留給讀者。

你也可以擴(kuò)展這個類讓其可以展示一個異步操作的進(jìn)度(比如,使用百分比)。這樣做你就可以在主線程通過一個進(jìn)度條來顯示進(jìn)度。

代理實(shí)現(xiàn)

代理一般位于客戶端和服務(wù)端之間。它接受客戶端的請求,可能會對請求進(jìn)行修改,然后接著把請求發(fā)送到服務(wù)端。然后從服務(wù)端取回結(jié)果,可能也會對結(jié)果進(jìn)行修改,然后接著把結(jié)果發(fā)送到客戶端。

這里寫圖片描述

代理有什么特別的?我們講述它的目的在于:對每個連接,你都需要兩個sokect,一個給客戶端,另外一個給服務(wù)端。這些都給實(shí)現(xiàn)一個代理增加了不小的難度。

實(shí)現(xiàn)一個同步的代理應(yīng)用比異步的方式更加復(fù)雜;數(shù)據(jù)可能同時從兩個端過來(客戶端和服務(wù)端),也可能同時發(fā)往兩個端。這也就意味著如果我們選擇同步,我們就可能在一端向另一端read()或者write(),同時另一端向這一端read()或者write()時阻塞,這也就意味著最終我們會變得無響應(yīng)。

根據(jù)下面幾條實(shí)現(xiàn)一個異步代理的簡單例子:

  • 在我們的方案中,我們在構(gòu)造函數(shù)中能拿到兩個連接。但不是所有的情況都這樣,比如對于一個web代理來說,客戶端只告訴我們服務(wù)端的地址。
  • 因?yàn)楸容^簡單,所以不是線程安全的。參考如下的代碼:

class proxy  : public boost::enable_shared_from_this<proxy> {
    proxy(ip::tcp::endpoint ep_client, ip::tcp::endpoint ep_server) : ... {}
public:
    static ptr start(ip::tcp::endpoint ep_client,
ip::tcp::endpoint ep_svr) {
        ptr new_(new proxy(ep_client, ep_svr));
        // … 連接到兩個端
        return new_;
    }
    void stop() {
        // ... 關(guān)閉兩個連接
    }
    bool started() { return started_ == 2; }
private:
    void on_connect(const error_code & err) {
        if ( !err)      {
            if ( ++started_ == 2) on_start();
        } else stop();
    }
    void on_start() {
        do_read(client_, buff_client_);
        do_read(server_, buff_server_);
    }
... 
private:
    ip::tcp::socket client_, server_;
    enum { max_msg = 1024 };
    char buff_client_[max_msg], buff_server_[max_msg]; 
    int started_; 
};

這是個非常簡單的代理。當(dāng)我們兩個端都連接時,它開始從兩個端讀?。?em>on_start()方法):

class proxy  : public boost::enable_shared_from_this<proxy> {
    ...
    void on_read(ip::tcp::socket & sock, const error_code& err, size_t bytes) {
        char * buff = &sock == &client_ ? buff_client_ : buff_server_;
        do_write(&sock == &client_ ? server_ : client_, buff, bytes);
    }
    void on_write(ip::tcp::socket & sock, const error_code &err, size_t bytes){
        if ( &sock == &client_) do_read(server_, buff_server_);
        else do_read(client_, buff_client_);
    }
    void do_read(ip::tcp::socket & sock, char* buff) {
        async_read(sock, buffer(buff, max_msg), MEM_FN3(read_complete,ref(sock),_1,_2), MEM_FN3(on_read,ref(sock),_1,_2));
    }
    void do_write(ip::tcp::socket & sock, char * buff, size_t size) {
        sock.async_write_some(buffer(buff,size), MEM_FN3(on_write,ref(sock),_1,_2));
    }
    size_t read_complete(ip::tcp::socket & sock, const error_code & err, size_t bytes) {
        if ( sock.available() > 0) return
        sock.available();
        return bytes > 0 ? 0 : 1;
    }
}; 

對每一個成功的讀取操作(on_read),它都會發(fā)送消息到另外一個部分。只要消息一發(fā)送成功(on_write),我們就從來源那部分再次讀取。

使用下面的代碼片段讓這個流程運(yùn)轉(zhuǎn)起來:

int main(int argc, char* argv[]) {
    ip::tcp::endpoint ep_c(ip::address::from_string("127.0.0.1"),8001);
    ip::tcp::endpoint ep_s(ip::address::from_string("127.0.0.1"),8002);
    proxy::start(ep_c, ep_s);
    service.run();
} 

你會注意到我在讀和寫中重用了buffer。這個重用是ok的,因?yàn)閺目蛻舳俗x取到的消息在新消息被讀取之前就已經(jīng)寫入到服務(wù)端,反之亦然。這也意味著這種特別的實(shí)現(xiàn)方式會碰到響應(yīng)性的問題。當(dāng)我們正在處理到B部分的寫入時,我們不會從A讀?。ㄎ覀儠趯懭氲紹部分完成時重新從A部分讀?。?。你可以通過下面的方式重寫實(shí)現(xiàn)來克服這個問題:

  • 使用多個讀取buffer
  • 對每個成功的read操作,除了異步寫回到另外一個部分,還需要做額外的一個read(讀取到一個新的buffer)
  • 對每個成功的write操作,銷毀(或者重用)這個buffer

我會把這個當(dāng)作練習(xí)留給你們。

小結(jié)

在選擇同步或者異步時需要考慮很多事情。最先需要考慮的就是避免混淆它們。

在這一章中,我們已經(jīng)看到:

  • 實(shí)現(xiàn),測試,調(diào)試各個類型的應(yīng)用是多么簡單
  • 線程是如何影響你的應(yīng)用的
  • 應(yīng)用的行為是怎么影響它的實(shí)現(xiàn)的(拉取或者推送類型)
  • 選擇異步時怎樣去嵌入自己的異步操作

接下來,我們會了解一些Boost.Asio不那么為人知曉的特性,中間就有我最喜歡的Boost.Asio特性-協(xié)程,它可以讓你輕松地取異步之精華,去異步之糟粕。

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號