[譯]Boost.Asio基本原理

2018-06-19 15:34 更新

Boost.Asio基本原理

這一章涵蓋了使用Boost.Asio時(shí)必須知道的一些事情。我們也將深入研究比同步編程更復(fù)雜、更有樂趣的異步編程。

網(wǎng)絡(luò)API

這一部分包含了當(dāng)使用Boost.Asio編寫網(wǎng)絡(luò)應(yīng)用程序時(shí)必須知道的事情。

Boost.Asio命名空間

Boost.Asio的所有內(nèi)容都包含在boost::asio命名空間或者其子命名空間內(nèi)。

  • boost::asio:這是核心類和函數(shù)所在的地方。重要的類有io_service和streambuf。類似read, read_at, read_until方法,它們的異步方法,它們的寫方法和異步寫方法等自由函數(shù)也在這里。
  • boost::asio::ip:這是網(wǎng)絡(luò)通信部分所在的地方。重要的類有address, endpoint, tcp, udp和icmp,重要的自由函數(shù)有connectasync_connect。要注意的是在boost::asio::ip::tcp::socket中間,socket只是boost::asio::ip::tcp類中間的一個(gè)typedef關(guān)鍵字。
  • boost::asio::error:這個(gè)命名空間包含了調(diào)用I/O例程時(shí)返回的錯(cuò)誤碼
  • boost::asio::ssl:包含了SSL處理類的命名空間
  • boost::asio::local:這個(gè)命名空間包含了POSIX特性的類
  • boost::asio::windows:這個(gè)命名空間包含了Windows特性的類

IP地址

對于IP地址的處理,Boost.Asio提供了ip::address , ip::address_v4ip::address_v6類。 它們提供了相當(dāng)多的函數(shù)。下面列出了最重要的幾個(gè):

  • ip::address(v4_or_v6_address):這個(gè)函數(shù)把一個(gè)v4或者v6的地址轉(zhuǎn)換成ip::address
  • ip::address:from_string(str):這個(gè)函數(shù)根據(jù)一個(gè)IPv4地址(用.隔開的)或者一個(gè)IPv6地址(十六進(jìn)制表示)創(chuàng)建一個(gè)地址。
  • ip::address::to_string() :這個(gè)函數(shù)返回這個(gè)地址的字符串。
  • ip::address_v4::broadcast([addr, mask]):這個(gè)函數(shù)創(chuàng)建了一個(gè)廣播地址 ip::address_v4::any():這個(gè)函數(shù)返回一個(gè)能表示任意地址的地址。
  • ip::address_v4::loopback(), ip_address_v6::loopback():這個(gè)函數(shù)返回環(huán)路地址(為v4/v6協(xié)議)
  • ip::host_name():這個(gè)函數(shù)用string數(shù)據(jù)類型返回當(dāng)前的主機(jī)名。

大多數(shù)情況你會(huì)選擇用ip::address::from_string

ip::address addr = ip::address::from_string("127.0.0.1");

如果你想通過一個(gè)主機(jī)名進(jìn)行連接,下面的代碼片段是無用的:

// 拋出異常
ip::address addr = ip::address::from_string("www.yahoo.com");

端點(diǎn)

端點(diǎn)是使用某個(gè)端口連接到的一個(gè)地址。不同類型的socket有它自己的endpoint類,比如ip::tcp::endpoint、ip::udp::endpointip::icmp::endpoint

如果想連接到本機(jī)的80端口,你可以這樣做:

ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 80);

有三種方式來讓你建立一個(gè)端點(diǎn):

  • endpoint():這是默認(rèn)構(gòu)造函數(shù),某些時(shí)候可以用來創(chuàng)建UDP/ICMP socket。
  • endpoint(protocol, port):這個(gè)方法通常用來創(chuàng)建可以接受新連接的服務(wù)器端socket。
  • endpoint(addr, port):這個(gè)方法創(chuàng)建了一個(gè)連接到某個(gè)地址和端口的端點(diǎn)。

例子如下:

ip::tcp::endpoint ep1;
ip::tcp::endpoint ep2(ip::tcp::v4(), 80);
ip::tcp::endpoint ep3( ip::address::from_string("127.0.0.1), 80);

如果你想連接到一個(gè)主機(jī)(不是IP地址),你需要這樣做:

// 輸出 "87.248.122.122"
io_service service;
ip::tcp::resolver resolver(service);
ip::tcp::resolver::query query("www.yahoo.com", "80");
ip::tcp::resolver::iterator iter = resolver.resolve( query);
ip::tcp::endpoint ep = *iter;
std::cout << ep.address().to_string() << std::endl;

你可以用你需要的socket類型來替換tcp。首先,為你想要查詢的名字創(chuàng)建一個(gè)查詢器,然后用resolve()函數(shù)解析它。如果成功,它至少會(huì)返回一個(gè)入口。你可以利用返回的迭代器,使用第一個(gè)入口或者遍歷整個(gè)列表來拿到全部的入口。

給定一個(gè)端點(diǎn),可以獲得他的地址,端口和IP協(xié)議(v4或者v6):

std::cout << ep.address().to_string() << ":" << ep.port()
<< "/" << ep.protocol() << std::endl;

套接字

Boost.Asio有三種類型的套接字類:ip::tcp, ip::udpip::icmp。當(dāng)然它也是可擴(kuò)展的,你可以創(chuàng)建自己的socket類,盡管這相當(dāng)復(fù)雜。如果你選擇這樣做,參照一下boost/asio/ip/tcp.hpp, boost/asio/ip/udp.hppboost/asio/ip/icmp.hpp。它們都是含有內(nèi)部typedef關(guān)鍵字的超小類。

你可以把ip::tcp, ip::udp, ip::icmp類當(dāng)作占位符;它們可以讓你便捷地訪問其他類/函數(shù),如下所示:

  • ip::tcp::socket, ip::tcp::acceptor, ip::tcp::endpoint,ip::tcp::resolver, ip::tcp::iostream
  • ip::udp::socket, ip::udp::endpoint, ip::udp::resolver
  • ip::icmp::socket, ip::icmp::endpoint, ip::icmp::resolver

socket類創(chuàng)建一個(gè)相應(yīng)的socket。而且總是在構(gòu)造的時(shí)候傳入io_service實(shí)例:

io_service service;
ip::udp::socket sock(service)
sock.set_option(ip::udp::socket::reuse_address(true));

每一個(gè)socket的名字都是一個(gè)typedef關(guān)鍵字

  • ip::tcp::socket = basic_stream_socket<tcp>
  • ip::udp::socket = basic_datagram_socket<udp>
  • ip::icmp::socket = basic_raw_socket<icmp>

同步錯(cuò)誤碼

所有的同步函數(shù)都有拋出異?;蛘叻祷劐e(cuò)誤碼的重載,比如下面的代碼片段:

sync_func( arg1, arg2 ... argN); // 拋出異常
boost::system::error_code ec;
sync_func( arg1 arg2, ..., argN, ec); // 返回錯(cuò)誤碼

在這一章剩下的部分,你會(huì)見到大量的同步函數(shù)。簡單起見,我省略了有返回錯(cuò)誤碼的重載,但是不可否認(rèn)它們確實(shí)是存在的。

socket成員方法

這些方法被分成了幾組。并不是所有的方法都可以在各個(gè)類型的套接字里使用。這個(gè)部分的結(jié)尾將有一個(gè)列表來展示各個(gè)方法分別屬于哪個(gè)socket類。

注意所有的異步方法都立刻返回,而它們相對的同步實(shí)現(xiàn)需要操作完成之后才能返回。

連接相關(guān)的函數(shù)

這些方法是用來連接或綁定socket、斷開socket字連接以及查詢連接是活動(dòng)還是非活動(dòng)的:

  • assign(protocol,socket):這個(gè)函數(shù)分配了一個(gè)原生的socket給這個(gè)socket實(shí)例。當(dāng)處理老(舊)程序時(shí)會(huì)使用它(也就是說,原生socket已經(jīng)被建立了)
  • open(protocol):這個(gè)函數(shù)用給定的IP協(xié)議(v4或者v6)打開一個(gè)socket。你主要在UDP/ICMP socket,或者服務(wù)端socket上使用。
  • bind(endpoint):這個(gè)函數(shù)綁定到一個(gè)地址
  • connect(endpoint):這個(gè)函數(shù)用同步的方式連接到一個(gè)地址
  • async_connect(endpoint):這個(gè)函數(shù)用異步的方式連接到一個(gè)地址
  • is_open():如果套接字已經(jīng)打開,這個(gè)函數(shù)返回true
  • close():這個(gè)函數(shù)用來關(guān)閉套接字。調(diào)用時(shí)這個(gè)套接字上任何的異步操作都會(huì)被立即關(guān)閉,同時(shí)返回error::operation_aborted錯(cuò)誤碼。
  • shutdown(type_of_shutdown):這個(gè)函數(shù)立即使send或者receive操作失效,或者兩者都失效。
  • cancel():這個(gè)函數(shù)取消套接字上所有的異步操作。這個(gè)套接字上任何的異步操作都會(huì)立即結(jié)束,然后返回error::operation_aborted錯(cuò)誤碼。

例子如下:

ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 80);
ip::tcp::socket sock(service);
sock.open(ip::tcp::v4()); n
sock.connect(ep);
sock.write_some(buffer("GET /index.html\r\n"));
char buff[1024]; sock.read_some(buffer(buff,1024));
sock.shutdown(ip::tcp::socket::shutdown_receive);
sock.close();

讀寫函數(shù)

這些是在套接字上執(zhí)行I/O操作的函數(shù)。

對于異步函數(shù)來說,處理程序的格式void handler(const boost::system::error_code& e, size_t bytes)都是一樣的

  • async_receive(buffer, [flags,] handler):這個(gè)函數(shù)啟動(dòng)從套接字異步接收數(shù)據(jù)的操作。
  • async_read_some(buffer,handler):這個(gè)函數(shù)和async_receive(buffer, handler)功能一樣。
  • async_receive_from(buffer, endpoint[, flags], handler):這個(gè)函數(shù)啟動(dòng)從一個(gè)指定端點(diǎn)異步接收數(shù)據(jù)的操作。
  • async_send(buffer [, flags], handler):這個(gè)函數(shù)啟動(dòng)了一個(gè)異步發(fā)送緩沖區(qū)數(shù)據(jù)的操作。
  • async_write_some(buffer, handler):這個(gè)函數(shù)和async_send(buffer, handler)功能一致。
  • async_send_to(buffer, endpoint, handler):這個(gè)函數(shù)啟動(dòng)了一個(gè)異步send緩沖區(qū)數(shù)據(jù)到指定端點(diǎn)的操作。
  • receive(buffer [, flags]):這個(gè)函數(shù)異步地從所給的緩沖區(qū)讀取數(shù)據(jù)。在讀完所有數(shù)據(jù)或者錯(cuò)誤出現(xiàn)之前,這個(gè)函數(shù)都是阻塞的。
  • read_some(buffer):這個(gè)函數(shù)的功能和receive(buffer)是一致的。
    • receive_from(buffer, endpoint [, flags])*:這個(gè)函數(shù)異步地從一個(gè)指定的端點(diǎn)獲取數(shù)據(jù)并寫入到給定的緩沖區(qū)。在讀完所有數(shù)據(jù)或者錯(cuò)誤出現(xiàn)之前,這個(gè)函數(shù)都是阻塞的。
  • send(buffer [, flags]):這個(gè)函數(shù)同步地發(fā)送緩沖區(qū)的數(shù)據(jù)。在所有數(shù)據(jù)發(fā)送成功或者出現(xiàn)錯(cuò)誤之前,這個(gè)函數(shù)都是阻塞的。
  • write_some(buffer):這個(gè)函數(shù)和send(buffer)的功能一致。
  • send_to(buffer, endpoint [, flags]):這個(gè)函數(shù)同步地把緩沖區(qū)數(shù)據(jù)發(fā)送到一個(gè)指定的端點(diǎn)。在所有數(shù)據(jù)發(fā)送成功或者出現(xiàn)錯(cuò)誤之前,這個(gè)函數(shù)都是阻塞的。
  • available():這個(gè)函數(shù)返回有多少字節(jié)的數(shù)據(jù)可以無阻塞地進(jìn)行同步讀取。

稍后我們將討論緩沖區(qū)。讓我們先來了解一下標(biāo)記。標(biāo)記的默認(rèn)值是0,但是也可以是以下幾種:

  • ip::socket_type::socket::message_peek:這個(gè)標(biāo)記只監(jiān)測并返回某個(gè)消息,但是下一次讀消息的調(diào)用會(huì)重新讀取這個(gè)消息。
  • ip::socket_type::socket::message_out_of_band:這個(gè)標(biāo)記處理帶外(OOB)數(shù)據(jù),OOB數(shù)據(jù)是被標(biāo)記為比正常數(shù)據(jù)更重要的數(shù)據(jù)。關(guān)于OOB的討論在這本書的內(nèi)容之外。
  • ip::socket_type::socket::message_do_not_route:這個(gè)標(biāo)記指定數(shù)據(jù)不使用路由表來發(fā)送。
  • ip::socket_type::socket::message_end_of_record:這個(gè)標(biāo)記指定的數(shù)據(jù)標(biāo)識(shí)了記錄的結(jié)束。在Windows下不支持。

你最常用的可能是message_peek,使用方法請參照下面的代碼片段:

char buff[1024];
sock.receive(buffer(buff), ip::tcp::socket::message_peek );
memset(buff,1024, 0);
// 重新讀取之前已經(jīng)讀取過的內(nèi)容
sock.receive(buffer(buff) );

下面的是一些教你如何同步或異步地從不同類型的套接字上讀取數(shù)據(jù)的例子:

  • 例1是在一個(gè)TCP套接字上進(jìn)行同步讀寫:

ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 80);
ip::tcp::socket sock(service);
sock.connect(ep);
sock.write_some(buffer("GET /index.html\r\n"));
std::cout << "bytes available " << sock.available() << std::endl;
char buff[512];
size_t read = sock.read_some(buffer(buff));

  • 例2是在一個(gè)UDP套接字上進(jìn)行同步讀寫:

ip::udp::socket sock(service);
sock.open(ip::udp::v4());
ip::udp::endpoint receiver_ep("87.248.112.181", 80);
sock.send_to(buffer("testing\n"), receiver_ep);
char buff[512];
ip::udp::endpoint sender_ep;
sock.receive_from(buffer(buff), sender_ep);

[?注意:就像上述代碼片段所展示的那樣,使用receive_from從一個(gè)UDP套接字讀取數(shù)據(jù)時(shí),你需要構(gòu)造一個(gè)默認(rèn)的端點(diǎn)]

  • 例3是從一個(gè)UDP服務(wù)套接字中異步讀取數(shù)據(jù):

using namespace boost::asio;
io_service service;
ip::udp::socket sock(service);
boost::asio::ip::udp::endpoint sender_ep;
char buff[512];
void on_read(const boost::system::error_code & err, std::size_t read_bytes) {
    std::cout << "read " << read_bytes << std::endl;
    sock.async_receive_from(buffer(buff), sender_ep, on_read);
}
int main(int argc, char* argv[]) {
    ip::udp::endpoint ep(ip::address::from_string("127.0.0.1"),
8001);
    sock.open(ep.protocol());
    sock.set_option(boost::asio::ip::udp::socket::reuse_address(true));
    sock.bind(ep);
    sock.async_receive_from(buffer(buff,512), sender_ep, on_read);
    service.run();
}

套接字控制:

這些函數(shù)用來處理套接字的高級(jí)選項(xiàng):

  • get_io_service():這個(gè)函數(shù)返回構(gòu)造函數(shù)中傳入的io_service實(shí)例
  • get_option(option):這個(gè)函數(shù)返回一個(gè)套接字的屬性
  • set_option(option):這個(gè)函數(shù)設(shè)置一個(gè)套接字的屬性
  • io_control(cmd):這個(gè)函數(shù)在套接字上執(zhí)行一個(gè)I/O指令

這些是你可以獲取/設(shè)置的套接字選項(xiàng):

名字 描述 類型
broadcast 如果為true,允許廣播消息 bool
debug 如果為true,啟用套接字級(jí)別的調(diào)試 bool
do_not_route 如果為true,則阻止路由選擇只使用本地接口 bool
enable_connection_aborted 如果為true,記錄在accept()時(shí)中斷的連接 bool
keep_alive 如果為true,會(huì)發(fā)送心跳 bool
linger 如果為true,套接字會(huì)在有未發(fā)送數(shù)據(jù)的情況下掛起close() bool
receive_buffer_size 套接字接收緩沖區(qū)大小 int
receive_low_watemark 規(guī)定套接字輸入處理的最小字節(jié)數(shù) int
reuse_address 如果為true,套接字能綁定到一個(gè)已用的地址 bool
send_buffer_size 套接字發(fā)送緩沖區(qū)大小 int
send_low_watermark 規(guī)定套接字?jǐn)?shù)據(jù)發(fā)送的最小字節(jié)數(shù) int
ip::v6_only 如果為true,則只允許IPv6的連接 bool

每個(gè)名字代表了一個(gè)內(nèi)部套接字typedef或者類。下面是對它們的使用:

ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 80);
ip::tcp::socket sock(service);
sock.connect(ep);
// TCP套接字可以重用地址
ip::tcp::socket::reuse_address ra(true);
sock.set_option(ra);
// 獲取套接字讀取的數(shù)據(jù)
ip::tcp::socket::receive_buffer_size rbs;
sock.get_option(rbs);
std::cout << rbs.value() << std::endl;
// 把套接字的緩沖區(qū)大小設(shè)置為8192
ip::tcp::socket::send_buffer_size sbs(8192);
sock.set_option(sbs);

[?在上述特性工作之前,套接字要被打開。否則,會(huì)拋出異常]

TCP VS UDP VS ICMP

就像我之前所說,不是所有的成員方法在所有的套接字類中都可用。我做了一個(gè)包含成員函數(shù)不同點(diǎn)的列表。如果一個(gè)成員函數(shù)沒有出現(xiàn)在這,說明它在所有的套接字類都是可用的。

名字 TCP UDP ICMP
async_read_some - -
async_receive_from -
async_write_some - -
async_send_to -
read_some - -
receive_from -
write_some - -
send_to -

其他方法

其他與連接和I/O無關(guān)的函數(shù)如下:

  • local_endpoint():這個(gè)方法返回套接字本地連接的地址。
  • remote_endpoint():這個(gè)方法返回套接字連接到的遠(yuǎn)程地址。
  • native_handle():這個(gè)方法返回原始套接字的處理程序。你只有在調(diào)用一個(gè)Boost.Asio不支持的原始方法時(shí)才需要用到它。
  • non_blocking():如果套接字是非阻塞的,這個(gè)方法返回true,否則false。
  • native_non_blocking():如果套接字是非阻塞的,這個(gè)方法返回true,否則返回false。但是,它是基于原生的套接字來調(diào)用本地的api。所以通常來說,你不需要調(diào)用這個(gè)方法(non_blocking()已經(jīng)緩存了這個(gè)結(jié)果);你只有在直接調(diào)用native_handle()這個(gè)方法的時(shí)候才需要用到這個(gè)方法。
  • at_mark():如果套接字要讀的是一段OOB數(shù)據(jù),這個(gè)方法返回true。這個(gè)方法你很少會(huì)用到。

其他需要考慮的事情

最后要注意的一點(diǎn),套接字實(shí)例不能被拷貝,因?yàn)榭截悩?gòu)造方法和=操作符是不可訪問的。

ip::tcp::socket s1(service), s2(service);
s1 = s2; // 編譯時(shí)報(bào)錯(cuò)
ip::tcp::socket s3(s1); // 編譯時(shí)報(bào)錯(cuò)

這是非常有意義的,因?yàn)槊恳粋€(gè)實(shí)例都擁有并管理著一個(gè)資源(原生套接字本身)。如果我們允許拷貝構(gòu)造,結(jié)果是我們會(huì)有兩個(gè)實(shí)例擁有同樣的原生套接字;這樣我們就需要去處理所有者的問題(讓一個(gè)實(shí)例擁有所有權(quán)?或者使用引用計(jì)數(shù)?還是其他的方法)Boost.Asio選擇不允許拷貝(如果你想要?jiǎng)?chuàng)建一個(gè)備份,請使用共享指針)

typedef boost::shared_ptr<ip::tcp::socket> socket_ptr;
socket_ptr sock1(new ip::tcp::socket(service));
socket_ptr sock2(sock1); // ok
socket_ptr sock3;           
sock3 = sock1; // ok

套接字緩沖區(qū)

當(dāng)從一個(gè)套接字讀寫內(nèi)容時(shí),你需要一個(gè)緩沖區(qū),用來保存讀取和寫入的數(shù)據(jù)。緩沖區(qū)內(nèi)存的有效時(shí)間必須比I/O操作的時(shí)間要長;你需要保證它們在I/O操作結(jié)束之前不被釋放。 對于同步操作來說,這很容易;當(dāng)然,這個(gè)緩沖區(qū)在receive和send時(shí)都存在。

char buff[512];
...
sock.receive(buffer(buff));
strcpy(buff, "ok\n");
sock.send(buffer(buff));

但是在異步操作時(shí)就沒這么簡單了,看下面的代碼片段:

// 非常差勁的代碼 ...
void on_read(const boost::system::error_code & err, std::size_t read_bytes)
{ ... }
void func() {
    char buff[512];
    sock.async_receive(buffer(buff), on_read);
}

在我們調(diào)用async_receive()之后,buff就已經(jīng)超出有效范圍,它的內(nèi)存當(dāng)然會(huì)被釋放。當(dāng)我們開始從套接字接收一些數(shù)據(jù)時(shí),我們會(huì)把它們拷貝到一片已經(jīng)不屬于我們的內(nèi)存中;它可能會(huì)被釋放,或者被其他代碼重新開辟來存入其他的數(shù)據(jù),結(jié)果就是:內(nèi)存沖突。

對于上面的問題有幾個(gè)解決方案:

  • 使用全局緩沖區(qū)
  • 創(chuàng)建一個(gè)緩沖區(qū),然后在操作結(jié)束時(shí)釋放它
  • 使用一個(gè)集合對象管理這些套接字和其他的數(shù)據(jù),比如緩沖區(qū)數(shù)組

第一個(gè)方法顯然不是很好,因?yàn)槲覀兌贾廊肿兞糠浅2缓?。此外,如果兩個(gè)實(shí)例使用同一個(gè)緩沖區(qū)怎么辦?

下面是第二種方式的實(shí)現(xiàn):

void on_read(char * ptr, const boost::system::error_code & err, std::size_t read_bytes) {                       
    delete[] ptr;
}
....
char * buff = new char[512];
sock.async_receive(buffer(buff, 512), boost::bind(on_read,buff,_1,_2))

或者,如果你想要緩沖區(qū)在操作結(jié)束后自動(dòng)超出范圍,使用共享指針

struct shared_buffer {
    boost::shared_array<char> buff;
    int size;
    shared_buffer(size_t size) : buff(new char[size]), size(size) {
    }
    mutable_buffers_1 asio_buff() const {
        return buffer(buff.get(), size);
    }
};




// 當(dāng)on_read超出范圍時(shí), boost::bind對象被釋放了,
// 同時(shí)也會(huì)釋放共享指針
void on_read(shared_buffer, const boost::system::error_code & err, std::size_t read_bytes) {}
sock.async_receive(buff.asio_buff(), boost::bind(on_read,buff,_1,_2));

shared_buffer類擁有實(shí)質(zhì)的shared_array<>,shared_array<>存在的目的是用來保存shared_buffer實(shí)例的拷貝-當(dāng)最后一個(gè)share_array<>元素超出范圍時(shí),shared_array<>就被自動(dòng)銷毀了,而這就是我們想要的結(jié)果。

因?yàn)锽oost.Asio會(huì)給完成處理句柄保留一個(gè)拷貝,當(dāng)操作完成時(shí)就會(huì)調(diào)用這個(gè)完成處理句柄,所以你的目的達(dá)到了。那個(gè)拷貝是一個(gè)boost::bind的仿函數(shù),它擁有著實(shí)際的shared_buffer實(shí)例。這是非常優(yōu)雅的!

第三個(gè)選擇是使用一個(gè)連接對象來管理套接字和其他數(shù)據(jù),比如緩沖區(qū),通常來說這是正確的解決方案但是非常復(fù)雜。在這一章的末尾我們會(huì)對這種方法進(jìn)行討論。

緩沖區(qū)封裝函數(shù)

縱觀所有代碼,你會(huì)發(fā)現(xiàn):無論什么時(shí)候,當(dāng)我們需要對一個(gè)buffer進(jìn)行讀寫操作時(shí),代碼會(huì)把實(shí)際的緩沖區(qū)對象封裝在一個(gè)buffer()方法中,然后再把它傳遞給方法調(diào)用:

char buff[512];
sock.async_receive(buffer(buff), on_read);

基本上我們都會(huì)把緩沖區(qū)包含在一個(gè)類中以便Boost.Asio的方法能遍歷這個(gè)緩沖區(qū),比方說,使用下面的代碼:

sock.async_receive(some_buffer, on_read);

實(shí)例some_buffer需要滿足一些需求,叫做ConstBufferSequence或者MutableBufferSequence(你可以在Boost.Asio的文檔中查看它們)。創(chuàng)建你自己的類去處理這些需求的細(xì)節(jié)是非常復(fù)雜的,但是Boost.Asio已經(jīng)提供了一些類用來處理這些需求。所以你不用直接訪問這些緩沖區(qū),而可以使用buffer()方法。

自信地講,你可以把下面列出來的類型都包裝到一個(gè)buffer()方法中:

  • 一個(gè)char[] const 數(shù)組
  • 一個(gè)字節(jié)大小的void *指針
  • 一個(gè)std::string類型的字符串
  • 一個(gè)POD const數(shù)組(POD代表純數(shù)據(jù),這意味著構(gòu)造器和釋放器不做任何操作)
  • 一個(gè)pod數(shù)據(jù)的std::vector
  • 一個(gè)包含pod數(shù)據(jù)的boost::array
  • 一個(gè)包含pod數(shù)據(jù)的std::array

下面的代碼都是有效的:

struct pod_sample { int i; long l; char c; };
...
char b1[512];
void * b2 = new char[512];
std::string b3; b3.resize(128);
pod_sample b4[16];
std::vector<pod_sample> b5; b5.resize(16);
boost::array<pod_sample,16> b6;
std::array<pod_sample,16> b7;
sock.async_send(buffer(b1), on_read);
sock.async_send(buffer(b2,512), on_read);
sock.async_send(buffer(b3), on_read);
sock.async_send(buffer(b4), on_read);
sock.async_send(buffer(b5), on_read);
sock.async_send(buffer(b6), on_read);
sock.async_send(buffer(b7), on_read);

總的來說就是:與其創(chuàng)建你自己的類來處理ConstBufferSequence或者MutableBufferSequence的需求,不如創(chuàng)建一個(gè)能在你需要的時(shí)候保留緩沖區(qū),然后返回一個(gè)mutable_buffers_1實(shí)例的類,而我們早在shared_buffer類中就這樣做了。

read/write/connect自由函數(shù)

Boost.Asio提供了處理I/O的自由函數(shù),我們分四組來分析它們。

connect方法

這些方法把套接字連接到一個(gè)端點(diǎn)。

  • connect(socket, begin [, end] [, condition]):這個(gè)方法遍歷隊(duì)列中從start到end的端點(diǎn)來嘗試同步連接。begin迭代器是調(diào)用socket_type::resolver::query的返回結(jié)果(你可能需要回顧一下端點(diǎn)這個(gè)章節(jié))。特別提示end迭代器是可選的;你可以忽略它。你還可以提供一個(gè)condition的方法給每次連接嘗試之后調(diào)用。用法是Iterator connect_condition(const boost::system::error_code & err,Iterator next);。你可以選擇返回一個(gè)不是next的迭代器,這樣你就可以跳過一些端點(diǎn)。
  • async_connect(socket, begin [, end] [, condition], handler):這個(gè)方法異步地調(diào)用連接方法,在結(jié)束時(shí),它會(huì)調(diào)用完成處理方法。用法是void handler(constboost::system::error_code & err, Iterator iterator);。傳遞給處理方法的第二個(gè)參數(shù)是連接成功端點(diǎn)的迭代器(或者end迭代器)。

它的例子如下:

using namespace boost::asio::ip;
tcp::resolver resolver(service);
tcp::resolver::iterator iter = resolver.resolve(tcp::resolver::query("www.yahoo.com","80"));
tcp::socket sock(service);
connect(sock, iter);

一個(gè)主機(jī)名可以被解析成多個(gè)地址,而connectasync_connect能很好地把你從嘗試每個(gè)地址然后找到一個(gè)可用地址的繁重工作中解放出來,因?yàn)樗鼈円呀?jīng)幫你做了這些。

read/write方法

這些方法對一個(gè)流進(jìn)行讀寫操作(可以是套接字,或者其他表現(xiàn)得像流的類):

  • async_read(stream, buffer [, completion] ,handler):這個(gè)方法異步地從一個(gè)流讀取。結(jié)束時(shí)其處理方法被調(diào)用。處理方法的格式是:void handler(const boost::system::error_ code & err, size_t bytes);。你可以選擇指定一個(gè)完成處理方法。完成處理方法會(huì)在每個(gè)read操作調(diào)用成功之后調(diào)用,然后告訴Boost.Asio async_read操作是否完成(如果沒有完成,它會(huì)繼續(xù)讀?。?。它的格式是:size_t completion(const boost::system::error_code& err, size_t bytes_transfered) 。當(dāng)這個(gè)完成處理方法返回0時(shí),我們認(rèn)為read操作完成;如果它返回一個(gè)非0值,它表示了下一個(gè)async_read_some操作需要從流中讀取的字節(jié)數(shù)。接下來會(huì)有一個(gè)例子來詳細(xì)展示這些。
  • async_write(stream, buffer [, completion], handler):這個(gè)方法異步地向一個(gè)流寫入數(shù)據(jù)。參數(shù)的意義和async_read是一樣的。
  • read(stream, buffer [, completion]):這個(gè)方法同步地從一個(gè)流中讀取數(shù)據(jù)。參數(shù)的意義和async_read是一樣的。
  • write(stream, buffer [, completion]): 這個(gè)方法同步地向一個(gè)流寫入數(shù)據(jù)。參數(shù)的意義和async_read是一樣的。

async_read(stream, stream_buffer [, completion], handler)
async_write(strean, stream_buffer [, completion], handler)
write(stream, stream_buffer [, completion])
read(stream, stream_buffer [, completion]) 

首先,要注意第一個(gè)參數(shù)變成了流,而不單是socket。這個(gè)參數(shù)包含了socket但不僅僅是socket。比如,你可以用一個(gè)Windows的文件句柄來替代socket。 當(dāng)下面情況出現(xiàn)時(shí),所有read和write操作都會(huì)結(jié)束:

  • 可用的緩沖區(qū)滿了(當(dāng)讀取時(shí))或者所有的緩沖區(qū)已經(jīng)被寫入(當(dāng)寫入時(shí))
  • 完成處理方法返回0(如果你提供了這么一個(gè)方法)
  • 錯(cuò)誤發(fā)生時(shí)

下面的代碼會(huì)異步地從一個(gè)socket中間讀取數(shù)據(jù)直到讀取到’\n’:

io_service service;
ip::tcp::socket sock(service);
char buff[512];
int offset = 0;
size_t up_to_enter(const boost::system::error_code &, size_t bytes) {
    for ( size_t i = 0; i < bytes; ++i)
        if ( buff[i + offset] == '\n') 
            return 0;
    return 1; 
 }
void on_read(const boost::system::error_code &, size_t) {}
...
async_read(sock, buffer(buff), up_to_enter, on_read); 

Boost.Asio也提供了一些簡單的完成處理仿函數(shù):

  • transfer_at_least(n)
  • transfer_exactly(n)
  • transfer_all()

例子如下:

char buff[512]; 
void on_read(const boost::system::error_code &, size_t) {} 
// 讀取32個(gè)字節(jié) 
async_read(sock, buffer(buff), transfer_exactly(32), on_read);

上述的4個(gè)方法,不使用普通的緩沖區(qū),而使用由Boost.Asio的std::streambuf類繼承來的stream_buffer方法。stl流和流緩沖區(qū)非常復(fù)雜;下面是例子:

io_service service;  
void on_read(streambuf& buf, const boost::system::error_code &, size_t) { 
    std::istream in(&buf);
    std::string line;
    std::getline(in, line);
    std::cout << "first line: " << line << std::endl; 
}
int main(int argc, char* argv[]) { 
    HANDLE file = ::CreateFile("readme.txt", GENERIC_READ, 0, 0, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, 0);
    windows::stream_handle h(service, file);
    streambuf buf;
    async_read(h, buf, transfer_exactly(256), boost::bind(on_read,boost::ref(buf),_1,_2));
    service.run(); 
}

在這里,我向你們展示了如何在一個(gè)Windows文件句柄上調(diào)用async_read。讀取前256個(gè)字符,然后把它們保存到緩沖區(qū)中,當(dāng)操作結(jié)束時(shí)。on_read被調(diào)用,再創(chuàng)建std::istream用來傳遞緩沖區(qū),讀取第一行(std::getline),最后把它輸出到命令行中。

read_until/async_read_until方法

這些方法在條件滿足之前會(huì)一直讀取:

  • async_read_until(stream, stream_buffer, delim, handler):這個(gè)方法啟動(dòng)一個(gè)異步read操作。read操作會(huì)在讀取到某個(gè)分隔符時(shí)結(jié)束。分隔符可以是字符,std::string或者boost::regex。處理方法的格式為:void handler(const boost::system::error_code & err, size_t bytes);
  • async_read_until(strem, stream_buffer, completion, handler):這個(gè)方法和之前的方法是一樣的,但是沒有分隔符,而是一個(gè)完成處理方法。完成處理方法的格式為:pair< iterator,bool > completion(iterator begin, iterator end);,其中迭代器的類型為buffers_iterator< streambuf::const_buffers_type >。你需要記住的是這個(gè)迭代器是支持隨機(jī)訪問的。你掃描整個(gè)區(qū)間(begin,end),然后決定read操作是否應(yīng)該結(jié)束。返回的結(jié)果是一個(gè)結(jié)果對,第一個(gè)成員是一個(gè)迭代器,它指向最后被這個(gè)方法訪問的字符;第二個(gè)成員指定read操作是否需要結(jié)束,需要時(shí)返回true,否則返回false。
  • read_until(stream, stream_buffer, delim):這個(gè)方法執(zhí)行一個(gè)同步的read操作,參數(shù)的意義和async_read_until一樣。
  • read_until(stream, stream_buffer, completion):這個(gè)方法執(zhí)行一個(gè)同步的read操作,參數(shù)的意義和async_read_until一樣。

下面這個(gè)例子在讀到一個(gè)指定的標(biāo)點(diǎn)符號(hào)之前會(huì)一直讀?。?/p>

typedef buffers_iterator<streambuf::const_buffers_type> iterator;
std::pair<iterator, bool> match_punct(iterator begin, iterator end) {
    while ( begin != end)
        if ( std::ispunct(*begin))
            return std::make_pair(begin,true);
    return std::make_pair(end,false);
}
void on_read(const boost::system::error_code &, size_t) {}
...
streambuf buf;
async_read_until(sock, buf, match_punct, on_read);

如果我們想讀到一個(gè)空格時(shí)就結(jié)束,我們需要把最后一行修改為:

async_read_until(sock, buff, ' ', on_read);

*_at方法

這些方法用來在一個(gè)流上面做隨機(jī)存取操作。由你來指定readwrite操作從什么地方開始(offset):

  • async_read_at(stream, offset, buffer [, completion], handler):這個(gè)方法在指定的流的offset處開始執(zhí)行一個(gè)異步的read操作,當(dāng)操作結(jié)束時(shí),它會(huì)調(diào)用handler。handler的格式為:void handler(const boost::system::error_code& err, size_t bytes);。buffer可以是普通的wrapper()封裝或者streambuf方法。如果你指定一個(gè)completion方法,它會(huì)在每次read操作成功之后調(diào)用,然后告訴Boost.Asio async_read_at操作已經(jīng)完成(如果沒有,則繼續(xù)讀取)。它的格式為:size_t completion(const boost::system::error_code& err, size_t bytes);。當(dāng)completion方法返回0時(shí),我們認(rèn)為read操作完成了;如果返回一個(gè)非零值,它代表了下一次調(diào)用流的async_read_some_at方法的最大讀取字節(jié)數(shù)。
  • async_write_at(stream, offset, buffer [, completion], handler):這個(gè)方法執(zhí)行一個(gè)異步的write操作。參數(shù)的意義和async_read_at是一樣的
  • read_at(stream, offset, buffer [, completion]):這個(gè)方法在一個(gè)執(zhí)行的流上,指定的offset處開始read。參數(shù)的意義和async_read_at是一樣的
  • write_at(stream, offset, buffer [, completion]):這個(gè)方法在一個(gè)執(zhí)行的流上,指定的offset處開始write。參數(shù)的意義和async_read_at是一樣的

這些方法不支持套接字。它們用來處理流的隨機(jī)訪問;也就是說,流是可以隨機(jī)訪問的。套接字顯然不是這樣(套接字是不可回溯的)。

下面這個(gè)例子告訴你怎么從一個(gè)文件偏移為256的位置讀取128個(gè)字節(jié):

io_service service;
int main(int argc, char* argv[]) {
    HANDLE file = ::CreateFile("readme.txt", GENERIC_READ, 0, 0, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, 0);
    windows::random_access_handle h(service, file);
    streambuf buf;
    read_at(h, 256, buf, transfer_exactly(128));
    std::istream in(&buf);
    std::string line;
    std::getline(in, line);
    std::cout << "first line: " << line << std::endl;
}

異步編程

這部分對異步編程時(shí)可能碰到的一些問題進(jìn)行了深入的探究。我建議你先讀一遍,然后在接下來讀這本書的過程中,再經(jīng)?;剡^頭來看看,從而增強(qiáng)你對這部分的理解。

異步的需求

就像我之前所說的,同步編程比異步編程簡單很多。這是因?yàn)椋€性的思考是很簡單的(調(diào)用A,調(diào)用A結(jié)束,調(diào)用B,調(diào)用B結(jié)束,然后繼續(xù),這是以事件處理的方式來思考)。后面你會(huì)碰到這種情況,比如:五件事情,你不知道它們執(zhí)行的順序,也不知道他們是否會(huì)執(zhí)行!

盡管異步編程更難,但是你會(huì)更傾向于選擇使用它,比如:寫一個(gè)需要處理很多并發(fā)訪問的服務(wù)端。并發(fā)訪問越多,異步編程就比同步編程越簡單。

假設(shè):你有一個(gè)需要處理1000個(gè)并發(fā)訪問的應(yīng)用,從客戶端發(fā)給服務(wù)端的每個(gè)信息都會(huì)再返回給客戶端,以‘\n’結(jié)尾。

同步方式的代碼,1個(gè)線程:

using namespace boost::asio;
struct client {
    ip::tcp::socket sock;
    char buff[1024]; // 每個(gè)信息最多這么大
    int already_read; // 你已經(jīng)讀了多少
};
std::vector<client> clients;
void handle_clients() {
    while ( true)
        for ( int i = 0; i < clients.size(); ++i)
            if ( clients[i].sock.available() ) on_read(clients[i]);
}
void on_read(client & c) {
    int to_read = std::min( 1024 - c.already_read, c.sock.available());
    c.sock.read_some( buffer(c.buff + c.already_read, to_read));
    c.already_read += to_read;
    if ( std::find(c.buff, c.buff + c.already_read, '\n') < c.buff + c.already_read) {
        int pos = std::find(c.buff, c.buff + c.already_read, '\n') - c.buff;
        std::string msg(c.buff, c.buff + pos);
        std::copy(c.buff + pos, c.buff + 1024, c.buff);
        c.already_read -= pos;
        on_read_msg(c, msg);
    }
}
void on_read_msg(client & c, const std::string & msg) {
    // 分析消息,然后返回
    if ( msg == "request_login")
        c.sock.write( "request_ok\n");
    else if ...
}

有一種情況是在任何服務(wù)端(和任何基于網(wǎng)絡(luò)的應(yīng)用)都需要避免的,就是代碼無響應(yīng)的情況。在我們的例子里,我們需要handle_clients()方法盡可能少的阻塞。如果方法在某個(gè)點(diǎn)上阻塞,任何進(jìn)來的信息都需要等待方法解除阻塞才能被處理。

為了保持響應(yīng),只在一個(gè)套接字有數(shù)據(jù)的時(shí)候我們才讀,也就是說,if ( clients[i].sock.available() ) on_read(clients[i])。在on_read時(shí),我們只讀當(dāng)前可用的;調(diào)用read_until(c.sock, buffer(...), '\n')會(huì)是一個(gè)非常糟糕的選擇,因?yàn)橹钡轿覀儚囊粋€(gè)指定的客戶端讀取了完整的消息之前,它都是阻塞的(我們永遠(yuǎn)不知道它什么時(shí)候會(huì)讀取到完整的消息)

這里的瓶頸就是on_read_msg()方法;當(dāng)它執(zhí)行時(shí),所有進(jìn)來的消息都在等待。一個(gè)良好的on_read_msg()方法實(shí)現(xiàn)會(huì)保證這種情況基本不會(huì)發(fā)生,但是它還是會(huì)發(fā)生(有時(shí)候向一個(gè)套接字寫入數(shù)據(jù),緩沖區(qū)滿了時(shí),它會(huì)被阻塞) 同步方式的代碼,10個(gè)線程

using namespace boost::asio;
struct client {
   // ... 和之前一樣
    bool set_reading() {
        boost::mutex::scoped_lock lk(cs_);
        if ( is_reading_) return false; // 已經(jīng)在讀取
        else { is_reading_ = true; return true; }
    }
    void unset_reading() {
        boost::mutex::scoped_lock lk(cs_);
        is_reading_ = false;
    }
private:
    boost::mutex cs_;
    bool is_reading_;
};
std::vector<client> clients;
void handle_clients() {
    for ( int i = 0; i < 10; ++i)
        boost::thread( handle_clients_thread);
}
void handle_clients_thread() {
    while ( true)
        for ( int i = 0; i < clients.size(); ++i)
            if ( clients[i].sock.available() )
                if ( clients[i].set_reading()) {
                    on_read(clients[i]);
                    clients[i].unset_reading();
                }
}
void on_read(client & c) {
    // 和之前一樣
}
void on_read_msg(client & c, const std::string & msg) {
    // 和之前一樣
}

為了使用多線程,我們需要對線程進(jìn)行同步,這就是set_reading()set_unreading()所做的。set_reading()方法非常重要,比如你想要一步實(shí)現(xiàn)“判斷是否在讀取然后標(biāo)記為讀取中”。但這是有兩步的(“判斷是否在讀取”和“標(biāo)記為讀取中”),你可能會(huì)有兩個(gè)線程同時(shí)為一個(gè)客戶端判斷是否在讀取,然后你會(huì)有兩個(gè)線程同時(shí)為一個(gè)客戶端調(diào)用on_read,結(jié)果就是數(shù)據(jù)沖突甚至導(dǎo)致應(yīng)用崩潰。

你會(huì)發(fā)現(xiàn)代碼變得極其復(fù)雜。

同步編程有第三個(gè)選擇,就是為每個(gè)連接開辟一個(gè)線程。但是當(dāng)并發(fā)的線程增加時(shí),這就成了一種災(zāi)難性的情況。

然后,讓我們來看異步編程。我們不斷地異步讀取。當(dāng)一個(gè)客戶端請求某些東西時(shí),on_read被調(diào)用,然后回應(yīng),然后等待下一個(gè)請求(然后開始另外一個(gè)異步的read操作)。

異步方式的代碼,10個(gè)線程

using namespace boost::asio;
io_service service;
struct client {
    ip::tcp::socket sock;
    streambuf buff; // 從客戶端取回結(jié)果
}
std::vector<client> clients;
void handle_clients() {
    for ( int i = 0; i < clients.size(); ++i)
        async_read_until(clients[i].sock, clients[i].buff, '\n', boost::bind(on_read, clients[i], _1, _2));
    for ( int i = 0; i < 10; ++i)
        boost::thread(handle_clients_thread);
}
void handle_clients_thread() {
    service.run();
}
void on_read(client & c, const error_code & err, size_t read_bytes) {
    std::istream in(&c.buff);
    std::string msg;
    std::getline(in, msg);
    if ( msg == "request_login")
        c.sock.async_write( "request_ok\n", on_write);
    else if ...
    ...
    // 等待同一個(gè)客戶端下一個(gè)讀取操作
    async_read_until(c.sock, c.buff, '\n', boost::bind(on_read, c, _1, _2));
}

發(fā)現(xiàn)代碼變得有多簡單了吧?client結(jié)構(gòu)里面只有兩個(gè)成員,handle_clients()僅僅調(diào)用了async_read_until,然后它創(chuàng)建了10個(gè)線程,每個(gè)線程都調(diào)用service.run()。這些線程會(huì)處理所有來自客戶端的異步read操作,然后分發(fā)所有向客戶端的異步write操作。另外需要注意的一件事情是:on_read()一直在為下一次異步read操作做準(zhǔn)備(看最后一行代碼)。

異步run(), runone(), poll(), poll one()

為了實(shí)現(xiàn)監(jiān)聽循環(huán),io_service類提供了4個(gè)方法,比如:run(), run_one(), poll()poll_one()。雖然大多數(shù)時(shí)候使用service.run()就可以,但是你還是需要在這里學(xué)習(xí)其他方法實(shí)現(xiàn)的功能。

持續(xù)運(yùn)行

再一次說明,如果有等待執(zhí)行的操作,run()會(huì)一直執(zhí)行,直到你手動(dòng)調(diào)用io_service::stop()。為了保證io_service一直執(zhí)行,通常你添加一個(gè)或者多個(gè)異步操作,然后在它們被執(zhí)行時(shí),你繼續(xù)一直不停地添加異步操作,比如下面代碼:

using namespace boost::asio;
io_service service;
ip::tcp::socket sock(service);
char buff_read[1024], buff_write[1024] = "ok";
void on_read(const boost::system::error_code &err, std::size_t bytes);
void on_write(const boost::system::error_code &err, std::size_t bytes)
{
    sock.async_read_some(buffer(buff_read), on_read);
}
void on_read(const boost::system::error_code &err, std::size_t bytes)
{
    // ... 處理讀取操作 ...
    sock.async_write_some(buffer(buff_write,3), on_write);
}
void on_connect(const boost::system::error_code &err) {
    sock.async_read_some(buffer(buff_read), on_read);
}
int main(int argc, char* argv[]) {
    ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 2001);
    sock.async_connect(ep, on_connect);
    service.run();
}

  1. 當(dāng)service.run()被調(diào)用時(shí),有一個(gè)異步操作在等待。
  2. 當(dāng)socket連接到服務(wù)端時(shí),on_connect被調(diào)用了,它會(huì)添加一個(gè)異步操作。
  3. 當(dāng)on_connect結(jié)束時(shí),我們會(huì)留下一個(gè)等待的操作(read)。
  4. 當(dāng)on_read被調(diào)用時(shí),我們寫入一個(gè)回應(yīng),這又添加了另外一個(gè)等待的操作。
  5. 當(dāng)on_read結(jié)束時(shí),我們會(huì)留下一個(gè)等待的操作(write)。
  6. 當(dāng)on_write操作被調(diào)用時(shí),我們從服務(wù)端讀取另外一個(gè)消息,這也添加了另外一個(gè)等待的操作。
  7. 當(dāng)on_write結(jié)束時(shí),我們有一個(gè)等待的操作(read)。
  8. 然后一直繼續(xù)循環(huán)下去,直到我們關(guān)閉這個(gè)應(yīng)用。

run_one(), poll(), poll_one() 方法

我在之前說過異步方法的handler是在調(diào)用了io_service::run的線程里被調(diào)用的。因?yàn)樵谥辽?0%~95%的時(shí)候,這是你唯一要用到的方法,所以我就把它說得簡單了。對于調(diào)用了run_one(), poll()或者poll_one()的線程這一點(diǎn)也是適用的。

run_one()方法最多執(zhí)行和分發(fā)一個(gè)異步操作:

  • 如果沒有等待的操作,方法立即返回0
  • 如果有等待操作,方法在第一個(gè)操作執(zhí)行之前處于阻塞狀態(tài),然后返回1

你可以認(rèn)為下面兩段代碼是等效的:

io_service service;
service.run(); // 或者
while ( !service.stopped()) service.run_once();

你可以使用run_once()啟動(dòng)一個(gè)異步操作,然后等待它執(zhí)行完成。

io_service service;
bool write_complete = false;
void on_write(const boost::system::error_code & err, size_t bytes)
{ write_complete = true; }
 …
std::string data = "login ok”;
write_complete = false;
async_write(sock, buffer(data), on_write);
do service.run_once() while (!write_complete);

還有一些使用run_one()方法的例子,包含在Boost.Asio諸如blocking_tcp_client.cppblocking_udp_client.cpp的文件中。

poll_one方法以非阻塞的方式最多運(yùn)行一個(gè)準(zhǔn)備好的等待操作:

  • 如果至少有一個(gè)等待的操作,而且準(zhǔn)備好以非阻塞的方式運(yùn)行,poll_one方法會(huì)運(yùn)行它并且返回1
  • 否則,方法立即返回0

操作正在等待并準(zhǔn)備以非阻塞方式運(yùn)行,通常意味著如下的情況:

  • 一個(gè)計(jì)時(shí)器過期了,然后它的async_wait處理方法需要被調(diào)用
  • 一個(gè)I/O操作完成了(比如async_read),然后它的hanlder需要被調(diào)用
  • 之前被加入io_services實(shí)例隊(duì)列中的自定義handler(這會(huì)在之后的章節(jié)中詳解)

你可以使用poll_one去保證所有I/O操作的handler完成運(yùn)行,同時(shí)做一些其他的工作

io_service service;
while ( true) {
    // 運(yùn)行所有完成了IO操作的handler
    while ( service.poll_one()) ;
    // ... 在這里做其他的事情 …
} 

poll()方法會(huì)以非阻塞的方式運(yùn)行所有等待的操作。下面兩段代碼是等效的:

io_service service;
service.poll(); // 或者
while ( service.poll_one()) ;

所有上述方法都會(huì)在失敗的時(shí)候拋出boost::system::system_error異常。這是我們所不希望發(fā)生的事情;這里拋出的異常通常都是致命的,也許是資源耗盡,或者是你handler的其中一個(gè)拋出了異常。另外,每個(gè)方法都有一個(gè)不拋出異常,而是返回一個(gè)boost::system::error_code的重載:

io_service service;
boost::system::error_code err = 0;
service.run(err);
if ( err) std::cout << "Error " << err << std::endl;

異步工作

異步工作不僅僅指用異步地方式接受客戶端到服務(wù)端的連接、異步地從一個(gè)socket讀取或者寫入到socket。它包含了所有可以異步執(zhí)行的操作。

默認(rèn)情況下,你是不知道每個(gè)異步handler的調(diào)用順序的。除了通常的異步調(diào)用(來自異步socket的讀取/寫入/接收)。你可以使用service.post()來使你的自定義方法被異步地調(diào)用。例如:

#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <iostream>
using namespace boost::asio;
io_service service;
void func(int i) {
    std::cout << "func called, i= " << i << std::endl;
}


void worker_thread() {
    service.run();
}


int main(int argc, char* argv[]) {
    for ( int i = 0; i < 10; ++i)
        service.post(boost::bind(func, i));
    boost::thread_group threads;
    for ( int i = 0; i < 3; ++i)
        threads.create_thread(worker_thread);
    // 等待所有線程被創(chuàng)建完
    boost::this_thread::sleep( boost::posix_time::millisec(500));
    threads.join_all();
}

在上面的例子中,service.post(some_function)添加了一個(gè)異步方法調(diào)用。

這個(gè)方法在某一個(gè)調(diào)用了service.run()的線程中請求io_service實(shí)例,然后調(diào)用給定的some_funtion之后立即返回。在我們的例子中,這個(gè)線程是我們之前創(chuàng)建的三個(gè)線程中的一個(gè)。你不能確定異步方法調(diào)用的順序。你不要期待它們會(huì)以我們調(diào)用post()方法的順序來調(diào)用。下面是運(yùn)行之前代碼可能得到的結(jié)果:

func called, i= 0
func called, i= 2
func called, i= 1
func called, i= 4
func called, i= 3
func called, i= 6
func called, i= 7
func called, i= 8
func called, i= 5
func called, i= 9

有時(shí)候你會(huì)想讓一些異步處理方法順序執(zhí)行。比如,你去一個(gè)餐館(go_to_restaurant),下單(order),然后吃(eat)。你需要先去餐館,然后下單,最后吃。這樣的話,你需要用到io_service::strand,這個(gè)方法會(huì)讓你的異步方法被順序調(diào)用??聪旅娴睦樱?/p>

using namespace boost::asio;
io_service service;
void func(int i) {
    std::cout << "func called, i= " << i << "/" << boost::this_thread::get_id() << std::endl;
}
void worker_thread() {
    service.run();
}
int main(int argc, char* argv[])
{
    io_service::strand strand_one(service), strand_two(service);
    for ( int i = 0; i < 5; ++i)
        service.post( strand_one.wrap( boost::bind(func, i)));
    for ( int i = 5; i < 10; ++i)
        service.post( strand_two.wrap( boost::bind(func, i)));
    boost::thread_group threads;
    for ( int i = 0; i < 3; ++i)
        threads.create_thread(worker_thread);
    // 等待所有線程被創(chuàng)建完
    boost::this_thread::sleep( boost::posix_time::millisec(500));
    threads.join_all();
}

在上述代碼中,我們保證前面的5個(gè)線程和后面的5個(gè)線程是順序執(zhí)行的。func called, i = 0func called, i = 1之前被調(diào)用,然后調(diào)用func called, i = 2……同樣func called, i = 5func called, i = 6之前,func called, i = 6func called, i = 7被調(diào)用……你需要注意的是盡管方法是順序調(diào)用的,但是不意味著它們都在同一個(gè)線程執(zhí)行。運(yùn)行這個(gè)程序可能得到的一個(gè)結(jié)果如下:

func called, i= 0/002A60C8
func called, i= 5/002A6138
func called, i= 6/002A6530
func called, i= 1/002A6138
func called, i= 7/002A6530
func called, i= 2/002A6138
func called, i= 8/002A6530
func called, i= 3/002A6138
func called, i= 9/002A6530
func called, i= 4/002A6138

異步post() VS dispatch() VS wrap()

Boost.Asio提供了三種讓你把處理方法添加為異步調(diào)用的方式:

  • service.post(handler):這個(gè)方法能確保其在請求io_service實(shí)例,然后調(diào)用指定的處理方法之后立即返回。handler稍后會(huì)在某個(gè)調(diào)用了service.run()的線程中被調(diào)用。
  • service.dispatch(handler):這個(gè)方法請求io_service實(shí)例去調(diào)用給定的處理方法,但是另外一點(diǎn),如果當(dāng)前的線程調(diào)用了service.run(),它可以在方法中直接調(diào)用handler。
  • service.wrap(handler):這個(gè)方法創(chuàng)建了一個(gè)封裝方法,當(dāng)被調(diào)用時(shí)它會(huì)調(diào)用service.dispatch(handler),這個(gè)會(huì)讓人有點(diǎn)困惑,接下來我會(huì)簡單地解釋它是什么意思。

在之前的章節(jié)中你會(huì)看到關(guān)于service.post()的一個(gè)例子,以及運(yùn)行這個(gè)例子可能得到的一種結(jié)果。我們對它做一些修改,然后看看service.dispatch()是怎么影響輸出的結(jié)果的:

using namespace boost::asio;
io_service service;
void func(int i) {
    std::cout << "func called, i= " << i << std::endl;
}
void run_dispatch_and_post() {
    for ( int i = 0; i < 10; i += 2) {
    service.dispatch(boost::bind(func, i));
    service.post(boost::bind(func, i + 1));
    }
}
int main(int argc, char* argv[]) {
    service.post(run_dispatch_and_post);
    service.run();
}

在解釋發(fā)生了什么之前,我們先運(yùn)行程序,觀察結(jié)果:

func called, i= 0
func called, i= 2
func called, i= 4
func called, i= 6
func called, i= 8
func called, i= 1
func called, i= 3
func called, i= 5
func called, i= 7
func called, i= 9

偶數(shù)先輸出,然后是奇數(shù)。這是因?yàn)槲矣?em>dispatch()輸出偶數(shù),然后用post()輸出奇數(shù)。dispatch()會(huì)在返回之前調(diào)用hanlder,因?yàn)楫?dāng)前的線程調(diào)用了service.run(),而post()每次都立即返回了。

現(xiàn)在,讓我們講講service.wrap(handler)。wrap()返回了一個(gè)仿函數(shù),它可以用來做另外一個(gè)方法的參數(shù):

using namespace boost::asio;
io_service service;
void dispatched_func_1() {
    std::cout << "dispatched 1" << std::endl;
}
void dispatched_func_2() {
    std::cout << "dispatched 2" << std::endl;
}
void test(boost::function<void()> func) {
    std::cout << "test" << std::endl;
    service.dispatch(dispatched_func_1);
    func();
}
void service_run() {
    service.run();
}
int main(int argc, char* argv[]) {
    test( service.wrap(dispatched_func_2));
    boost::thread th(service_run);
    boost::this_thread::sleep( boost::posix_time::millisec(500));
    th.join();
}

test(service.wrap(dispatched_func_2));會(huì)把dispatched_ func_2包裝起來創(chuàng)建一個(gè)仿函數(shù),然后傳遞給test當(dāng)作一個(gè)參數(shù)。當(dāng)test()被調(diào)用時(shí),它會(huì)分發(fā)調(diào)用方法1,然后調(diào)用func()。這時(shí),你會(huì)發(fā)現(xiàn)調(diào)用func()service.dispatch(dispatched_func_2)是等價(jià)的,因?yàn)樗鼈兪沁B續(xù)調(diào)用的。程序的輸出證明了這一點(diǎn):

test
dispatched 1
dispatched 2

io_service::strand 類(用來序列化異步調(diào)用)也包含了poll(), dispatch()wrap()等成員函數(shù)。它們的作用和io_servicepoll(), dispatch()wrap()是一樣的。然而,大多數(shù)情況下你只需要把io_service::strand::wrap()方法做為io_service::poll()或者io_service::dispatch()方法的參數(shù)即可。

保持活動(dòng)

假設(shè)你需要做下面的操作:

io_service service;
ip::tcp::socket sock(service);
char buff[512];
...
read(sock, buffer(buff));

在這個(gè)例子中,sockbuff的存在時(shí)間都必須比read()調(diào)用的時(shí)間要長。也就是說,在調(diào)用read()返回之前,它們都必須有效。這就是你所期望的;你傳給一個(gè)方法的所有參數(shù)在方法內(nèi)部都必須有效。當(dāng)我們采用異步方式時(shí),事情會(huì)變得比較復(fù)雜。

io_service service;
ip::tcp::socket sock(service);
char buff[512];
void on_read(const boost::system::error_code &, size_t) {}
...
async_read(sock, buffer(buff), on_read);

在這個(gè)例子中,sockbuff的存在時(shí)間都必須比read()操作本身時(shí)間要長,但是read操作持續(xù)的時(shí)間我們是不知道的,因?yàn)樗钱惒降摹?/p>

當(dāng)使用socket緩沖區(qū)的時(shí)候,你會(huì)有一個(gè)buffer實(shí)例在異步調(diào)用時(shí)一直存在(使用boost::shared_array<>)。在這里,我們可以使用同樣的方式,通過創(chuàng)建一個(gè)類并在其內(nèi)部管理socket和它的讀寫緩沖區(qū)。然后,對于所有的異步操作,傳遞一個(gè)包含智能指針的boost::bind仿函數(shù)給它:

using namespace boost::asio;
io_service service;
struct connection : boost::enable_shared_from_this<connection> {
    typedef boost::system::error_code error_code;
    typedef boost::shared_ptr<connection> ptr;
    connection() : sock_(service), started_(true) {}
    void start(ip::tcp::endpoint ep) {
        sock_.async_connect(ep, boost::bind(&connection::on_connect, shared_from_this(), _1));
    }
    void stop() {
        if ( !started_) return;
        started_ = false;
        sock_.close();
    }
    bool started() { return started_; }
private:
    void on_connect(const error_code & err) {
        // 這里你決定用這個(gè)連接做什么: 讀取或者寫入
        if ( !err) do_read();
        else stop();
    }
    void on_read(const error_code & err, size_t bytes) {
        if ( !started() ) return;
        std::string msg(read_buffer_, bytes);
        if ( msg == "can_login") do_write("access_data");
        else if ( msg.find("data ") == 0) process_data(msg);
        else if ( msg == "login_fail") stop();
    }
    void on_write(const error_code & err, size_t bytes) {
        do_read(); 
    }
    void do_read() {
        sock_.async_read_some(buffer(read_buffer_), boost::bind(&connection::on_read, shared_from_this(),   _1, _2)); 
    }
    void do_write(const std::string & msg) {
        if ( !started() ) return;
        // 注意: 因?yàn)樵谧隽硗庖粋€(gè)async_read操作之前你想要發(fā)送多個(gè)消息, 
        // 所以你需要多個(gè)寫入buffer
        std::copy(msg.begin(), msg.end(), write_buffer_);
        sock_.async_write_some(buffer(write_buffer_, msg.size()), boost::bind(&connection::on_write, shared_from_this(), _1, _2)); 
    }


    void process_data(const std::string & msg) {
        // 處理服務(wù)端來的內(nèi)容,然后啟動(dòng)另外一個(gè)寫入操作
    }
private:
    ip::tcp::socket sock_;
    enum { max_msg = 1024 };
    char read_buffer_[max_msg];
    char write_buffer_[max_msg];
    bool started_;
};


int main(int argc, char* argv[]) {
    ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 8001);
    connection::ptr(new connection)->start(ep);
} 

在所有異步調(diào)用中,我們傳遞一個(gè)boost::bind仿函數(shù)當(dāng)作參數(shù)。這個(gè)仿函數(shù)內(nèi)部包含了一個(gè)智能指針,指向connection實(shí)例。只要有一個(gè)異步操作等待時(shí),Boost.Asio就會(huì)保存boost::bind仿函數(shù)的拷貝,這個(gè)拷貝保存了指向連接實(shí)例的一個(gè)智能指針,從而保證connection實(shí)例保持活動(dòng)。問題解決!

當(dāng)然,connection類僅僅是一個(gè)框架類;你需要根據(jù)你的需求對它進(jìn)行調(diào)整(它看起來會(huì)和當(dāng)前服務(wù)端例子的情況相當(dāng)不同)。

你需要注意的是創(chuàng)建一個(gè)新的連接是相當(dāng)簡單的:connection::ptr(new connection)- >start(ep)。這個(gè)方法啟動(dòng)了到服務(wù)端的(異步)連接。當(dāng)你需要關(guān)閉這個(gè)連接時(shí),調(diào)用stop()。

當(dāng)實(shí)例被啟動(dòng)時(shí)(start()),它會(huì)等待客戶端的連接。當(dāng)連接發(fā)生時(shí)。on_connect()被調(diào)用。如果沒有錯(cuò)誤發(fā)生,它啟動(dòng)一個(gè)read操作(do_read())。當(dāng)read操作結(jié)束時(shí),你就可以解析這個(gè)消息;當(dāng)然你應(yīng)用的on_read()看起來會(huì)各種各樣。而當(dāng)你寫回一個(gè)消息時(shí),你需要把它拷貝到緩沖區(qū),然后像我在do_write()方法中所做的一樣將其發(fā)送出去,因?yàn)檫@個(gè)緩沖區(qū)同樣需要在這個(gè)異步寫操作中一直存活。最后需要注意的一點(diǎn)——當(dāng)寫回時(shí),你需要指定寫入的數(shù)量,否則,整個(gè)緩沖區(qū)都會(huì)被發(fā)送出去。

總結(jié)

網(wǎng)絡(luò)api實(shí)際上要繁雜得多,這個(gè)章節(jié)只是做為一個(gè)參考,當(dāng)你在實(shí)現(xiàn)自己的網(wǎng)絡(luò)應(yīng)用時(shí)可以回過頭來看看。

Boost.Asio實(shí)現(xiàn)了端點(diǎn)的概念,你可以認(rèn)為是IP和端口。如果你不知道準(zhǔn)確的IP,你可以使用resolver對象將主機(jī)名,例如www.yahoo.com轉(zhuǎn)換為一個(gè)或多個(gè)IP地址。

我們也可以看到API的核心——socket類。Boost.Asio提供了TCP、UDPICMP的實(shí)現(xiàn)。而且你還可以用你自己的協(xié)議來對它進(jìn)行擴(kuò)展;當(dāng)然,這個(gè)工作不適合缺乏勇氣的人。

異步編程是剛需。你應(yīng)該已經(jīng)明白為什么有時(shí)候需要用到它,尤其在寫服務(wù)端的時(shí)候。調(diào)用service.run()來實(shí)現(xiàn)異步循環(huán)就已經(jīng)可以讓你很滿足,但是有時(shí)候你需要更進(jìn)一步,嘗試使用run_one()、poll()或者poll_one()。

當(dāng)實(shí)現(xiàn)異步時(shí),你可以異步執(zhí)行你自己的方法;使用service.post()或者service.dispatch()

最后,為了使socket和緩沖區(qū)(read或者write)在整個(gè)異步操作的生命周期中一直活動(dòng),我們需要采取特殊的防護(hù)措施。你的連接類需要繼承自enabled_shared_from_this,然后在內(nèi)部保存它需要的緩沖區(qū),而且每次異步調(diào)用都要傳遞一個(gè)智能指針給this操作。

下一章會(huì)進(jìn)行實(shí)戰(zhàn)操作;在實(shí)現(xiàn)回顯客戶端/服務(wù)端應(yīng)用時(shí)會(huì)有大量的編程實(shí)踐。

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)