[譯]Boost.Asio - 其他特性

2018-06-19 15:35 更新

Boost.Asio-其他特性

這章我們講了解一些Boost.Asio不那么為人所知的特性。標準的stream和streambuf對象有時候會更難用一些,但正如你所見,它們也有它們的益處。最后,你會看到姍姍來遲的Boost.Asio協程的入口,它可以讓你的異步代碼變得非常易讀。這是非常驚人的一個特性。

標準stream和標準I/O buffer

讀這一章節(jié)之前你需要對STL stream和STL streambuf對象有所了解。

Boost.Asio在處理I/O操作時支持兩種類型的buffer:

  • boost::asio::buffer():這種buffer關聯著一個Boost.Asio的操作(我們使用的buffer被傳遞給一個Boost.Asio的操作)
  • boost::asio::streambuf:這個buffer繼承自std::streambuf,在網絡編程中可以和STL stream一起使用

縱觀全書,之前的例子中最常見的例子如下:

size_t read_complete(boost::system::error_code, size_t bytes){ ... }
char buff[1024];
read(sock, buffer(buff), read_complete);
write(sock, buffer("echo\n"));

通常來說使用這個就能滿足你的需要,如果你想要更復雜,你可以使用streambuf來實現。

這個就是你可以用streambuf對象做的最簡單也是最壞的事情:

streambuf buf;
read(sock, buf);

這個會一直讀到streambuf對象滿了,然后因為streambuf對象可以通過自己重新開辟空間從而獲取更多的空間,它基本會讀到連接被關閉。

你可以使用read_until一直讀到一個特定的字符串:

streambuf buf;
read_until(sock, buf, "\n");

這個例子會一直讀到一個“\n”為止,把它添加到buffer的末尾,然后退出read方法。

向一個streambuf對象寫一些東西,你需要做一些類似下面的事情:

streambuf buf;
std::ostream out(&buf);
out << "echo" << std::endl;
write(sock, buf);

這是非常直觀的;你在構造函數中傳遞你的streambuf對象來構建一個STL stream,將其寫入到你想要發(fā)送的消息中,然后使用write來發(fā)送buffer的內容。

Boost.Asio和STL stream

Boost.Asio在集成STL stream和網絡方面做了很棒的工作。也就是說,如果你已經在使用STL擴展,你肯定就已經擁有了大量重載了操作符<<和>>的類。從socket讀或者寫入它們就好像在公園漫步一樣簡單。

假設你有下面的代碼片段:

struct person {
    std::string first_name, last_name;
    int age;
};
std::ostream& operator<<(std::ostream & out, const person & p) {
    return out << p.first_name << " " << p.last_name << " " << p.age;
}
std::istream& operator>>(std::istream & in, person & p) {
    return in >> p.first_name >> p.last_name >> p.age;
} 

通過網絡發(fā)送這個person就像下面的代碼片段這么簡單:

streambuf buf;
std::ostream out(&buf);
person p;
// … 初始化p
out << p << std::endl;
write(sock, buf);

另外一個部分也可以非常簡單的讀取:

read_until(sock, buf, "\n");
std::istream in(&buf);
person p;
in >> p;

使用streambuf對象(當然,也包括它用來寫入的std::ostream和用來讀取的std::istream)時最棒的部分就是你最終的編碼會很自然:

  • 當通過網絡寫入一些要發(fā)送的東西時,很有可能你會有多個片段的數據。所以,你需要把數據添加到一個buffer里面。如果那個數據不是一個字符串,你需要先把它轉換成一個字符串。當使用<<操作符時這些操作默認都已經做了。
  • 同樣,在另外一個部分,當讀取一個消息時,你需要解析它,也就是說,讀取到一個片段的數據時,如果這個數據不是字符串,你需要將它轉換為字符串。當你使用>>操作符讀取一些東西時這些也是默認就做了的。

最后要給出的是一個非常著名,非??岬脑E竅,使用下面的代碼片段把streambuf的內容輸出到console中

streambuf buf;
...
std::cout << &buf << std::endl; //把所有內容輸出到console中

同樣的,使用下面的代碼片段來把它的內容轉換為一個string

std::string to_string(streambuf &buf) {
    std::ostringstream out;
    out << &buf;
    return out.str();
} 

streambuf類

我之前說過,streambuf繼承自std::streambuf。就像std::streambuf本身,它不能拷貝構造。

另外,它有一些額外的方法,如下:

  • streambuf([max_size,][allocator]):這個方法構造了一個streambuf對象。你可以選擇指定一個最大的buffer大小和一個分配器,分配器用來在需要的時候分配/釋放內存。
  • prepare(n):這個方法返回一個子buffer,用來容納連續(xù)的n個字符。它可以用來讀取或者寫入。方法返回的結果可以在任何Boost.Asio處理read/write的自由函數中使用,而不僅僅是那些用來處理streambuf對象的方法。
  • data():這個方法以連續(xù)的字符串形式返回整個buffer然后用來寫入。方法返回的結果可以在任何Boost.Asio處理寫入的自由函數中使用,而不僅僅是那些用來處理streambuf對象的方法。
  • comsume(n):在這個方法中,數據從輸入隊列中被移除(從read操作)
  • commit(n):在這個方法中,數據從輸出隊列中被移除(從write操作)然后加入到輸入隊列中(為read操作準備)。
  • size():這個方法以字節(jié)為單位返回整個streambuf對象的大小。
  • max_size():這個方法返回最多能保存的字節(jié)數。

除了最后的兩個方法,其他的方法不是那么容易理解。首先,大部分時間你會把streambuf以參數的方式傳遞給read/write自由函數,就像下面的代碼片段展示的一樣:

read_until(sock, buf, "\n"); // 讀取到buf中
write(sock, buf); // 從buf寫入

如果你想之前的代碼片段展示的一樣把整個buffer都傳遞到一個自由函數中,方法會保證把buffer的輸入輸出指針指向的位置進行增加。也就是說,如果有數據需要讀,你就能讀到它。比如:

read_until(sock, buf, '\n');
std::cout << &buf << std::endl;

上述代碼會把你剛從socket寫入的東西輸出。而下面的代碼不會輸出任何東西:

read(sock, buf.prepare(16), transfer_exactly(16) );
std::cout << &buf << std::endl;

字節(jié)被讀取了,但是輸入指針沒有移動,你需要自己移動它,就像下面的代碼片段所展示的:

read(sock, buf.prepare(16), transfer_exactly(16) );
buf.commit(16);
std::cout << &buf << std::endl;

同樣的,假設你需要從streambuf對象中寫入,如果你使用了write自由函數,則需要像下面一樣:

streambuf buf;
std::ostream out(&buf);
out << "hi there" << std::endl;
write(sock, buf);

下面的代碼會把hi there發(fā)送三次:

streambuf buf;
std::ostream out(&buf);
out << "hi there" << std::endl;
for ( int i = 0; i < 3; ++i)
    write(sock, buf.data());

發(fā)生的原因是因為buffer從來沒有被消耗過,因為數據還在。如果你想消耗它,使用下面的代碼片段:

streambuf buf;
std::ostream out(&buf);
out << "hi there" << std::endl;
write(sock, buf.data());
buf.consume(9);

總的來說,你最好選擇一次處理整個streambuf實例。如果需要調整則使用上述的方法。

盡管你可以在讀和寫操作時使用同一個streambuf,你仍然建議你分開使用兩個,一個讀另外一個寫,它會讓事情變的簡單,清晰,同時你也會減少很多導致bug的可能。

處理streambuf對象的自由函數

下面列出了Boost.Asio中處理streambuf對象的自由函數:

  • read(sock, buf[, completion_function]):這個方法把內容從socket讀取到streambuf對象中。completion方法是可選的。如果有,它會在每次read操作成功之后被調用,然后告訴Boost.Asio這個操作是否完成(如果沒有,它繼續(xù)讀?。?。它的格式是:size_t completion(const boost::system::error_code & err, size_t bytes_transfered);,如果completion方法返回0,我們認為read操作完成了,如果非0,它表示下一次調用stream的read_some方法需要讀取的最大的字節(jié)數。
  • read_at(random_stream, offset, buf [, completion_function]): 這個方法從一個支持隨機讀取的stream中讀取。注意它沒有被應用到socket中(因為他們沒有隨機讀取的模型,它們是單向的,一直向前)。
  • read_until(sock, buf, char | string | regex | match_condition): 這個方法一直讀到滿足一個特性的條件為止?;蛘呤且粋€char類型的數據被讀到,或者是一個字符串被讀到,或者是一個目前讀到的字符串能匹配的正則表達式,或者match_condition方法告訴我們需要結束這個方法。match_condition方法的格式是:pair<iterator,bool> match(iterator begin, iterator end); ,iterator代表 buffers_ iterator<streambuf::const_buffers_type>。如果匹配到,你需要返回一個pairpassed_end_of_match被設置成true)。如果沒有匹配到,你需要返回pair(begin被設置為false)。
  • write(sock, buf [, completion_function]): 這個方法寫入streambuf對象所有的內容。completion方法是可選的,它的表現和read()completion方法類似:當write操作完成時返回0,或者返回一個非0數代表下一次調用stream的write_some方法需要寫入的最大的字節(jié)數。
  • write_at(random_stream,offset, buf [, completion_function]): 這個方法用來向一個支持隨機存儲的stream寫入。同樣,它沒有被應用到socket中。
  • async_read(sock, buf [, competion_function], handler): 這個方法是read()的異步實現,handler的格式為:void handler(const boost::system::error_code, size_t bytes)。
  • async_read_at(radom_stream, offset, buf [, completion_function] , handler): 這個方法是read_at()的異步實現。
  • async_readuntil (sock, buf, char | string | regex | match condition, handler): 這個方法是read_until()的異步實現。
  • async_write(sock, buf [, completion_function] , handler): 這個方法是write()的異步實現。
  • async_write_at(random_stream,offset, buf [, completion_function] , handler): 這個方法是write_at()的異步實現。

