Netty案例研究:Transport 的遷移

2021-12-02 09:42 更新

本節(jié)將會通過一個簡單的應(yīng)用程序來讓您看看在 Netty  Transport(傳輸) 是如何工作的,這個應(yīng)用程序要做的事情非常簡單,只需要連接好客戶端,并且向客戶端發(fā)送字符串“Hi!”的信息,信息發(fā)送完之后連接就斷開了。

不用 Netty 實現(xiàn) I/O 和 NIO

我們將不用 Netty 實現(xiàn) I/O 和 NIO,而是使用 JDK API 來實現(xiàn) I/O 和 NIO。下面這個例子,是使用阻塞 IO 實現(xiàn)的例子:

Listing 4.1 Blocking networking without Netty

public class PlainOioServer {

    public void serve(int port) throws IOException {
        final ServerSocket socket = new ServerSocket(port);     //1
        try {
            for (;;) {
                final Socket clientSocket = socket.accept();    //2
                System.out.println("Accepted connection from " + clientSocket);

                new Thread(new Runnable() {                        //3
                    @Override
                    public void run() {
                        OutputStream out;
                        try {
                            out = clientSocket.getOutputStream();
                            out.write("Hi!\r\n".getBytes(Charset.forName("UTF-8")));                            //4
                            out.flush();
                            clientSocket.close();                //5

                        } catch (IOException e) {
                            e.printStackTrace();
                            try {
                                clientSocket.close();
                            } catch (IOException ex) {
                                // ignore on close
                            }
                        }
                    }
                }).start();                                        //6
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

1.綁定服務(wù)器到指定的端口。

2.接受一個連接。

3.創(chuàng)建一個新的線程來處理連接。

4.將消息發(fā)送到連接的客戶端。

5.一旦消息被寫入和刷新時就 關(guān)閉連接。

6.啟動線程。

上面的方式可以工作正常,但是這種阻塞模式在大連接數(shù)的情況就會有很嚴重的問題,如客戶端連接超時,服務(wù)器響應(yīng)嚴重延遲,性能無法擴展。為了解決這種情況,我們可以使用異步網(wǎng)絡(luò)處理所有的并發(fā)連接,但問題在于 NIO 和 OIO 的 API 是完全不同的,所以一個用 OIO 開發(fā)的網(wǎng)絡(luò)應(yīng)用程序想要使用 NIO 重構(gòu)代碼幾乎是重新開發(fā)。

下面代碼是使用 NIO 實現(xiàn)的例子:

Listing 4.2 Asynchronous networking without Netty

public class PlainNioServer {
    public void serve(int port) throws IOException {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        ServerSocket ss = serverChannel.socket();
        InetSocketAddress address = new InetSocketAddress(port);
        ss.bind(address);                                            //1
        Selector selector = Selector.open();                        //2
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);    //3
        final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());
        for (;;) {
            try {
                selector.select();                                    //4
            } catch (IOException ex) {
                ex.printStackTrace();
                // handle exception
                break;
            }
            Set<SelectionKey> readyKeys = selector.selectedKeys();    //5
            Iterator<SelectionKey> iterator = readyKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                try {
                    if (key.isAcceptable()) {                //6
                        ServerSocketChannel server =
                                (ServerSocketChannel)key.channel();
                        SocketChannel client = server.accept();
                        client.configureBlocking(false);
                        client.register(selector, SelectionKey.OP_WRITE |
                                SelectionKey.OP_READ, msg.duplicate());    //7
                        System.out.println(
                                "Accepted connection from " + client);
                    }
                    if (key.isWritable()) {                //8
                        SocketChannel client =
                                (SocketChannel)key.channel();
                        ByteBuffer buffer =
                                (ByteBuffer)key.attachment();
                        while (buffer.hasRemaining()) {
                            if (client.write(buffer) == 0) {        //9
                                break;
                            }
                        }
                        client.close();                    //10
                    }
                } catch (IOException ex) {
                    key.cancel();
                    try {
                        key.channel().close();
                    } catch (IOException cex) {
                        // 在關(guān)閉時忽略
                    }
                }
            }
        }
    }
}

1.綁定服務(wù)器到制定端口

2.打開 selector 處理 channel

3.注冊 ServerSocket 到 ServerSocket ,并指定這是專門用來接受連接的。

4.等待新的事件來處理。這將阻塞,直到一個事件是傳入。

5.從收到的所有事件中 獲取 SelectionKey 實例。

6.檢查該事件是一個新的連接準備好接受。

7.接受客戶端,并用 selector 進行注冊。

8.檢查 socket 是否準備好寫數(shù)據(jù)。

9.將數(shù)據(jù)寫入到所連接的客戶端。如果網(wǎng)絡(luò)飽和,連接是可寫的,那么這個循環(huán)將寫入數(shù)據(jù),直到該緩沖區(qū)是空的。

10.關(guān)閉連接。

如你所見,即使它們實現(xiàn)的功能是一樣,但是代碼完全不同。下面我們將用 Netty 來實現(xiàn)相同的功能。

采用 Netty 實現(xiàn) I/O 和 NIO

下面代碼是使用 Netty 作為網(wǎng)絡(luò)框架編寫的一個阻塞 IO 例子:

Listing 4.3 Blocking networking with Netty

public class NettyOioServer {

    public void server(int port) throws Exception {
        final ByteBuf buf = Unpooled.unreleasableBuffer(
                Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
        EventLoopGroup group = new OioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();        //1

            b.group(group)                                    //2
             .channel(OioServerSocketChannel.class)
             .localAddress(new InetSocketAddress(port))
             .childHandler(new ChannelInitializer<SocketChannel>() {//3
                 @Override
                 public void initChannel(SocketChannel ch) 
                     throws Exception {
                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {            //4
                         @Override
                         public void channelActive(ChannelHandlerContext ctx) throws Exception {
                             ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);//5
                         }
                     });
                 }
             });
            ChannelFuture f = b.bind().sync();  //6
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();        //7
        }
    }
}

1.創(chuàng)建一個 ServerBootstrap

2.使用 OioEventLoopGroup 允許阻塞模式(OIO)

3.指定 ChannelInitializer 將給每個接受的連接調(diào)用

4.添加的 ChannelHandler 攔截事件,并允許他們作出反應(yīng)

5.寫信息到客戶端,并添加 ChannelFutureListener 當一旦消息寫入就關(guān)閉連接

6.綁定服務(wù)器來接受連接

7.釋放所有資源

下面代碼是使用 Netty NIO 實現(xiàn)。

Netty NIO 版本

下面是 Netty NIO 的代碼,只是改變了一行代碼,就從 OIO 傳輸 切換到了 NIO。

Listing 4.4 Asynchronous networking with Netty

public class NettyNioServer {

    public void server(int port) throws Exception {
        final ByteBuf buf = Unpooled.unreleasableBuffer(
                Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();    //1
            b.group(new NioEventLoopGroup(), new NioEventLoopGroup())   //2
             .channel(NioServerSocketChannel.class)
             .localAddress(new InetSocketAddress(port))
             .childHandler(new ChannelInitializer<SocketChannel>() {    //3
                 @Override
                 public void initChannel(SocketChannel ch) 
                     throws Exception {
                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {    //4
                         @Override
                         public void channelActive(ChannelHandlerContext ctx) throws Exception {
                             ctx.writeAndFlush(buf.duplicate())                //5
                                .addListener(ChannelFutureListener.CLOSE);
                         }
                     });
                 }
             });
            ChannelFuture f = b.bind().sync();                    //6
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();                    //7
        }
    }
}

1.創(chuàng)建一個 ServerBootstrap

2.使用 NioEventLoopGroup 允許非阻塞模式

3.指定 ChannelInitializer 將給每個接受的連接調(diào)用

4.添加的 ChannelInboundHandlerAdapter() 接收事件并進行處理

5.寫信息到客戶端,并添加 ChannelFutureListener 當一旦消息寫入就關(guān)閉連接

6.綁定服務(wù)器來接受連接

7.釋放所有資源

我們之前提到過 Netty 使用的是統(tǒng)一的 API,所以 Netty 中實現(xiàn)的每個傳輸都是用了同樣的 API,你使用什么來實現(xiàn)并不在它的關(guān)心范圍內(nèi)。Netty 通過操作接口 Channel 、ChannelPipeline 和 ChannelHandler 來實現(xiàn)。

使用基于 Netty 傳輸的好處顯而易見了吧,那么接下來我們就來看看傳輸?shù)?API。


以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號