[譯]回顯服務(wù)端/客戶端

2018-06-19 15:34 更新

回顯服務(wù)端/客戶端

在這一章,我們將會(huì)實(shí)現(xiàn)一個(gè)小的客戶端/服務(wù)端應(yīng)用,這可能會(huì)是你寫過的最簡(jiǎn)單的客戶端/服務(wù)端應(yīng)用?;仫@應(yīng)用就是一個(gè)把客戶端發(fā)過來的任何內(nèi)容回顯給其本身,然后關(guān)閉連接的的服務(wù)端。這個(gè)服務(wù)端可以處理任何數(shù)量的客戶端。每個(gè)客戶端連接之后發(fā)送一個(gè)消息,服務(wù)端接收到完成消息后把它發(fā)送回去。在那之后,服務(wù)端關(guān)閉連接。

因此,每個(gè)回顯客戶端連接到服務(wù)端,發(fā)送一個(gè)消息,然后讀取服務(wù)端返回的結(jié)果,確保這是它發(fā)送給服務(wù)端的消息就結(jié)束和服務(wù)端的會(huì)話。

我們首先實(shí)現(xiàn)一個(gè)同步應(yīng)用,然后實(shí)現(xiàn)一個(gè)異步應(yīng)用,以便你可以很容易對(duì)比他們:

這里寫圖片描述

為了節(jié)省空間,下面的代碼有一些被裁剪掉了。你可以在附加在這本書的代碼中看到全部的代碼。

TCP回顯服務(wù)端/客戶端

對(duì)于TCP而言,我們需要一個(gè)額外的保證;每一個(gè)消息以換行符結(jié)束(‘\n’)。編寫一個(gè)同步回顯服務(wù)端/客戶端非常簡(jiǎn)單。

我們會(huì)展示編碼內(nèi)容,比如同步客戶端,同步服務(wù)端,異步客戶端和異步服務(wù)端。

TCP同步客戶端

在大多數(shù)有價(jià)值的例子中,客戶端通常比服務(wù)端編碼要簡(jiǎn)單(因?yàn)榉?wù)端需要處理多個(gè)客戶端請(qǐng)求)。

下面的代碼展示了不符合這條規(guī)則的一個(gè)例外:

  1. size_t read_complete(char * buf, const error_code & err, size_t bytes)
  2. {
  3. if ( err) return 0;
  4. bool found = std::find(buf, buf + bytes, '\n') < buf + bytes;
  5. // 我們一個(gè)一個(gè)讀取直到讀到回車,不緩存
  6. return found ? 0 : 1;
  7. }
  8. void sync_echo(std::string msg) {
  9. msg += "\n”;
  10. ip::tcp::socket sock(service);
  11. sock.connect(ep);
  12. sock.write_some(buffer(msg));
  13. char buf[1024];
  14. int bytes = read(sock, buffer(buf), boost::bind(read_complete,buf,_1,_2));
  15. std::string copy(buf, bytes - 1);
  16. msg = msg.substr(0, msg.size() - 1);
  17. std::cout << "server echoed our " << msg << ": "<< (copy == msg ? "OK" : "FAIL") << std::endl;
  18. sock.close();
  19. }
  20. int main(int argc, char* argv[]) {
  21. char* messages[] = { "John says hi", "so does James", "Lucy just got home", "Boost.Asio is Fun!", 0 };
  22. boost::thread_group threads;
  23. for ( char ** message = messages; *message; ++message) {
  24. threads.create_thread( boost::bind(sync_echo, *message));
  25. boost::this_thread::sleep( boost::posix_time::millisec(100));
  26. }
  27. threads.join_all();
  28. }

核心功能sync_echo。它包含了連接到服務(wù)端,發(fā)送信息然后等待回顯的所有邏輯。

你會(huì)發(fā)現(xiàn),在讀取時(shí),我使用了自由函數(shù)read(),因?yàn)槲蚁胍x’\n’之前的所有內(nèi)容。sock.read_some()方法滿足不了這個(gè)要求,因?yàn)樗粫?huì)讀可用的,而不是全部的消息。

read()方法的第三個(gè)參數(shù)是完成處理句柄。當(dāng)讀取到完整消息時(shí),它返回0。否則,它會(huì)返回我下一步(直到讀取結(jié)束)能都到的最大的緩沖區(qū)大小。在我們的例子中,返回結(jié)果始終是1,因?yàn)槲矣肋h(yuǎn)不想讀的消息比我們需要的更多。

main()中,我們創(chuàng)建了幾個(gè)線程;每個(gè)線程負(fù)責(zé)把消息發(fā)送到客戶端,然后等待操作結(jié)束。如果你運(yùn)行這個(gè)程序,你會(huì)看到下面的輸出:

  1. server echoed our John says hi: OK
  2. server echoed our so does James: OK
  3. server echoed our Lucy just got home: OK
  4. server echoed our Boost.Asio is Fun!: OK

注意:因?yàn)槲覀兪峭降?,所以不需要調(diào)用service.run()。

TCP同步服務(wù)端

回顯同步服務(wù)端的編寫非常容易,參考如下的代碼片段:

  1. io_service service;
  2. size_t read_complete(char * buff, const error_code & err, size_t bytes) {
  3. if ( err) return 0;
  4. bool found = std::find(buff, buff + bytes, '\n') < buff + bytes;
  5. // 我們一個(gè)一個(gè)讀取直到讀到回車,不緩存
  6. return found ? 0 : 1;
  7. }
  8. void handle_connections() {
  9. ip::tcp::acceptor acceptor(service, ip::tcp::endpoint(ip::tcp::v4(),8001));
  10. char buff[1024];
  11. while ( true) {
  12. ip::tcp::socket sock(service);
  13. acceptor.accept(sock);
  14. int bytes = read(sock, buffer(buff), boost::bind(read_complete,buff,_1,_2));
  15. std::string msg(buff, bytes);
  16. sock.write_some(buffer(msg));
  17. sock.close();
  18. }
  19. }
  20. int main(int argc, char* argv[]) {
  21. handle_connections();
  22. }

服務(wù)端的邏輯主要在handle_connections()。因?yàn)槭菃尉€程,我們接受一個(gè)客戶端請(qǐng)求,讀取它發(fā)送給我們的消息,然后回顯,然后等待下一個(gè)連接??梢源_定,當(dāng)兩個(gè)客戶端同時(shí)連接時(shí),第二個(gè)客戶端需要等待服務(wù)端處理完第一個(gè)客戶端的請(qǐng)求。

還是要注意因?yàn)槲覀兪峭降?,所以不需要調(diào)用service.run()。

TCP異步客戶端

當(dāng)我們開始異步時(shí),編碼會(huì)變得稍微有點(diǎn)復(fù)雜。我們會(huì)構(gòu)建在第二章 保持活動(dòng)中展示的connection類。

觀察這個(gè)章節(jié)中接下來的代碼,你會(huì)發(fā)現(xiàn)每個(gè)異步操作啟動(dòng)了新的異步操作,以保持service.run()一直工作。

首先,核心功能如下:

  1. #define MEM_FN(x) boost::bind(&self_type::x, shared_from_this())
  2. #define MEM_FN1(x,y) boost::bind(&self_type::x, shared_from_this(),y)
  3. #define MEM_FN2(x,y,z) boost::bind(&self_type::x, shared_from_this(),y,z)
  4. class talk_to_svr : public boost::enable_shared_from_this<talk_to_svr> , boost::noncopyable {
  5. typedef talk_to_svr self_type;
  6. talk_to_svr(const std::string & message) : sock_(service), started_(true), message_(message) {}
  7. void start(ip::tcp::endpoint ep) {
  8. sock_.async_connect(ep, MEM_FN1(on_connect,_1));
  9. }
  10. public:
  11. typedef boost::system::error_code error_code;
  12. typedef boost::shared_ptr<talk_to_svr> ptr;
  13. static ptr start(ip::tcp::endpoint ep, const std::string &message) {
  14. ptr new_(new talk_to_svr(message));
  15. new_->start(ep);
  16. return new_;
  17. }
  18. void stop() {
  19. if ( !started_) return;
  20. started_ = false;
  21. sock_.close();
  22. }
  23. bool started() { return started_; }
  24. ...
  25. private:
  26. ip::tcp::socket sock_;
  27. enum { max_msg = 1024 };
  28. char read_buffer_[max_msg];
  29. char write_buffer_[max_msg];
  30. bool started_;
  31. std::string message_;
  32. };

我們需要一直使用指向talk_to_svr的智能指針,這樣的話當(dāng)在tack_to_svr的實(shí)例上有異步操作時(shí),那個(gè)實(shí)例是一直活動(dòng)的。為了避免錯(cuò)誤,比如在棧上構(gòu)建一個(gè)talk_to_svr對(duì)象的實(shí)例時(shí),我把構(gòu)造方法設(shè)置成了私有而且不允許拷貝構(gòu)造(繼承自boost::noncopyable)。

我們有了核心方法,比如start(),stop()started(),它們所做的事情也正如它們名字表達(dá)的一樣。如果需要建立連接,調(diào)用talk_to_svr::start(endpoint, message)即可。我們同時(shí)還有一個(gè)read緩沖區(qū)和一個(gè)write緩沖區(qū)。(readbuuferwritebuffer)。

MEM_FN 是一個(gè)方便使用的宏,它們通過shared_ptr_from_this()方法強(qiáng)制使用一個(gè)指向 this 的智能指針。

下面的幾行代碼和之前的解釋非常不同:

  1. //等同于 "sock_.async_connect(ep, MEM_FN1(on_connect,_1));"
  2. sock_.async_connect(ep,boost::bind(&talk_to_svr::on_connect,shared_ptr_from_this(),_1));
  3. sock_.async_connect(ep, boost::bind(&talk_to_svr::on_connect,this,_1));

在上述例子中,我們正確的創(chuàng)建了async_connect的完成處理句柄;在調(diào)用完成處理句柄之前它會(huì)保留一個(gè)指向talk_to_server實(shí)例的智能指針,從而保證當(dāng)其發(fā)生時(shí)talk_to_server實(shí)例還是保持活動(dòng)的。

在接下來的例子中,我們錯(cuò)誤地創(chuàng)建了完成處理句柄,當(dāng)它被調(diào)用時(shí),talk_to_server實(shí)例很可能已經(jīng)被釋放了。

從socket讀取或?qū)懭霑r(shí),你使用如下的代碼片段:

  1. void do_read() {
  2. async_read(sock_, buffer(read_buffer_), MEM_FN2(read_complete,_1,_2), MEM_FN2(on_read,_1,_2));
  3. }
  4. void do_write(const std::string & msg) {
  5. if ( !started() ) return;
  6. std::copy(msg.begin(), msg.end(), write_buffer_);
  7. sock_.async_write_some( buffer(write_buffer_, msg.size()), MEM_FN2(on_write,_1,_2));
  8. }
  9. size_t read_complete(const boost::system::error_code & err, size_t bytes) {
  10. // 和TCP客戶端中的類似
  11. }

do_read()方法會(huì)保證當(dāng)on_read()被調(diào)用的時(shí)候,我們從服務(wù)端讀取一行。do_write()方法會(huì)先把信息拷貝到緩沖區(qū)(考慮到當(dāng)async_write發(fā)生時(shí)msg可能已經(jīng)超出范圍被釋放),然后保證實(shí)際的寫入操作發(fā)生時(shí)on_write()被調(diào)用。

然后是最重要的方法,這個(gè)方法包含了類的主要邏輯:

  1. void on_connect(const error_code & err) {
  2. if ( !err) do_write(message_ + "\n");
  3. else stop();
  4. }
  5. void on_read(const error_code & err, size_t bytes) {
  6. if ( !err) {
  7. std::string copy(read_buffer_, bytes - 1);
  8. std::cout << "server echoed our " << message_ << ": " << (copy == message_ ? "OK" : "FAIL") << std::endl;
  9. }
  10. stop();
  11. }
  12. void on_write(const error_code & err, size_t bytes) {
  13. do_read();
  14. }

當(dāng)連接成功之后,我們發(fā)送消息到服務(wù)端,do_write()。當(dāng)write操作結(jié)束時(shí),on_write()被調(diào)用,它初始化了一個(gè)do_read()方法,當(dāng)do_read()完成時(shí)。on_read()被調(diào)用;這里,我們簡(jiǎn)單的檢查一下返回的信息是否是服務(wù)端的回顯,然后退出服務(wù)。

我們會(huì)發(fā)送三個(gè)消息到服務(wù)端讓它變得更有趣一點(diǎn):

  1. int main(int argc, char* argv[]) {
  2. ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 8001);
  3. char* messages[] = { "John says hi", "so does James", "Lucy got home", 0 };
  4. for ( char ** message = messages; *message; ++message) {
  5. talk_to_svr::start( ep, *message);
  6. boost::this_thread::sleep( boost::posix_time::millisec(100));
  7. }
  8. service.run();
  9. }

上述的代碼會(huì)生成如下的輸出:

  1. server echoed our John says hi: OK
  2. server echoed our so does James: OK
  3. server echoed our Lucy just got home: OK

TCP異步服務(wù)端

核心功能和同步服務(wù)端的功能類似,如下:

  1. class talk_to_client : public boost::enable_shared_from_this<talk_to_
  2. client>, boost::noncopyable {
  3. typedef talk_to_client self_type;
  4. talk_to_client() : sock_(service), started_(false) {}
  5. public:
  6. typedef boost::system::error_code error_code;
  7. typedef boost::shared_ptr<talk_to_client> ptr;
  8. void start() {
  9. started_ = true;
  10. do_read();
  11. }
  12. static ptr new_() {
  13. ptr new_(new talk_to_client);
  14. return new_;
  15. }
  16. void stop() {
  17. if ( !started_) return;
  18. started_ = false;
  19. sock_.close();
  20. }
  21. ip::tcp::socket & sock() { return sock_;}
  22. ...
  23. private:
  24. ip::tcp::socket sock_;
  25. enum { max_msg = 1024 };
  26. char read_buffer_[max_msg];
  27. char write_buffer_[max_msg];
  28. bool started_;
  29. };

因?yàn)槲覀兪欠浅:?jiǎn)單的回顯服務(wù),這里不需要is_started()方法。對(duì)每個(gè)客戶端,僅僅讀取它的消息,回顯,然后關(guān)閉它。

do_read(),do_write()read_complete()方法和TCP同步服務(wù)端的完全一致。

主要的邏輯同樣是在on_read()on_write()方法中:

  1. void on_read(const error_code & err, size_t bytes) {
  2. if ( !err) {
  3. std::string msg(read_buffer_, bytes);
  4. do_write(msg + "\n");
  5. }
  6. stop();
  7. }
  8. void on_write(const error_code & err, size_t bytes) {
  9. do_read();
  10. }

對(duì)客戶端的處理如下:

  1. ip::tcp::acceptor acceptor(service, ip::tcp::endpoint(ip::tcp::v4(),8001));
  2. void handle_accept(talk_to_client::ptr client, const error_code & err)
  3. {
  4. client->start();
  5. talk_to_client::ptr new_client = talk_to_client::new_();
  6. acceptor.async_accept(new_client->sock(), boost::bind(handle_accept,new_client,_1));
  7. }
  8. int main(int argc, char* argv[]) {
  9. talk_to_client::ptr client = talk_to_client::new_();
  10. acceptor.async_accept(client->sock(), boost::bind(handle_accept,client,_1));
  11. service.run();
  12. }

每一次客戶端連接到服務(wù)時(shí),handle_accept被調(diào)用,它會(huì)異步地從客戶端讀取,然后同樣異步地等待一個(gè)新的客戶端。

代碼

你會(huì)在這本書相應(yīng)的代碼中得到所有4個(gè)應(yīng)用(TCP回顯同步客戶端,TCP回顯同步服務(wù)端,TCP回顯異步客戶端,TCP回顯異步服務(wù)端)。當(dāng)測(cè)試時(shí),你可以使用任意客戶端/服務(wù)端組合(比如,一個(gè)異步客戶端和一個(gè)同步服務(wù)端)。

UDP回顯服務(wù)端/客戶端

因?yàn)閁DP不能保證所有信息都抵達(dá)接收者,我們不能保證“信息以回車結(jié)尾”。 沒收到消息,我們只是回顯,但是沒有socket去關(guān)閉(在服務(wù)端),因?yàn)槲覀兪荱DP。

UDP同步回顯客戶端

UDP回顯客戶端比TCP回顯客戶端要簡(jiǎn)單:

  1. ip::udp::endpoint ep( ip::address::from_string("127.0.0.1"), 8001);
  2. void sync_echo(std::string msg) {
  3. ip::udp::socket sock(service, ip::udp::endpoint(ip::udp::v4(), 0));
  4. sock.send_to(buffer(msg), ep);
  5. char buff[1024];
  6. ip::udp::endpoint sender_ep;
  7. int bytes = sock.receive_from(buffer(buff), sender_ep);
  8. std::string copy(buff, bytes);
  9. std::cout << "server echoed our " << msg << ": " << (copy == msg ? "OK" : "FAIL") << std::endl;
  10. sock.close();
  11. }
  12. int main(int argc, char* argv[]) {
  13. char* messages[] = { "John says hi", "so does James", "Lucy got home", 0 };
  14. boost::thread_group threads;
  15. for ( char ** message = messages; *message; ++message) {
  16. threads.create_thread( boost::bind(sync_echo, *message));
  17. boost::this_thread::sleep( boost::posix_time::millisec(100));
  18. }
  19. threads.join_all();
  20. }

所有的邏輯都在synch_echo()中;連接到服務(wù)端,發(fā)送消息,接收服務(wù)端的回顯,然后關(guān)閉連接。

UDP同步回顯服務(wù)端

UDP回顯服務(wù)端會(huì)是你寫過的最簡(jiǎn)單的服務(wù)端:

  1. io_service service;
  2. void handle_connections() {
  3. char buff[1024];
  4. ip::udp::socket sock(service, ip::udp::endpoint(ip::udp::v4(), 8001));
  5. while ( true) {
  6. ip::udp::endpoint sender_ep;
  7. int bytes = sock.receive_from(buffer(buff), sender_ep);
  8. std::string msg(buff, bytes);
  9. sock.send_to(buffer(msg), sender_ep);
  10. }
  11. }
  12. int main(int argc, char* argv[]) {
  13. handle_connections();
  14. }

它非常簡(jiǎn)單,而且能很好的自釋。

我把異步UDP客戶端和服務(wù)端留給讀者當(dāng)作一個(gè)練習(xí)。

總結(jié)

我們已經(jīng)寫了完整的應(yīng)用,最終讓Boost.Asio得以工作?;仫@應(yīng)用是開始學(xué)習(xí)一個(gè)庫時(shí)非常好的工具。你可以經(jīng)常學(xué)習(xí)和運(yùn)行這個(gè)章節(jié)所展示的代碼,這樣你就可以非常容易地記住這個(gè)庫的基礎(chǔ)。 在下一章,我們會(huì)建立更復(fù)雜的客戶端/服務(wù)端應(yīng)用,我們要確保避免低級(jí)錯(cuò)誤,比如內(nèi)存泄漏,死鎖等等。

以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)