我們假設你需要一直讀取直到讀到一個元音字母:

streambuf buf;
bool is_vowel(char c) {
    return c == 'a' || c == 'e' || c == 'i' || c == 'o' || c == 'u';
}
size_t read_complete(boost::system::error_code, size_t bytes) {
    const char * begin = buffer_cast<const char*>( buf.data());
    if ( bytes == 0) return 1;
    while ( bytes > 0)
        if ( is_vowel(*begin++)) return 0;
        else --bytes;
    return 1;
}
...
read(sock, buf, read_complete);

這里需要注意的事情是對read_complete()中buffer的訪問,也就是buffer_cast<>buf.data。

如果你使用正則,上面的例子會更簡單:

read_until(sock, buf, boost::regex("^[aeiou]+") ); 

或者我們修改例子來讓match_condition方法工作起來:

streambuf buf;
bool is_vowel(char c) {
    return c == 'a' || c == 'e' || c == 'i' || c == 'o' || c == 'u';
}
typedef buffers_iterator<streambuf::const_buffers_type> iterator;
std::pair<iterator,bool> match_vowel(iterator b, iterator e) {
    while ( b != e)
        if ( is_vowel(*b++)) return std::make_pair(b, true);
    return std::make_pair(e, false);
}
...
size_t bytes = read_until(sock, buf, match_vowel);

當使用read_until時會有個難點:你需要記住你已經讀取的字節(jié)數,因為下層的buffer可能多讀取了一些字節(jié)(不像使用read()時)。比如:

std::cout << &buf << std::endl;

上述代碼輸出的字節(jié)可能比read_until讀取到的多。

協程

Boost.Asio的作者在2009-2010年間實現了非??岬囊粋€部分,協程,它能讓你更簡單地設計你的異步應用。

它們可以讓你同時享受同步和異步兩個世界中最好的部分,也就是:異步編程但是很簡單就能遵循流程控制,就好像應用是按流程實現的。

正常的流程已經在情形1種展示了,如果使用協程,你會盡可能的接近情形2。

簡單來說,就是協程允許在方法中的指定位置開辟一個入口來暫停和恢復運行。

如果要使用協程,你需要在boost/libs/asio/example/http/server4目錄下的兩個頭文件:yield.hppcoroutine.hpp。在這里,Boost.Asio定義了兩個虛擬的關鍵詞(宏)和一個類:

  • coroutine:這個類在實現協程時被你的連接類繼承或者使用。
  • reenter(entry):這個是協程的主體。參數entry是一個指向coroutine實例的指針,它被當作一個代碼塊在整個方法中使用。
  • yield code:它把一個聲明當作協程的一部分來運行。當下一次進入方法時,操作會在這段代碼之后執(zhí)行。

為了更好的理解,我們來看一個例子。我們會重新實現 第四章 異步客戶端 中的應用,這是一個可以登錄,ping,然后能告訴你其他已登錄客戶端的簡單客戶端應用。 核心代碼和下面的代碼片段類似:

class talk_to_svr : public boost::enable_shared_from_this<talk_to_svr>, public coroutine, boost::noncopyable {
    ...
    void step(const error_code & err = error_code(), size_t bytes = 0) {
        reenter(this) 
        { 
            for (;;) {
                yield async_write(sock_, write_buffer_, MEM_FN2(step,_1,_2) );
                yield async_read_until( sock_, read_buffer_,"\n", MEM_FN2(step,_1,_2));
                yield service.post( MEM_FN(on_answer_from_server));
            }
        } 
    }
}; 

首先改變的事就是:我們只有一個叫做step()的方法,而沒有大量類似connect(),on_connect(),on_read(),do_read(),on_write(),do_write()等等的成員方法。

方法的主體在reenter(this) { for (;;) { }} 內。你可以把reenter(this)當作我們上次運行的代碼,所以這次我們執(zhí)行的是下一次的代碼。

reenter代碼塊中,你會發(fā)現幾個yield聲明。你第一次進入方法時,async_write方法被執(zhí)行,第二次async_read_until方法被執(zhí)行,第三次service.post方法被執(zhí)行,然后第四次async_write方法被執(zhí)行,然后一直循環(huán)下去。

你需要一直記住for(;;){}實例。參考下面的代碼片段:

void step(const error_code & err = error_code(), size_t bytes = 0) {
    reenter(this) {
        yield async_write(sock_, write_buffer_, MEM_FN2(step,_1,_2) );
        yield async_read_until( sock_, read_buffer_, "\n",MEM_FN2(step,_1,_2));
        yield service.post(MEM_FN(on_answer_from_server));
    }
} 

如果我們第三次使用上述的代碼片段,我們會進入方法然后執(zhí)行service.post。當我們第四次進入方法時,我們跳過service.post,不執(zhí)行任何東西。當執(zhí)行第五次時仍然不執(zhí)行任何東西,然后一直這樣下去:

class talk_to_svr : public boost::enable_shared_from_this<talk_to_svr>, public coroutine, boost::noncopyable {
    talk_to_svr(const std::string & username) : ... {}
    void start(ip::tcp::endpoint ep) {
        sock_.async_connect(ep, MEM_FN2(step,_1,0) );
    }
    static ptr start(ip::tcp::endpoint ep, const std::string &username) {
        ptr new_(new talk_to_svr(username));
        new_->start(ep); 
        return new_;
    }
    void step(const error_code & err = error_code(), size_t bytes = 0)
    {
        reenter(this) { 
            for (;;) {
                if ( !started_) {
                    started_ = true;
                    std::ostream out(&write_buf_);
                    out << "login " << username_ << "\n";
                }
                yield async_write(sock_, write_buf_,MEM_FN2(step,_1,_2));
                yield async_read_until( sock_,read_buf_,"\n",MEM_FN2(step,_1,_2));
                yield service.post(MEM_FN(on_answer_from_server));
            }
        }
    }
    void on_answer_from_server() {
        std::istream in(&read_buf_);
        std::string word;
        in >> word;
        if ( word == "login") on_login();
        else if ( word == "ping") on_ping();
        else if ( word == "clients") on_clients();
        read_buf_.consume( read_buf_.size());
        if (write_buf_.size() > 0) service.post(MEM_FN2(step,error_code(),0));
    }
    ... 
private:
    ip::tcp::socket sock_;
    streambuf read_buf_, write_buf_;
    bool started_;
    std::string username_;
    deadline_timer timer_;
};

當我們啟動連接時,start()被調用,然后它會異步地連接到服務端。當連接完成時,我們第一次進入step()。也就是我們發(fā)送我們登錄信息的時候。

在那之后,我們調用async_write,然后調用async_read_until,再處理消息(on_answer_from_server)。

我們在on_answer_from_server處理接收到的消息;我們讀取第一個字符,然后把它分發(fā)到相應的方法。剩下的消息(如果還有一些消息沒讀完)我們都忽略掉:

class talk_to_svr : ... {
    ...
    void on_login() { do_ask_clients(); }
    void on_ping() {
        std::istream in(&read_buf_);
        std::string answer; in >> answer;
        if ( answer == "client_list_changed")
            do_ask_clients();
        else postpone_ping();
    }
    void on_clients() {
        std::ostringstream clients; clients << &read_buf_;
        std::cout << username_ << ", new client list:" << clients.str();
        postpone_ping();
    }
    void do_ping() {
        std::ostream out(&write_buf_); out << "ping\n";
        service.post( MEM_FN2(step,error_code(),0));
    } 
    void postpone_ping() {
        timer_.expires_from_now(boost::posix_time::millisec(rand() % 7000));
        timer_.async_wait( MEM_FN(do_ping));
    }
    void do_ask_clients() {
        std::ostream out(&write_buf_);
        out << "ask_clients\n";
    }
}; 

完整的例子還會更復雜一點,因為我們需要隨機地ping服務端。實現這個功能我們需要在第一次請求客戶端列表完成之后做一個ping操作。然后,在每個從服務端返回的ping操作的結果中,我們做另外一個ping操作。

使用下面的代碼片段來執(zhí)行整個過程:

int main(int argc, char* argv[]) {
    ip::tcp::endpoint ep(ip::address::from_string("127.0.0.1"),8001);
    talk_to_svr::start(ep, "John");
    service.run();
} 

使用協程,我們節(jié)約了15行代碼,而且代碼也變的更加易讀。

在這里我們僅僅接觸了協程的一點皮毛。如果你想要了解更多,請登錄作者的個人主頁:http://blog.think-async.com/2010_03_01_archive.html

總結

我們已經了解了如何使用Boost.Asio玩轉STL stream和streambuf對象。我們也了解了如何使用協程來讓我們的代碼更加簡潔和易讀。

下面就是重頭戲了,比如Asio VS Boost.Asio,高級調試,SSL和平臺相關特性。

以上內容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號