這一章涵蓋了使用Boost.Asio時(shí)必須知道的一些事情。我們也將深入研究比同步編程更復(fù)雜、更有樂趣的異步編程。
這一部分包含了當(dāng)使用Boost.Asio編寫網(wǎng)絡(luò)應(yīng)用程序時(shí)必須知道的事情。
Boost.Asio的所有內(nèi)容都包含在boost::asio命名空間或者其子命名空間內(nèi)。
對(duì)于IP地址的處理,Boost.Asio提供了ip::address , ip::address_v4和ip::address_v6類。 它們提供了相當(dāng)多的函數(shù)。下面列出了最重要的幾個(gè):
大多數(shù)情況你會(huì)選擇用ip::address::from_string:
ip::address addr = ip::address::from_string("127.0.0.1");
如果你想通過一個(gè)主機(jī)名進(jìn)行連接,下面的代碼片段是無用的:
// 拋出異常
ip::address addr = ip::address::from_string("www.yahoo.com");
端點(diǎn)是使用某個(gè)端口連接到的一個(gè)地址。不同類型的socket有它自己的endpoint類,比如ip::tcp::endpoint、ip::udp::endpoint和ip::icmp::endpoint
如果想連接到本機(jī)的80端口,你可以這樣做:
ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 80);
有三種方式來讓你建立一個(gè)端點(diǎn):
例子如下:
ip::tcp::endpoint ep1;
ip::tcp::endpoint ep2(ip::tcp::v4(), 80);
ip::tcp::endpoint ep3( ip::address::from_string("127.0.0.1), 80);
如果你想連接到一個(gè)主機(jī)(不是IP地址),你需要這樣做:
// 輸出 "87.248.122.122"
io_service service;
ip::tcp::resolver resolver(service);
ip::tcp::resolver::query query("www.yahoo.com", "80");
ip::tcp::resolver::iterator iter = resolver.resolve( query);
ip::tcp::endpoint ep = *iter;
std::cout << ep.address().to_string() << std::endl;
你可以用你需要的socket類型來替換tcp。首先,為你想要查詢的名字創(chuàng)建一個(gè)查詢器,然后用resolve()函數(shù)解析它。如果成功,它至少會(huì)返回一個(gè)入口。你可以利用返回的迭代器,使用第一個(gè)入口或者遍歷整個(gè)列表來拿到全部的入口。
給定一個(gè)端點(diǎn),可以獲得他的地址,端口和IP協(xié)議(v4或者v6):
std::cout << ep.address().to_string() << ":" << ep.port()
<< "/" << ep.protocol() << std::endl;
Boost.Asio有三種類型的套接字類:ip::tcp, ip::udp和ip::icmp。當(dāng)然它也是可擴(kuò)展的,你可以創(chuàng)建自己的socket類,盡管這相當(dāng)復(fù)雜。如果你選擇這樣做,參照一下boost/asio/ip/tcp.hpp, boost/asio/ip/udp.hpp和boost/asio/ip/icmp.hpp。它們都是含有內(nèi)部typedef關(guān)鍵字的超小類。
你可以把ip::tcp, ip::udp, ip::icmp類當(dāng)作占位符;它們可以讓你便捷地訪問其他類/函數(shù),如下所示:
socket類創(chuàng)建一個(gè)相應(yīng)的socket。而且總是在構(gòu)造的時(shí)候傳入io_service實(shí)例:
io_service service;
ip::udp::socket sock(service)
sock.set_option(ip::udp::socket::reuse_address(true));
每一個(gè)socket的名字都是一個(gè)typedef關(guān)鍵字
所有的同步函數(shù)都有拋出異?;蛘叻祷劐e(cuò)誤碼的重載,比如下面的代碼片段:
sync_func( arg1, arg2 ... argN); // 拋出異常
boost::system::error_code ec;
sync_func( arg1 arg2, ..., argN, ec); // 返回錯(cuò)誤碼
在這一章剩下的部分,你會(huì)見到大量的同步函數(shù)。簡(jiǎn)單起見,我省略了有返回錯(cuò)誤碼的重載,但是不可否認(rèn)它們確實(shí)是存在的。
這些方法被分成了幾組。并不是所有的方法都可以在各個(gè)類型的套接字里使用。這個(gè)部分的結(jié)尾將有一個(gè)列表來展示各個(gè)方法分別屬于哪個(gè)socket類。
注意所有的異步方法都立刻返回,而它們相對(duì)的同步實(shí)現(xiàn)需要操作完成之后才能返回。
這些方法是用來連接或綁定socket、斷開socket字連接以及查詢連接是活動(dòng)還是非活動(dòng)的:
例子如下:
ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 80);
ip::tcp::socket sock(service);
sock.open(ip::tcp::v4()); n
sock.connect(ep);
sock.write_some(buffer("GET /index.html\r\n"));
char buff[1024]; sock.read_some(buffer(buff,1024));
sock.shutdown(ip::tcp::socket::shutdown_receive);
sock.close();
這些是在套接字上執(zhí)行I/O操作的函數(shù)。
對(duì)于異步函數(shù)來說,處理程序的格式void handler(const boost::system::error_code& e, size_t bytes)都是一樣的
稍后我們將討論緩沖區(qū)。讓我們先來了解一下標(biāo)記。標(biāo)記的默認(rèn)值是0,但是也可以是以下幾種:
你最常用的可能是message_peek,使用方法請(qǐng)參照下面的代碼片段:
char buff[1024];
sock.receive(buffer(buff), ip::tcp::socket::message_peek );
memset(buff,1024, 0);
// 重新讀取之前已經(jīng)讀取過的內(nèi)容
sock.receive(buffer(buff) );
下面的是一些教你如何同步或異步地從不同類型的套接字上讀取數(shù)據(jù)的例子:
ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 80);
ip::tcp::socket sock(service);
sock.connect(ep);
sock.write_some(buffer("GET /index.html\r\n"));
std::cout << "bytes available " << sock.available() << std::endl;
char buff[512];
size_t read = sock.read_some(buffer(buff));
ip::udp::socket sock(service);
sock.open(ip::udp::v4());
ip::udp::endpoint receiver_ep("87.248.112.181", 80);
sock.send_to(buffer("testing\n"), receiver_ep);
char buff[512];
ip::udp::endpoint sender_ep;
sock.receive_from(buffer(buff), sender_ep);
[?注意:就像上述代碼片段所展示的那樣,使用receive_from從一個(gè)UDP套接字讀取數(shù)據(jù)時(shí),你需要構(gòu)造一個(gè)默認(rèn)的端點(diǎn)]
using namespace boost::asio;
io_service service;
ip::udp::socket sock(service);
boost::asio::ip::udp::endpoint sender_ep;
char buff[512];
void on_read(const boost::system::error_code & err, std::size_t read_bytes) {
std::cout << "read " << read_bytes << std::endl;
sock.async_receive_from(buffer(buff), sender_ep, on_read);
}
int main(int argc, char* argv[]) {
ip::udp::endpoint ep(ip::address::from_string("127.0.0.1"),
8001);
sock.open(ep.protocol());
sock.set_option(boost::asio::ip::udp::socket::reuse_address(true));
sock.bind(ep);
sock.async_receive_from(buffer(buff,512), sender_ep, on_read);
service.run();
}
這些函數(shù)用來處理套接字的高級(jí)選項(xiàng):
這些是你可以獲取/設(shè)置的套接字選項(xiàng):
名字 | 描述 | 類型 |
---|---|---|
broadcast | 如果為true,允許廣播消息 | bool |
debug | 如果為true,啟用套接字級(jí)別的調(diào)試 | bool |
do_not_route | 如果為true,則阻止路由選擇只使用本地接口 | bool |
enable_connection_aborted | 如果為true,記錄在accept()時(shí)中斷的連接 | bool |
keep_alive | 如果為true,會(huì)發(fā)送心跳 | bool |
linger | 如果為true,套接字會(huì)在有未發(fā)送數(shù)據(jù)的情況下掛起close() | bool |
receive_buffer_size | 套接字接收緩沖區(qū)大小 | int |
receive_low_watemark | 規(guī)定套接字輸入處理的最小字節(jié)數(shù) | int |
reuse_address | 如果為true,套接字能綁定到一個(gè)已用的地址 | bool |
send_buffer_size | 套接字發(fā)送緩沖區(qū)大小 | int |
send_low_watermark | 規(guī)定套接字?jǐn)?shù)據(jù)發(fā)送的最小字節(jié)數(shù) | int |
ip::v6_only | 如果為true,則只允許IPv6的連接 | bool |
每個(gè)名字代表了一個(gè)內(nèi)部套接字typedef或者類。下面是對(duì)它們的使用:
ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 80);
ip::tcp::socket sock(service);
sock.connect(ep);
// TCP套接字可以重用地址
ip::tcp::socket::reuse_address ra(true);
sock.set_option(ra);
// 獲取套接字讀取的數(shù)據(jù)
ip::tcp::socket::receive_buffer_size rbs;
sock.get_option(rbs);
std::cout << rbs.value() << std::endl;
// 把套接字的緩沖區(qū)大小設(shè)置為8192
ip::tcp::socket::send_buffer_size sbs(8192);
sock.set_option(sbs);
[?在上述特性工作之前,套接字要被打開。否則,會(huì)拋出異常]
就像我之前所說,不是所有的成員方法在所有的套接字類中都可用。我做了一個(gè)包含成員函數(shù)不同點(diǎn)的列表。如果一個(gè)成員函數(shù)沒有出現(xiàn)在這,說明它在所有的套接字類都是可用的。
名字 | TCP | UDP | ICMP |
---|---|---|---|
async_read_some | 是 | - | - |
async_receive_from | - | 是 | 是 |
async_write_some | 是 | - | - |
async_send_to | - | 是 | 是 |
read_some | 是 | - | - |
receive_from | - | 是 | 是 |
write_some | 是 | - | - |
send_to | - | 是 | 是 |
其他與連接和I/O無關(guān)的函數(shù)如下:
最后要注意的一點(diǎn),套接字實(shí)例不能被拷貝,因?yàn)榭截悩?gòu)造方法和=操作符是不可訪問的。
ip::tcp::socket s1(service), s2(service);
s1 = s2; // 編譯時(shí)報(bào)錯(cuò)
ip::tcp::socket s3(s1); // 編譯時(shí)報(bào)錯(cuò)
這是非常有意義的,因?yàn)槊恳粋€(gè)實(shí)例都擁有并管理著一個(gè)資源(原生套接字本身)。如果我們?cè)试S拷貝構(gòu)造,結(jié)果是我們會(huì)有兩個(gè)實(shí)例擁有同樣的原生套接字;這樣我們就需要去處理所有者的問題(讓一個(gè)實(shí)例擁有所有權(quán)?或者使用引用計(jì)數(shù)?還是其他的方法)Boost.Asio選擇不允許拷貝(如果你想要?jiǎng)?chuàng)建一個(gè)備份,請(qǐng)使用共享指針)
typedef boost::shared_ptr<ip::tcp::socket> socket_ptr;
socket_ptr sock1(new ip::tcp::socket(service));
socket_ptr sock2(sock1); // ok
socket_ptr sock3;
sock3 = sock1; // ok
當(dāng)從一個(gè)套接字讀寫內(nèi)容時(shí),你需要一個(gè)緩沖區(qū),用來保存讀取和寫入的數(shù)據(jù)。緩沖區(qū)內(nèi)存的有效時(shí)間必須比I/O操作的時(shí)間要長(zhǎng);你需要保證它們?cè)贗/O操作結(jié)束之前不被釋放。 對(duì)于同步操作來說,這很容易;當(dāng)然,這個(gè)緩沖區(qū)在receive和send時(shí)都存在。
char buff[512];
...
sock.receive(buffer(buff));
strcpy(buff, "ok\n");
sock.send(buffer(buff));
但是在異步操作時(shí)就沒這么簡(jiǎn)單了,看下面的代碼片段:
// 非常差勁的代碼 ...
void on_read(const boost::system::error_code & err, std::size_t read_bytes)
{ ... }
void func() {
char buff[512];
sock.async_receive(buffer(buff), on_read);
}
在我們調(diào)用async_receive()之后,buff就已經(jīng)超出有效范圍,它的內(nèi)存當(dāng)然會(huì)被釋放。當(dāng)我們開始從套接字接收一些數(shù)據(jù)時(shí),我們會(huì)把它們拷貝到一片已經(jīng)不屬于我們的內(nèi)存中;它可能會(huì)被釋放,或者被其他代碼重新開辟來存入其他的數(shù)據(jù),結(jié)果就是:內(nèi)存沖突。
對(duì)于上面的問題有幾個(gè)解決方案:
第一個(gè)方法顯然不是很好,因?yàn)槲覀兌贾廊肿兞糠浅2缓?。此外,如果兩個(gè)實(shí)例使用同一個(gè)緩沖區(qū)怎么辦?
下面是第二種方式的實(shí)現(xiàn):
void on_read(char * ptr, const boost::system::error_code & err, std::size_t read_bytes) {
delete[] ptr;
}
....
char * buff = new char[512];
sock.async_receive(buffer(buff, 512), boost::bind(on_read,buff,_1,_2))
或者,如果你想要緩沖區(qū)在操作結(jié)束后自動(dòng)超出范圍,使用共享指針
struct shared_buffer {
boost::shared_array<char> buff;
int size;
shared_buffer(size_t size) : buff(new char[size]), size(size) {
}
mutable_buffers_1 asio_buff() const {
return buffer(buff.get(), size);
}
};
// 當(dāng)on_read超出范圍時(shí), boost::bind對(duì)象被釋放了,
// 同時(shí)也會(huì)釋放共享指針
void on_read(shared_buffer, const boost::system::error_code & err, std::size_t read_bytes) {}
sock.async_receive(buff.asio_buff(), boost::bind(on_read,buff,_1,_2));
shared_buffer類擁有實(shí)質(zhì)的shared_array<>,shared_array<>存在的目的是用來保存shared_buffer實(shí)例的拷貝-當(dāng)最后一個(gè)share_array<>元素超出范圍時(shí),shared_array<>就被自動(dòng)銷毀了,而這就是我們想要的結(jié)果。
因?yàn)锽oost.Asio會(huì)給完成處理句柄保留一個(gè)拷貝,當(dāng)操作完成時(shí)就會(huì)調(diào)用這個(gè)完成處理句柄,所以你的目的達(dá)到了。那個(gè)拷貝是一個(gè)boost::bind的仿函數(shù),它擁有著實(shí)際的shared_buffer實(shí)例。這是非常優(yōu)雅的!
第三個(gè)選擇是使用一個(gè)連接對(duì)象來管理套接字和其他數(shù)據(jù),比如緩沖區(qū),通常來說這是正確的解決方案但是非常復(fù)雜。在這一章的末尾我們會(huì)對(duì)這種方法進(jìn)行討論。
縱觀所有代碼,你會(huì)發(fā)現(xiàn):無論什么時(shí)候,當(dāng)我們需要對(duì)一個(gè)buffer進(jìn)行讀寫操作時(shí),代碼會(huì)把實(shí)際的緩沖區(qū)對(duì)象封裝在一個(gè)buffer()方法中,然后再把它傳遞給方法調(diào)用:
char buff[512];
sock.async_receive(buffer(buff), on_read);
基本上我們都會(huì)把緩沖區(qū)包含在一個(gè)類中以便Boost.Asio的方法能遍歷這個(gè)緩沖區(qū),比方說,使用下面的代碼:
sock.async_receive(some_buffer, on_read);
實(shí)例some_buffer需要滿足一些需求,叫做ConstBufferSequence或者MutableBufferSequence(你可以在Boost.Asio的文檔中查看它們)。創(chuàng)建你自己的類去處理這些需求的細(xì)節(jié)是非常復(fù)雜的,但是Boost.Asio已經(jīng)提供了一些類用來處理這些需求。所以你不用直接訪問這些緩沖區(qū),而可以使用buffer()方法。
自信地講,你可以把下面列出來的類型都包裝到一個(gè)buffer()方法中:
下面的代碼都是有效的:
struct pod_sample { int i; long l; char c; };
...
char b1[512];
void * b2 = new char[512];
std::string b3; b3.resize(128);
pod_sample b4[16];
std::vector<pod_sample> b5; b5.resize(16);
boost::array<pod_sample,16> b6;
std::array<pod_sample,16> b7;
sock.async_send(buffer(b1), on_read);
sock.async_send(buffer(b2,512), on_read);
sock.async_send(buffer(b3), on_read);
sock.async_send(buffer(b4), on_read);
sock.async_send(buffer(b5), on_read);
sock.async_send(buffer(b6), on_read);
sock.async_send(buffer(b7), on_read);
總的來說就是:與其創(chuàng)建你自己的類來處理ConstBufferSequence或者MutableBufferSequence的需求,不如創(chuàng)建一個(gè)能在你需要的時(shí)候保留緩沖區(qū),然后返回一個(gè)mutable_buffers_1實(shí)例的類,而我們?cè)缭趕hared_buffer類中就這樣做了。
Boost.Asio提供了處理I/O的自由函數(shù),我們分四組來分析它們。
這些方法把套接字連接到一個(gè)端點(diǎn)。
它的例子如下:
using namespace boost::asio::ip;
tcp::resolver resolver(service);
tcp::resolver::iterator iter = resolver.resolve(tcp::resolver::query("www.yahoo.com","80"));
tcp::socket sock(service);
connect(sock, iter);
一個(gè)主機(jī)名可以被解析成多個(gè)地址,而connect和async_connect能很好地把你從嘗試每個(gè)地址然后找到一個(gè)可用地址的繁重工作中解放出來,因?yàn)樗鼈円呀?jīng)幫你做了這些。
這些方法對(duì)一個(gè)流進(jìn)行讀寫操作(可以是套接字,或者其他表現(xiàn)得像流的類):
async_read(stream, stream_buffer [, completion], handler)
async_write(strean, stream_buffer [, completion], handler)
write(stream, stream_buffer [, completion])
read(stream, stream_buffer [, completion])
首先,要注意第一個(gè)參數(shù)變成了流,而不單是socket。這個(gè)參數(shù)包含了socket但不僅僅是socket。比如,你可以用一個(gè)Windows的文件句柄來替代socket。 當(dāng)下面情況出現(xiàn)時(shí),所有read和write操作都會(huì)結(jié)束:
下面的代碼會(huì)異步地從一個(gè)socket中間讀取數(shù)據(jù)直到讀取到’\n’:
io_service service;
ip::tcp::socket sock(service);
char buff[512];
int offset = 0;
size_t up_to_enter(const boost::system::error_code &, size_t bytes) {
for ( size_t i = 0; i < bytes; ++i)
if ( buff[i + offset] == '\n')
return 0;
return 1;
}
void on_read(const boost::system::error_code &, size_t) {}
...
async_read(sock, buffer(buff), up_to_enter, on_read);
Boost.Asio也提供了一些簡(jiǎn)單的完成處理仿函數(shù):
例子如下:
char buff[512];
void on_read(const boost::system::error_code &, size_t) {}
// 讀取32個(gè)字節(jié)
async_read(sock, buffer(buff), transfer_exactly(32), on_read);
上述的4個(gè)方法,不使用普通的緩沖區(qū),而使用由Boost.Asio的std::streambuf類繼承來的stream_buffer方法。stl流和流緩沖區(qū)非常復(fù)雜;下面是例子:
io_service service;
void on_read(streambuf& buf, const boost::system::error_code &, size_t) {
std::istream in(&buf);
std::string line;
std::getline(in, line);
std::cout << "first line: " << line << std::endl;
}
int main(int argc, char* argv[]) {
HANDLE file = ::CreateFile("readme.txt", GENERIC_READ, 0, 0, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, 0);
windows::stream_handle h(service, file);
streambuf buf;
async_read(h, buf, transfer_exactly(256), boost::bind(on_read,boost::ref(buf),_1,_2));
service.run();
}
在這里,我向你們展示了如何在一個(gè)Windows文件句柄上調(diào)用async_read。讀取前256個(gè)字符,然后把它們保存到緩沖區(qū)中,當(dāng)操作結(jié)束時(shí)。on_read被調(diào)用,再創(chuàng)建std::istream用來傳遞緩沖區(qū),讀取第一行(std::getline),最后把它輸出到命令行中。
這些方法在條件滿足之前會(huì)一直讀?。?
下面這個(gè)例子在讀到一個(gè)指定的標(biāo)點(diǎn)符號(hào)之前會(huì)一直讀?。?/p>
typedef buffers_iterator<streambuf::const_buffers_type> iterator;
std::pair<iterator, bool> match_punct(iterator begin, iterator end) {
while ( begin != end)
if ( std::ispunct(*begin))
return std::make_pair(begin,true);
return std::make_pair(end,false);
}
void on_read(const boost::system::error_code &, size_t) {}
...
streambuf buf;
async_read_until(sock, buf, match_punct, on_read);
如果我們想讀到一個(gè)空格時(shí)就結(jié)束,我們需要把最后一行修改為:
async_read_until(sock, buff, ' ', on_read);
這些方法用來在一個(gè)流上面做隨機(jī)存取操作。由你來指定read和write操作從什么地方開始(offset):
這些方法不支持套接字。它們用來處理流的隨機(jī)訪問;也就是說,流是可以隨機(jī)訪問的。套接字顯然不是這樣(套接字是不可回溯的)。
下面這個(gè)例子告訴你怎么從一個(gè)文件偏移為256的位置讀取128個(gè)字節(jié):
io_service service;
int main(int argc, char* argv[]) {
HANDLE file = ::CreateFile("readme.txt", GENERIC_READ, 0, 0, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, 0);
windows::random_access_handle h(service, file);
streambuf buf;
read_at(h, 256, buf, transfer_exactly(128));
std::istream in(&buf);
std::string line;
std::getline(in, line);
std::cout << "first line: " << line << std::endl;
}
這部分對(duì)異步編程時(shí)可能碰到的一些問題進(jìn)行了深入的探究。我建議你先讀一遍,然后在接下來讀這本書的過程中,再經(jīng)?;剡^頭來看看,從而增強(qiáng)你對(duì)這部分的理解。
就像我之前所說的,同步編程比異步編程簡(jiǎn)單很多。這是因?yàn)?,線性的思考是很簡(jiǎn)單的(調(diào)用A,調(diào)用A結(jié)束,調(diào)用B,調(diào)用B結(jié)束,然后繼續(xù),這是以事件處理的方式來思考)。后面你會(huì)碰到這種情況,比如:五件事情,你不知道它們執(zhí)行的順序,也不知道他們是否會(huì)執(zhí)行!
盡管異步編程更難,但是你會(huì)更傾向于選擇使用它,比如:寫一個(gè)需要處理很多并發(fā)訪問的服務(wù)端。并發(fā)訪問越多,異步編程就比同步編程越簡(jiǎn)單。
假設(shè):你有一個(gè)需要處理1000個(gè)并發(fā)訪問的應(yīng)用,從客戶端發(fā)給服務(wù)端的每個(gè)信息都會(huì)再返回給客戶端,以‘\n’結(jié)尾。
同步方式的代碼,1個(gè)線程:
using namespace boost::asio;
struct client {
ip::tcp::socket sock;
char buff[1024]; // 每個(gè)信息最多這么大
int already_read; // 你已經(jīng)讀了多少
};
std::vector<client> clients;
void handle_clients() {
while ( true)
for ( int i = 0; i < clients.size(); ++i)
if ( clients[i].sock.available() ) on_read(clients[i]);
}
void on_read(client & c) {
int to_read = std::min( 1024 - c.already_read, c.sock.available());
c.sock.read_some( buffer(c.buff + c.already_read, to_read));
c.already_read += to_read;
if ( std::find(c.buff, c.buff + c.already_read, '\n') < c.buff + c.already_read) {
int pos = std::find(c.buff, c.buff + c.already_read, '\n') - c.buff;
std::string msg(c.buff, c.buff + pos);
std::copy(c.buff + pos, c.buff + 1024, c.buff);
c.already_read -= pos;
on_read_msg(c, msg);
}
}
void on_read_msg(client & c, const std::string & msg) {
// 分析消息,然后返回
if ( msg == "request_login")
c.sock.write( "request_ok\n");
else if ...
}
有一種情況是在任何服務(wù)端(和任何基于網(wǎng)絡(luò)的應(yīng)用)都需要避免的,就是代碼無響應(yīng)的情況。在我們的例子里,我們需要handle_clients()方法盡可能少的阻塞。如果方法在某個(gè)點(diǎn)上阻塞,任何進(jìn)來的信息都需要等待方法解除阻塞才能被處理。
為了保持響應(yīng),只在一個(gè)套接字有數(shù)據(jù)的時(shí)候我們才讀,也就是說,if ( clients[i].sock.available() ) on_read(clients[i])。在on_read時(shí),我們只讀當(dāng)前可用的;調(diào)用read_until(c.sock, buffer(...), '\n')會(huì)是一個(gè)非常糟糕的選擇,因?yàn)橹钡轿覀儚囊粋€(gè)指定的客戶端讀取了完整的消息之前,它都是阻塞的(我們永遠(yuǎn)不知道它什么時(shí)候會(huì)讀取到完整的消息)
這里的瓶頸就是on_read_msg()方法;當(dāng)它執(zhí)行時(shí),所有進(jìn)來的消息都在等待。一個(gè)良好的on_read_msg()方法實(shí)現(xiàn)會(huì)保證這種情況基本不會(huì)發(fā)生,但是它還是會(huì)發(fā)生(有時(shí)候向一個(gè)套接字寫入數(shù)據(jù),緩沖區(qū)滿了時(shí),它會(huì)被阻塞) 同步方式的代碼,10個(gè)線程
using namespace boost::asio;
struct client {
// ... 和之前一樣
bool set_reading() {
boost::mutex::scoped_lock lk(cs_);
if ( is_reading_) return false; // 已經(jīng)在讀取
else { is_reading_ = true; return true; }
}
void unset_reading() {
boost::mutex::scoped_lock lk(cs_);
is_reading_ = false;
}
private:
boost::mutex cs_;
bool is_reading_;
};
std::vector<client> clients;
void handle_clients() {
for ( int i = 0; i < 10; ++i)
boost::thread( handle_clients_thread);
}
void handle_clients_thread() {
while ( true)
for ( int i = 0; i < clients.size(); ++i)
if ( clients[i].sock.available() )
if ( clients[i].set_reading()) {
on_read(clients[i]);
clients[i].unset_reading();
}
}
void on_read(client & c) {
// 和之前一樣
}
void on_read_msg(client & c, const std::string & msg) {
// 和之前一樣
}
為了使用多線程,我們需要對(duì)線程進(jìn)行同步,這就是set_reading()和set_unreading()所做的。set_reading()方法非常重要,比如你想要一步實(shí)現(xiàn)“判斷是否在讀取然后標(biāo)記為讀取中”。但這是有兩步的(“判斷是否在讀取”和“標(biāo)記為讀取中”),你可能會(huì)有兩個(gè)線程同時(shí)為一個(gè)客戶端判斷是否在讀取,然后你會(huì)有兩個(gè)線程同時(shí)為一個(gè)客戶端調(diào)用on_read,結(jié)果就是數(shù)據(jù)沖突甚至導(dǎo)致應(yīng)用崩潰。
你會(huì)發(fā)現(xiàn)代碼變得極其復(fù)雜。
同步編程有第三個(gè)選擇,就是為每個(gè)連接開辟一個(gè)線程。但是當(dāng)并發(fā)的線程增加時(shí),這就成了一種災(zāi)難性的情況。
然后,讓我們來看異步編程。我們不斷地異步讀取。當(dāng)一個(gè)客戶端請(qǐng)求某些東西時(shí),on_read被調(diào)用,然后回應(yīng),然后等待下一個(gè)請(qǐng)求(然后開始另外一個(gè)異步的read操作)。
異步方式的代碼,10個(gè)線程
using namespace boost::asio;
io_service service;
struct client {
ip::tcp::socket sock;
streambuf buff; // 從客戶端取回結(jié)果
}
std::vector<client> clients;
void handle_clients() {
for ( int i = 0; i < clients.size(); ++i)
async_read_until(clients[i].sock, clients[i].buff, '\n', boost::bind(on_read, clients[i], _1, _2));
for ( int i = 0; i < 10; ++i)
boost::thread(handle_clients_thread);
}
void handle_clients_thread() {
service.run();
}
void on_read(client & c, const error_code & err, size_t read_bytes) {
std::istream in(&c.buff);
std::string msg;
std::getline(in, msg);
if ( msg == "request_login")
c.sock.async_write( "request_ok\n", on_write);
else if ...
...
// 等待同一個(gè)客戶端下一個(gè)讀取操作
async_read_until(c.sock, c.buff, '\n', boost::bind(on_read, c, _1, _2));
}
發(fā)現(xiàn)代碼變得有多簡(jiǎn)單了吧?client結(jié)構(gòu)里面只有兩個(gè)成員,handle_clients()僅僅調(diào)用了async_read_until,然后它創(chuàng)建了10個(gè)線程,每個(gè)線程都調(diào)用service.run()。這些線程會(huì)處理所有來自客戶端的異步read操作,然后分發(fā)所有向客戶端的異步write操作。另外需要注意的一件事情是:on_read()一直在為下一次異步read操作做準(zhǔn)備(看最后一行代碼)。
為了實(shí)現(xiàn)監(jiān)聽循環(huán),io_service類提供了4個(gè)方法,比如:run(), run_one(), poll()和poll_one()。雖然大多數(shù)時(shí)候使用service.run()就可以,但是你還是需要在這里學(xué)習(xí)其他方法實(shí)現(xiàn)的功能。
再一次說明,如果有等待執(zhí)行的操作,run()會(huì)一直執(zhí)行,直到你手動(dòng)調(diào)用io_service::stop()。為了保證io_service一直執(zhí)行,通常你添加一個(gè)或者多個(gè)異步操作,然后在它們被執(zhí)行時(shí),你繼續(xù)一直不停地添加異步操作,比如下面代碼:
using namespace boost::asio;
io_service service;
ip::tcp::socket sock(service);
char buff_read[1024], buff_write[1024] = "ok";
void on_read(const boost::system::error_code &err, std::size_t bytes);
void on_write(const boost::system::error_code &err, std::size_t bytes)
{
sock.async_read_some(buffer(buff_read), on_read);
}
void on_read(const boost::system::error_code &err, std::size_t bytes)
{
// ... 處理讀取操作 ...
sock.async_write_some(buffer(buff_write,3), on_write);
}
void on_connect(const boost::system::error_code &err) {
sock.async_read_some(buffer(buff_read), on_read);
}
int main(int argc, char* argv[]) {
ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 2001);
sock.async_connect(ep, on_connect);
service.run();
}
我在之前說過異步方法的handler是在調(diào)用了io_service::run的線程里被調(diào)用的。因?yàn)樵谥辽?0%~95%的時(shí)候,這是你唯一要用到的方法,所以我就把它說得簡(jiǎn)單了。對(duì)于調(diào)用了run_one(), poll()或者poll_one()的線程這一點(diǎn)也是適用的。
run_one()方法最多執(zhí)行和分發(fā)一個(gè)異步操作:
你可以認(rèn)為下面兩段代碼是等效的:
io_service service;
service.run(); // 或者
while ( !service.stopped()) service.run_once();
你可以使用run_once()啟動(dòng)一個(gè)異步操作,然后等待它執(zhí)行完成。
io_service service;
bool write_complete = false;
void on_write(const boost::system::error_code & err, size_t bytes)
{ write_complete = true; }
…
std::string data = "login ok”;
write_complete = false;
async_write(sock, buffer(data), on_write);
do service.run_once() while (!write_complete);
還有一些使用run_one()方法的例子,包含在Boost.Asio諸如blocking_tcp_client.cpp和blocking_udp_client.cpp的文件中。
poll_one方法以非阻塞的方式最多運(yùn)行一個(gè)準(zhǔn)備好的等待操作:
操作正在等待并準(zhǔn)備以非阻塞方式運(yùn)行,通常意味著如下的情況:
你可以使用poll_one去保證所有I/O操作的handler完成運(yùn)行,同時(shí)做一些其他的工作
io_service service;
while ( true) {
// 運(yùn)行所有完成了IO操作的handler
while ( service.poll_one()) ;
// ... 在這里做其他的事情 …
}
poll()方法會(huì)以非阻塞的方式運(yùn)行所有等待的操作。下面兩段代碼是等效的:
io_service service;
service.poll(); // 或者
while ( service.poll_one()) ;
所有上述方法都會(huì)在失敗的時(shí)候拋出boost::system::system_error異常。這是我們所不希望發(fā)生的事情;這里拋出的異常通常都是致命的,也許是資源耗盡,或者是你handler的其中一個(gè)拋出了異常。另外,每個(gè)方法都有一個(gè)不拋出異常,而是返回一個(gè)boost::system::error_code的重載:
io_service service;
boost::system::error_code err = 0;
service.run(err);
if ( err) std::cout << "Error " << err << std::endl;
異步工作不僅僅指用異步地方式接受客戶端到服務(wù)端的連接、異步地從一個(gè)socket讀取或者寫入到socket。它包含了所有可以異步執(zhí)行的操作。
默認(rèn)情況下,你是不知道每個(gè)異步handler的調(diào)用順序的。除了通常的異步調(diào)用(來自異步socket的讀取/寫入/接收)。你可以使用service.post()來使你的自定義方法被異步地調(diào)用。例如:
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <iostream>
using namespace boost::asio;
io_service service;
void func(int i) {
std::cout << "func called, i= " << i << std::endl;
}
void worker_thread() {
service.run();
}
int main(int argc, char* argv[]) {
for ( int i = 0; i < 10; ++i)
service.post(boost::bind(func, i));
boost::thread_group threads;
for ( int i = 0; i < 3; ++i)
threads.create_thread(worker_thread);
// 等待所有線程被創(chuàng)建完
boost::this_thread::sleep( boost::posix_time::millisec(500));
threads.join_all();
}
在上面的例子中,service.post(some_function)添加了一個(gè)異步方法調(diào)用。
這個(gè)方法在某一個(gè)調(diào)用了service.run()的線程中請(qǐng)求io_service實(shí)例,然后調(diào)用給定的some_funtion之后立即返回。在我們的例子中,這個(gè)線程是我們之前創(chuàng)建的三個(gè)線程中的一個(gè)。你不能確定異步方法調(diào)用的順序。你不要期待它們會(huì)以我們調(diào)用post()方法的順序來調(diào)用。下面是運(yùn)行之前代碼可能得到的結(jié)果:
func called, i= 0
func called, i= 2
func called, i= 1
func called, i= 4
func called, i= 3
func called, i= 6
func called, i= 7
func called, i= 8
func called, i= 5
func called, i= 9
有時(shí)候你會(huì)想讓一些異步處理方法順序執(zhí)行。比如,你去一個(gè)餐館(go_to_restaurant),下單(order),然后吃(eat)。你需要先去餐館,然后下單,最后吃。這樣的話,你需要用到io_service::strand,這個(gè)方法會(huì)讓你的異步方法被順序調(diào)用??聪旅娴睦樱?/p>
using namespace boost::asio;
io_service service;
void func(int i) {
std::cout << "func called, i= " << i << "/" << boost::this_thread::get_id() << std::endl;
}
void worker_thread() {
service.run();
}
int main(int argc, char* argv[])
{
io_service::strand strand_one(service), strand_two(service);
for ( int i = 0; i < 5; ++i)
service.post( strand_one.wrap( boost::bind(func, i)));
for ( int i = 5; i < 10; ++i)
service.post( strand_two.wrap( boost::bind(func, i)));
boost::thread_group threads;
for ( int i = 0; i < 3; ++i)
threads.create_thread(worker_thread);
// 等待所有線程被創(chuàng)建完
boost::this_thread::sleep( boost::posix_time::millisec(500));
threads.join_all();
}
在上述代碼中,我們保證前面的5個(gè)線程和后面的5個(gè)線程是順序執(zhí)行的。func called, i = 0在func called, i = 1之前被調(diào)用,然后調(diào)用func called, i = 2……同樣func called, i = 5在func called, i = 6之前,func called, i = 6在func called, i = 7被調(diào)用……你需要注意的是盡管方法是順序調(diào)用的,但是不意味著它們都在同一個(gè)線程執(zhí)行。運(yùn)行這個(gè)程序可能得到的一個(gè)結(jié)果如下:
func called, i= 0/002A60C8
func called, i= 5/002A6138
func called, i= 6/002A6530
func called, i= 1/002A6138
func called, i= 7/002A6530
func called, i= 2/002A6138
func called, i= 8/002A6530
func called, i= 3/002A6138
func called, i= 9/002A6530
func called, i= 4/002A6138
Boost.Asio提供了三種讓你把處理方法添加為異步調(diào)用的方式:
在之前的章節(jié)中你會(huì)看到關(guān)于service.post()的一個(gè)例子,以及運(yùn)行這個(gè)例子可能得到的一種結(jié)果。我們對(duì)它做一些修改,然后看看service.dispatch()是怎么影響輸出的結(jié)果的:
using namespace boost::asio;
io_service service;
void func(int i) {
std::cout << "func called, i= " << i << std::endl;
}
void run_dispatch_and_post() {
for ( int i = 0; i < 10; i += 2) {
service.dispatch(boost::bind(func, i));
service.post(boost::bind(func, i + 1));
}
}
int main(int argc, char* argv[]) {
service.post(run_dispatch_and_post);
service.run();
}
在解釋發(fā)生了什么之前,我們先運(yùn)行程序,觀察結(jié)果:
func called, i= 0
func called, i= 2
func called, i= 4
func called, i= 6
func called, i= 8
func called, i= 1
func called, i= 3
func called, i= 5
func called, i= 7
func called, i= 9
偶數(shù)先輸出,然后是奇數(shù)。這是因?yàn)槲矣?em>dispatch()輸出偶數(shù),然后用post()輸出奇數(shù)。dispatch()會(huì)在返回之前調(diào)用hanlder,因?yàn)楫?dāng)前的線程調(diào)用了service.run(),而post()每次都立即返回了。
現(xiàn)在,讓我們講講service.wrap(handler)。wrap()返回了一個(gè)仿函數(shù),它可以用來做另外一個(gè)方法的參數(shù):
using namespace boost::asio;
io_service service;
void dispatched_func_1() {
std::cout << "dispatched 1" << std::endl;
}
void dispatched_func_2() {
std::cout << "dispatched 2" << std::endl;
}
void test(boost::function<void()> func) {
std::cout << "test" << std::endl;
service.dispatch(dispatched_func_1);
func();
}
void service_run() {
service.run();
}
int main(int argc, char* argv[]) {
test( service.wrap(dispatched_func_2));
boost::thread th(service_run);
boost::this_thread::sleep( boost::posix_time::millisec(500));
th.join();
}
test(service.wrap(dispatched_func_2));會(huì)把dispatched_ func_2包裝起來創(chuàng)建一個(gè)仿函數(shù),然后傳遞給test當(dāng)作一個(gè)參數(shù)。當(dāng)test()被調(diào)用時(shí),它會(huì)分發(fā)調(diào)用方法1,然后調(diào)用func()。這時(shí),你會(huì)發(fā)現(xiàn)調(diào)用func()和service.dispatch(dispatched_func_2)是等價(jià)的,因?yàn)樗鼈兪沁B續(xù)調(diào)用的。程序的輸出證明了這一點(diǎn):
test
dispatched 1
dispatched 2
io_service::strand 類(用來序列化異步調(diào)用)也包含了poll(), dispatch()和 wrap()等成員函數(shù)。它們的作用和io_service的poll(), dispatch()和wrap()是一樣的。然而,大多數(shù)情況下你只需要把io_service::strand::wrap()方法做為io_service::poll()或者io_service::dispatch()方法的參數(shù)即可。
假設(shè)你需要做下面的操作:
io_service service;
ip::tcp::socket sock(service);
char buff[512];
...
read(sock, buffer(buff));
在這個(gè)例子中,sock和buff的存在時(shí)間都必須比read()調(diào)用的時(shí)間要長(zhǎng)。也就是說,在調(diào)用read()返回之前,它們都必須有效。這就是你所期望的;你傳給一個(gè)方法的所有參數(shù)在方法內(nèi)部都必須有效。當(dāng)我們采用異步方式時(shí),事情會(huì)變得比較復(fù)雜。
io_service service;
ip::tcp::socket sock(service);
char buff[512];
void on_read(const boost::system::error_code &, size_t) {}
...
async_read(sock, buffer(buff), on_read);
在這個(gè)例子中,sock和buff的存在時(shí)間都必須比read()操作本身時(shí)間要長(zhǎng),但是read操作持續(xù)的時(shí)間我們是不知道的,因?yàn)樗钱惒降摹?/p>
當(dāng)使用socket緩沖區(qū)的時(shí)候,你會(huì)有一個(gè)buffer實(shí)例在異步調(diào)用時(shí)一直存在(使用boost::shared_array<>)。在這里,我們可以使用同樣的方式,通過創(chuàng)建一個(gè)類并在其內(nèi)部管理socket和它的讀寫緩沖區(qū)。然后,對(duì)于所有的異步操作,傳遞一個(gè)包含智能指針的boost::bind仿函數(shù)給它:
using namespace boost::asio;
io_service service;
struct connection : boost::enable_shared_from_this<connection> {
typedef boost::system::error_code error_code;
typedef boost::shared_ptr<connection> ptr;
connection() : sock_(service), started_(true) {}
void start(ip::tcp::endpoint ep) {
sock_.async_connect(ep, boost::bind(&connection::on_connect, shared_from_this(), _1));
}
void stop() {
if ( !started_) return;
started_ = false;
sock_.close();
}
bool started() { return started_; }
private:
void on_connect(const error_code & err) {
// 這里你決定用這個(gè)連接做什么: 讀取或者寫入
if ( !err) do_read();
else stop();
}
void on_read(const error_code & err, size_t bytes) {
if ( !started() ) return;
std::string msg(read_buffer_, bytes);
if ( msg == "can_login") do_write("access_data");
else if ( msg.find("data ") == 0) process_data(msg);
else if ( msg == "login_fail") stop();
}
void on_write(const error_code & err, size_t bytes) {
do_read();
}
void do_read() {
sock_.async_read_some(buffer(read_buffer_), boost::bind(&connection::on_read, shared_from_this(), _1, _2));
}
void do_write(const std::string & msg) {
if ( !started() ) return;
// 注意: 因?yàn)樵谧隽硗庖粋€(gè)async_read操作之前你想要發(fā)送多個(gè)消息,
// 所以你需要多個(gè)寫入buffer
std::copy(msg.begin(), msg.end(), write_buffer_);
sock_.async_write_some(buffer(write_buffer_, msg.size()), boost::bind(&connection::on_write, shared_from_this(), _1, _2));
}
void process_data(const std::string & msg) {
// 處理服務(wù)端來的內(nèi)容,然后啟動(dòng)另外一個(gè)寫入操作
}
private:
ip::tcp::socket sock_;
enum { max_msg = 1024 };
char read_buffer_[max_msg];
char write_buffer_[max_msg];
bool started_;
};
int main(int argc, char* argv[]) {
ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 8001);
connection::ptr(new connection)->start(ep);
}
在所有異步調(diào)用中,我們傳遞一個(gè)boost::bind仿函數(shù)當(dāng)作參數(shù)。這個(gè)仿函數(shù)內(nèi)部包含了一個(gè)智能指針,指向connection實(shí)例。只要有一個(gè)異步操作等待時(shí),Boost.Asio就會(huì)保存boost::bind仿函數(shù)的拷貝,這個(gè)拷貝保存了指向連接實(shí)例的一個(gè)智能指針,從而保證connection實(shí)例保持活動(dòng)。問題解決!
當(dāng)然,connection類僅僅是一個(gè)框架類;你需要根據(jù)你的需求對(duì)它進(jìn)行調(diào)整(它看起來會(huì)和當(dāng)前服務(wù)端例子的情況相當(dāng)不同)。
你需要注意的是創(chuàng)建一個(gè)新的連接是相當(dāng)簡(jiǎn)單的:connection::ptr(new connection)- >start(ep)。這個(gè)方法啟動(dòng)了到服務(wù)端的(異步)連接。當(dāng)你需要關(guān)閉這個(gè)連接時(shí),調(diào)用stop()。
當(dāng)實(shí)例被啟動(dòng)時(shí)(start()),它會(huì)等待客戶端的連接。當(dāng)連接發(fā)生時(shí)。on_connect()被調(diào)用。如果沒有錯(cuò)誤發(fā)生,它啟動(dòng)一個(gè)read操作(do_read())。當(dāng)read操作結(jié)束時(shí),你就可以解析這個(gè)消息;當(dāng)然你應(yīng)用的on_read()看起來會(huì)各種各樣。而當(dāng)你寫回一個(gè)消息時(shí),你需要把它拷貝到緩沖區(qū),然后像我在do_write()方法中所做的一樣將其發(fā)送出去,因?yàn)檫@個(gè)緩沖區(qū)同樣需要在這個(gè)異步寫操作中一直存活。最后需要注意的一點(diǎn)——當(dāng)寫回時(shí),你需要指定寫入的數(shù)量,否則,整個(gè)緩沖區(qū)都會(huì)被發(fā)送出去。
網(wǎng)絡(luò)api實(shí)際上要繁雜得多,這個(gè)章節(jié)只是做為一個(gè)參考,當(dāng)你在實(shí)現(xiàn)自己的網(wǎng)絡(luò)應(yīng)用時(shí)可以回過頭來看看。
Boost.Asio實(shí)現(xiàn)了端點(diǎn)的概念,你可以認(rèn)為是IP和端口。如果你不知道準(zhǔn)確的IP,你可以使用resolver對(duì)象將主機(jī)名,例如www.yahoo.com轉(zhuǎn)換為一個(gè)或多個(gè)IP地址。
我們也可以看到API的核心——socket類。Boost.Asio提供了TCP、UDP和 ICMP的實(shí)現(xiàn)。而且你還可以用你自己的協(xié)議來對(duì)它進(jìn)行擴(kuò)展;當(dāng)然,這個(gè)工作不適合缺乏勇氣的人。
異步編程是剛需。你應(yīng)該已經(jīng)明白為什么有時(shí)候需要用到它,尤其在寫服務(wù)端的時(shí)候。調(diào)用service.run()來實(shí)現(xiàn)異步循環(huán)就已經(jīng)可以讓你很滿足,但是有時(shí)候你需要更進(jìn)一步,嘗試使用run_one()、poll()或者poll_one()。
當(dāng)實(shí)現(xiàn)異步時(shí),你可以異步執(zhí)行你自己的方法;使用service.post()或者service.dispatch()。
最后,為了使socket和緩沖區(qū)(read或者write)在整個(gè)異步操作的生命周期中一直活動(dòng),我們需要采取特殊的防護(hù)措施。你的連接類需要繼承自enabled_shared_from_this,然后在內(nèi)部保存它需要的緩沖區(qū),而且每次異步調(diào)用都要傳遞一個(gè)智能指針給this操作。
下一章會(huì)進(jìn)行實(shí)戰(zhàn)操作;在實(shí)現(xiàn)回顯客戶端/服務(wù)端應(yīng)用時(shí)會(huì)有大量的編程實(shí)踐。
更多建議: