在這一章節(jié),我們會(huì)深入學(xué)習(xí)怎樣使用Boost.Asio建立非凡的客戶(hù)端和服務(wù)端應(yīng)用。你可以運(yùn)行并測(cè)試它們,而且在理解之后,你可以把它們做為框架來(lái)構(gòu)造自己的應(yīng)用。
在接下來(lái)的例子中:
客戶(hù)端可以發(fā)送如下請(qǐng)求:
為了更有趣一點(diǎn),我們?cè)黾恿艘恍╇y度:
首先,我們會(huì)實(shí)現(xiàn)同步應(yīng)用。你會(huì)發(fā)現(xiàn)它的代碼很直接而且易讀的。而且因?yàn)樗械木W(wǎng)絡(luò)調(diào)用都是阻塞的,所以它不需要獨(dú)立的線程。
同步客戶(hù)端會(huì)以你所期望的串行方式運(yùn)行;連接到服務(wù)端,登錄服務(wù)器,然后執(zhí)行連接循環(huán),比如休眠一下,發(fā)起一個(gè)請(qǐng)求,讀取服務(wù)端返回,然后再休眠一會(huì),然后一直循環(huán)下去……
因?yàn)槲覀兪峭降?,所以我們讓事情變得?jiǎn)單一點(diǎn)。首先,連接到服務(wù)器,然后再循環(huán),如下:
ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 8001);
void run_client(const std::string & client_name) {
talk_to_svr client(client_name);
try {
client.connect(ep);
client.loop();
} catch(boost::system::system_error & err) {
std::cout << "client terminated " << std::endl;
}
}
下面的代碼片段展示了talk_to_svr類(lèi):
struct talk_to_svr {
talk_to_svr(const std::string & username) : sock_(service), started_(true), username_(username) {}
void connect(ip::tcp::endpoint ep) {
sock_.connect(ep);
}
void loop() {
write("login " + username_ + "\n");
read_answer();
while ( started_) {
write_request();
read_answer();
boost::this_thread::sleep(millisec(rand() % 7000));
}
}
std::string username() const { return username_; }
...
private:
ip::tcp::socket sock_;
enum { max_msg = 1024 };
int already_read_;
char buff_[max_msg];
bool started_;
std::string username_;
};
在這個(gè)循環(huán)中,我們僅僅填充1個(gè)比特,做一個(gè)ping操作之后就進(jìn)入睡眠狀態(tài),之后再讀取服務(wù)端的返回。我們的睡眠是隨機(jī)的(有時(shí)候超過(guò)5秒),這樣服務(wù)端就有可能在某個(gè)時(shí)間點(diǎn)斷開(kāi)我們的連接:
void write_request() {
write("ping\n");
}
void read_answer() {
already_read_ = 0;
read(sock_, buffer(buff_), boost::bind(&talk_to_svr::read_complete, this, _1, _2));
process_msg();
}
void process_msg() {
std::string msg(buff_, already_read_);
if ( msg.find("login ") == 0) on_login();
else if ( msg.find("ping") == 0) on_ping(msg);
else if ( msg.find("clients ") == 0) on_clients(msg);
else std::cerr << "invalid msg " << msg << std::endl;
}
對(duì)于讀取結(jié)果,我們使用在之前章節(jié)就有說(shuō)到的read_complete來(lái)保證我們能讀到換行符(’\n’)。這段邏輯在process_msg()中,在這里我們讀取服務(wù)端的返回,然后分發(fā)到正確的方法去處理:
void on_login() { do_ask_clients(); }
void on_ping(const std::string & msg) {
std::istringstream in(msg);
std::string answer;
in >> answer >> answer;
if ( answer == "client_list_changed")
do_ask_clients();
}
void on_clients(const std::string & msg) {
std::string clients = msg.substr(8);
std::cout << username_ << ", new client list:" << clients;
}
void do_ask_clients() {
write("ask_clients\n");
read_answer();
}
void write(const std::string & msg) { sock_.write_some(buffer(msg)); }
size_t read_complete(const boost::system::error_code & err, size_t bytes) {
// ... 和之前一樣
}
在讀取服務(wù)端對(duì)我們ping操作的返回時(shí),如果得到的消息是client_list_changed,我們就需要重新請(qǐng)求客戶(hù)端列表。
同步服務(wù)端也是相當(dāng)簡(jiǎn)單的。它只需要兩個(gè)線程,一個(gè)負(fù)責(zé)接收新的客戶(hù)端連接,另外一個(gè)負(fù)責(zé)處理已經(jīng)存在的客戶(hù)端請(qǐng)求。它不能使用單線程,因?yàn)榈却碌目蛻?hù)端連接是一個(gè)阻塞操作,所以我們需要另外一個(gè)線程來(lái)處理已經(jīng)存在的客戶(hù)端請(qǐng)求。
正常來(lái)說(shuō)服務(wù)端都比客戶(hù)端要難實(shí)現(xiàn)。一方面,它要管理所有已經(jīng)連接的客戶(hù)端。因?yàn)槲覀兪峭降?,所以我們需要至少兩個(gè)線程,一個(gè)負(fù)責(zé)接受新的客戶(hù)端連接(因?yàn)閍ccept()是阻塞的)而另一個(gè)負(fù)責(zé)回復(fù)已經(jīng)存在的客戶(hù)端。
void accept_thread() {
ip::tcp::acceptor acceptor(service,ip::tcp::endpoint(ip::tcp::v4(), 8001));
while ( true) {
client_ptr new_( new talk_to_client);
acceptor.accept(new_->sock());
boost::recursive_mutex::scoped_lock lk(cs);
clients.push_back(new_);
}
}
void handle_clients_thread() {
while ( true) {
boost::this_thread::sleep( millisec(1));
boost::recursive_mutex::scoped_lock lk(cs);
for(array::iterator b = clients.begin(), e = clients.end(); b!= e; ++b)
(*b)->answer_to_client();
// 刪除已經(jīng)超時(shí)的客戶(hù)端
clients.erase(std::remove_if(clients.begin(), clients.end(), boost::bind(&talk_to_client::timed_out,_1)), clients.end());
}
}
int main(int argc, char* argv[]) {
boost::thread_group threads;
threads.create_thread(accept_thread);
threads.create_thread(handle_clients_thread);
threads.join_all();
}
為了分辨客戶(hù)端發(fā)送過(guò)來(lái)的請(qǐng)求我們需要保存一個(gè)客戶(hù)端的列表。
每個(gè)talk_to_client實(shí)例都擁有一個(gè)socket,socket類(lèi)是不支持拷貝構(gòu)造的,所以如果你想要把它們保存在一個(gè)std::vector對(duì)象中,你需要一個(gè)指向它的智能指針。這里有兩種實(shí)現(xiàn)的方式:在talk_to_client內(nèi)部保存一個(gè)指向socket的智能指針然后創(chuàng)建一個(gè)talk_to_client實(shí)例的數(shù)組,或者讓talk_to_client實(shí)例用變量的方式保存socket,然后創(chuàng)建一個(gè)指向talk_to_client智能指針的數(shù)組。我選擇后者,但是你也可以選前面的方式:
typedef boost::shared_ptr<talk_to_client> client_ptr;
typedef std::vector<client_ptr> array;
array clients;
boost::recursive_mutex cs; // 用線程安全的方式訪問(wèn)客戶(hù)端數(shù)組
talk_to_client的主要代碼如下:
struct talk_to_client : boost::enable_shared_from_this<talk_to_client>
{
talk_to_client() { ... }
std::string username() const { return username_; }
void answer_to_client() {
try {
read_request();
process_request();
} catch ( boost::system::system_error&) { stop(); }
if ( timed_out())
stop();
}
void set_clients_changed() { clients_changed_ = true; }
ip::tcp::socket & sock() { return sock_; }
bool timed_out() const {
ptime now = microsec_clock::local_time();
long long ms = (now - last_ping).total_milliseconds();
return ms > 5000 ;
}
void stop() {
boost::system::error_code err; sock_.close(err);
}
void read_request() {
if ( sock_.available())
already_read_ += sock_.read_some(buffer(buff_ + already_read_, max_msg - already_read_));
}
...
private:
// ... 和同步客戶(hù)端中的一樣
bool clients_changed_;
ptime last_ping;
};
上述代碼擁有非常好的自釋能力。其中最重要的方法是read_request()。它只在存在有效數(shù)據(jù)的情況才讀取,這樣的話,服務(wù)端永遠(yuǎn)都不會(huì)阻塞:
void process_request() {
bool found_enter = std::find(buff_, buff_ + already_read_, '\n') < buff_ + already_read_;
if ( !found_enter)
return; // 消息不完整
// 處理消息
last_ping = microsec_clock::local_time();
size_t pos = std::find(buff_, buff_ + already_read_, '\n') - buff_;
std::string msg(buff_, pos);
std::copy(buff_ + already_read_, buff_ + max_msg, buff_);
already_read_ -= pos + 1;
if ( msg.find("login ") == 0) on_login(msg);
else if ( msg.find("ping") == 0) on_ping();
else if ( msg.find("ask_clients") == 0) on_clients();
else std::cerr << "invalid msg " << msg << std::endl;
}
void on_login(const std::string & msg) {
std::istringstream in(msg);
in >> username_ >> username_;
write("login ok\n");
update_clients_changed();
}
void on_ping() {
write(clients_changed_ ? "ping client_list_changed\n" : "ping ok\n");
clients_changed_ = false;
}
void on_clients() {
std::string msg;
{ boost::recursive_mutex::scoped_lock lk(cs);
for( array::const_iterator b = clients.begin(), e = clients.end() ; b != e; ++b)
msg += (*b)->username() + " ";
}
write("clients " + msg + "\n");
}
void write(const std::string & msg){sock_.write_some(buffer(msg)); }
觀察process_request()。當(dāng)我們讀取到足夠多有效的數(shù)據(jù)時(shí),我們需要知道我們是否已經(jīng)讀取到整個(gè)消息(如果found_enter為真)。這樣做的話,我們可以使我們避免一次讀多個(gè)消息的可能(’\n’之后的消息也被保存到緩沖區(qū)中),然后我們解析讀取到的整個(gè)消息。剩下的代碼都是很容易讀懂的。
現(xiàn)在,是比較有趣(也比較難)的異步實(shí)現(xiàn)! 當(dāng)查看示意圖時(shí),你需要知道Boost.Asio代表由Boost.Asio執(zhí)行的一個(gè)異步調(diào)用。例如do_read(),Boost.Asio和on_read()代表了從do_read()到on_read()的邏輯流程,但是你永遠(yuǎn)不知道什么時(shí)候輪到on_read()被調(diào)用,你只是知道你最終會(huì)調(diào)用它。
到這里事情會(huì)變得有點(diǎn)復(fù)雜,但是仍然是可控的。當(dāng)然你也會(huì)擁有一個(gè)不會(huì)阻塞的應(yīng)用。
下面的代碼你應(yīng)該已經(jīng)很熟悉:
#define MEM_FN(x) boost::bind(&self_type::x, shared_from_this())
#define MEM_FN1(x,y) boost::bind(&self_type::x, shared_from_
this(),y)
#define MEM_FN2(x,y,z) boost::bind(&self_type::x, shared_from_
this(),y,z)
class talk_to_svr : public boost::enable_shared_from_this<talk_to_svr>, boost::noncopyable {
typedef talk_to_svr self_type;
talk_to_svr(const std::string & username) : sock_(service), started_(true), username_(username), timer_
(service) {}
void start(ip::tcp::endpoint ep) {
sock_.async_connect(ep, MEM_FN1(on_connect,_1));
}
public:
typedef boost::system::error_code error_code;
typedef boost::shared_ptr<talk_to_svr> ptr;
static ptr start(ip::tcp::endpoint ep, const std::string & username) {
ptr new_(new talk_to_svr(username));
new_->start(ep);
return new_;
}
void stop() {
if ( !started_) return;
started_ = false;
sock_.close();
}
bool started() { return started_; }
...
private:
size_t read_complete(const boost::system::error_code &err, size_t bytes) {
if ( err) return 0;
bool found = std::find(read_buffer_, read_buffer_ + bytes, '\n') < read_buffer_ + bytes;
return found ? 0 : 1;
}
private:
ip::tcp::socket sock_;
enum { max_msg = 1024 };
char read_buffer_[max_msg];
char write_buffer_[max_msg];
bool started_;
std::string username_;
deadline_timer timer_;
};
你會(huì)看到額外還有一個(gè)叫deadlinetimer timer的方法用來(lái)ping服務(wù)端;而且ping操作同樣是隨機(jī)的。
下面是類(lèi)的邏輯:
void on_connect(const error_code & err) {
if ( !err) do_write("login " + username_ + "\n");
else stop();
}
void on_read(const error_code & err, size_t bytes) {
if ( err) stop();
if ( !started() ) return;
// 處理消息
std::string msg(read_buffer_, bytes);
if ( msg.find("login ") == 0) on_login();
else if ( msg.find("ping") == 0) on_ping(msg);
else if ( msg.find("clients ") == 0) on_clients(msg);
}
void on_login() {
do_ask_clients();
}
void on_ping(const std::string & msg) {
std::istringstream in(msg);
std::string answer;
in >> answer >> answer;
if ( answer == "client_list_changed") do_ask_clients();
else postpone_ping();
}
void on_clients(const std::string & msg) {
std::string clients = msg.substr(8);
std::cout << username_ << ", new client list:" << clients ;
postpone_ping();
}
在on_read()中,首先的兩行代碼是亮點(diǎn)。在第一行,如果出現(xiàn)錯(cuò)誤,我們就停止。而第二行,如果我們已經(jīng)停止了(之前就停止了或者剛好停止),我們就返回。反之如果所有都是OK,我們就對(duì)收到的消息進(jìn)行處理。
最后是*do_**方法,實(shí)現(xiàn)如下:
void do_ping() { do_write("ping\n"); }
void postpone_ping() {
timer_.expires_from_now(boost::posix_time::millisec(rand() % 7000));
timer_.async_wait( MEM_FN(do_ping));
}
void do_ask_clients() { do_write("ask_clients\n"); }
void on_write(const error_code & err, size_t bytes) { do_read(); }
void do_read() {
async_read(sock_, buffer(read_buffer_), MEM_FN2(read_complete,_1,_2), MEM_FN2(on_read,_1,_2));
}
void do_write(const std::string & msg) {
if ( !started() ) return;
std::copy(msg.begin(), msg.end(), write_buffer_);
sock_.async_write_some( buffer(write_buffer_, msg.size()), MEM_FN2(on_write,_1,_2));
注意每一個(gè)read操作都會(huì)觸發(fā)一個(gè)ping操作
這個(gè)示意圖是相當(dāng)復(fù)雜的;從Boost.Asio出來(lái)你可以看到4個(gè)箭頭指向on_accept,on_read,on_write和on_check_ping。這也就意味著你永遠(yuǎn)不知道哪個(gè)異步調(diào)用是下一個(gè)完成的調(diào)用,但是你可以確定的是它是這4個(gè)操作中的一個(gè)。
現(xiàn)在,我們是異步的了;我們可以繼續(xù)保持單線程。接受客戶(hù)端連接是最簡(jiǎn)單的部分,如下所示:
ip::tcp::acceptor acceptor(service, ip::tcp::endpoint(ip::tcp::v4(), 8001));
void handle_accept(talk_to_client::ptr client, const error_code & err)
{
client->start();
talk_to_client::ptr new_client = talk_to_client::new_();
acceptor.async_accept(new_client->sock(), boost::bind(handle_accept,new_client,_1));
}
int main(int argc, char* argv[]) {
talk_to_client::ptr client = talk_to_client::new_();
acceptor.async_accept(client->sock(),boost::bind(handle_accept,client,_1));
service.run();
}
上述代碼會(huì)一直異步地等待一個(gè)新的客戶(hù)端連接(每個(gè)新的客戶(hù)端連接會(huì)觸發(fā)另外一個(gè)異步等待操作)。
我們需要監(jiān)控client list changed事件(一個(gè)新客戶(hù)端連接或者一個(gè)客戶(hù)端斷開(kāi)連接),然后當(dāng)事件發(fā)生時(shí)通知所有的客戶(hù)端。因此,我們需要保存一個(gè)客戶(hù)端連接的數(shù)組,否則除非你不需要在某一時(shí)刻知道所有連接的客戶(hù)端,你才不需要這樣一個(gè)數(shù)組。
class talk_to_client;
typedef boost::shared_ptr<talk_to_client>client_ptr;
typedef std::vector<client_ptr> array;
array clients;
connection類(lèi)的框架如下:
class talk_to_client : public boost::enable_shared_from_this<talk_to_client> , boost::noncopyable {
talk_to_client() { ... }
public:
typedef boost::system::error_code error_code;
typedef boost::shared_ptr<talk_to_client> ptr;
void start() {
started_ = true;
clients.push_back( shared_from_this());
last_ping = boost::posix_time::microsec_clock::local_time();
do_read(); //首先,我們等待客戶(hù)端連接
}
static ptr new_() { ptr new_(new talk_to_client); return new_; }
void stop() {
if ( !started_) return;
started_ = false;
sock_.close();
ptr self = shared_from_this();
array::iterator it = std::find(clients.begin(), clients.end(), self);
clients.erase(it);
update_clients_changed();
}
bool started() const { return started_; }
ip::tcp::socket & sock() { return sock_;}
std::string username() const { return username_; }
void set_clients_changed() { clients_changed_ = true; }
…
private:
ip::tcp::socket sock_;
enum { max_msg = 1024 };
char read_buffer_[max_msg];
char write_buffer_[max_msg];
bool started_;
std::string username_;
deadline_timer timer_;
boost::posix_time::ptime last_ping;
bool clients_changed_;
};
我會(huì)用talk_to_client或者talk_to_server來(lái)調(diào)用connection類(lèi),從而讓你更明白我所說(shuō)的內(nèi)容。
現(xiàn)在你需要用到之前的代碼了;它和我們?cè)诳蛻?hù)端應(yīng)用中所用到的是一樣的。我們還有另外一個(gè)stop()方法,這個(gè)方法用來(lái)從客戶(hù)端數(shù)組中移除一個(gè)客戶(hù)端連接。
服務(wù)端持續(xù)不斷地等待異步的read操作:
void on_read(const error_code & err, size_t bytes) {
if ( err) stop();
if ( !started() ) return;
std::string msg(read_buffer_, bytes);
if ( msg.find("login ") == 0) on_login(msg);
else if ( msg.find("ping") == 0) on_ping();
else if ( msg.find("ask_clients") == 0) on_clients();
}
void on_login(const std::string & msg) {
std::istringstream in(msg);
in >> username_ >> username_;
do_write("login ok\n");
update_clients_changed();
}
void on_ping() {
do_write(clients_changed_ ? "ping client_list_changed\n" : "ping ok\n");
clients_changed_ = false;
}
void on_clients() {
std::string msg;
for(array::const_iterator b =clients.begin(),e =clients.end(); b != e; ++b)
msg += (*b)->username() + " ";
do_write("clients " + msg + "\n");
}
這段代碼是簡(jiǎn)單易懂的;需要注意的一點(diǎn)是:當(dāng)一個(gè)新客戶(hù)端登錄,我們調(diào)用update_clients_changed(),這個(gè)方法為所有客戶(hù)端將clientschanged標(biāo)志為true。
服務(wù)端每收到一個(gè)請(qǐng)求就用相應(yīng)的方式進(jìn)行回復(fù),如下所示:
void do_ping() { do_write("ping\n"); }
void do_ask_clients() { do_write("ask_clients\n"); }
void on_write(const error_code & err, size_t bytes) { do_read(); }
void do_read() {
async_read(sock_, buffer(read_buffer_), MEM_FN2(read_complete,_1,_2), MEM_FN2(on_read,_1,_2));
post_check_ping();
}
void do_write(const std::string & msg) {
if ( !started() ) return;
std::copy(msg.begin(), msg.end(), write_buffer_);
sock_.async_write_some( buffer(write_buffer_, msg.size()), MEM_FN2(on_write,_1,_2));
}
size_t read_complete(const boost::system::error_code & err, size_t bytes) {
// ... 就像之前
}
在每個(gè)write操作的末尾,on_write()方法被調(diào)用,這個(gè)方法會(huì)觸發(fā)另外一個(gè)異步讀操作,這樣的話“等待請(qǐng)求-回復(fù)請(qǐng)求”這個(gè)循環(huán)就會(huì)一直執(zhí)行,直到客戶(hù)端斷開(kāi)連接或者超時(shí)。
在每次讀操作開(kāi)始之前,我們異步等待5秒鐘來(lái)觀察客戶(hù)端是否超時(shí)。如果超時(shí),我們關(guān)閉它的連接:
void on_check_ping() {
ptime now = microsec_clock::local_time();
if ( (now - last_ping).total_milliseconds() > 5000)
stop();
last_ping = boost::posix_time::microsec_clock::local_time();
}
void post_check_ping() {
timer_.expires_from_now(boost::posix_time::millisec(5000));
timer_.async_wait( MEM_FN(on_check_ping));
}
這就是整個(gè)服務(wù)端的實(shí)現(xiàn)。你可以運(yùn)行并讓它工作起來(lái)!
在代碼中,我向你們展示了這一章我們學(xué)到的東西,為了更容易理解,我把代碼稍微精簡(jiǎn)了下;比如,大部分的控制臺(tái)輸出我都沒(méi)有展示,盡管在這本書(shū)附贈(zèng)的代碼中它們是存在的。我建議你自己運(yùn)行這些例子,因?yàn)閺念^到尾讀一次代碼能加強(qiáng)你對(duì)本章展示應(yīng)用的理解。
我們已經(jīng)學(xué)到了怎么寫(xiě)一些基礎(chǔ)的客戶(hù)端/服務(wù)端應(yīng)用。我們已經(jīng)避免了一些諸如內(nèi)存泄漏和死鎖的低級(jí)錯(cuò)誤。所有的編碼都是框架式的,這樣你就可以根據(jù)你自己的需求對(duì)它們進(jìn)行擴(kuò)展。
在接下來(lái)的章節(jié)中,我們會(huì)更加深入地了解使用Boost.Asio進(jìn)行同步編程和異步編程的不同點(diǎn),同時(shí)你也會(huì)學(xué)會(huì)如何嵌入你自己的異步操作。
更多建議: