Netty如何寫廣播器

2018-08-08 11:01 更新

本節(jié)的內(nèi)容是要寫一個(gè)廣播器。下圖展示了廣播一個(gè) DatagramPacket 在每個(gè)日志實(shí)體里面的方法:

Figure%2013

  1. 日志文件
  2. 日志文件中的日志實(shí)體
  3. 一個(gè) DatagramPacket 保持一個(gè)單獨(dú)的日志實(shí)體

Figure 13.2 Log entries sent with DatagramPackets

圖13.3表示一個(gè) LogEventBroadcaster 的 ChannelPipeline 的高級(jí)視圖,說明了 LogEvent 是如何流轉(zhuǎn)的。

Figure%2013

Figure 13.3 LogEventBroadcaster: ChannelPipeline and LogEvent flow

正如我們所看到的,所有的數(shù)據(jù)傳輸都封裝在 LogEvent 消息里。LogEventBroadcaster 寫這些通過在本地端的管道,發(fā)送它們通過ChannelPipeline 轉(zhuǎn)換(編碼)為一個(gè)定制的 ChannelHandler 的DatagramPacket 信息。最后,他們通過 UDP 廣播并被遠(yuǎn)程接收。

編碼器和解碼器

編碼器和解碼器將消息從一種格式轉(zhuǎn)換為另一種,深度探討在第7章中進(jìn)行。我們探索 Netty 提供的基礎(chǔ)類來簡(jiǎn)化和實(shí)現(xiàn)自定義 ChannelHandler 如 LogEventEncoder 在這個(gè)應(yīng)用程序中。

下面展示了 編碼器的實(shí)現(xiàn)

Listing 13.2 LogEventEncoder

public class LogEventEncoder extends MessageToMessageEncoder<LogEvent> {
    private final InetSocketAddress remoteAddress;

    public LogEventEncoder(InetSocketAddress remoteAddress) {  //1
        this.remoteAddress = remoteAddress;
    }

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, LogEvent logEvent, List<Object> out) throws Exception {
        byte[] file = logEvent.getLogfile().getBytes(CharsetUtil.UTF_8); //2
        byte[] msg = logEvent.getMsg().getBytes(CharsetUtil.UTF_8);
        ByteBuf buf = channelHandlerContext.alloc().buffer(file.length + msg.length + 1);
        buf.writeBytes(file);
        buf.writeByte(LogEvent.SEPARATOR); //3
        buf.writeBytes(msg);  //4
        out.add(new DatagramPacket(buf, remoteAddress));  //5
    }
}
  1. LogEventEncoder 創(chuàng)建了 DatagramPacket 消息類發(fā)送到指定的 InetSocketAddress
  2. 寫文件名到 ByteBuf
  3. 添加一個(gè) SEPARATOR
  4. 寫一個(gè)日志消息到 ByteBuf
  5. 添加新的 DatagramPacket 到出站消息

為什么使用 MessageToMessageEncoder?

當(dāng)然我們可以編寫自己的自定義 ChannelOutboundHandler 來轉(zhuǎn)換 LogEvent 對(duì)象到 DatagramPackets。但是繼承自MessageToMessageEncoder 為我們簡(jiǎn)化和做了大部分的工作。

為了實(shí)現(xiàn) LogEventEncoder,我們只需要定義服務(wù)器的運(yùn)行時(shí)配置,我們稱之為“bootstrapping(引導(dǎo))”。這包括設(shè)置各種 ChannelOption 并安裝需要的 ChannelHandler 到 ChannelPipeline 中。完成的 LogEventBroadcaster 類,如清單13.3所示。

Listing 13.3 LogEventBroadcaster

public class LogEventBroadcaster {
    private final Bootstrap bootstrap;
    private final File file;
    private final EventLoopGroup group;

    public LogEventBroadcaster(InetSocketAddress address, File file) {
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioDatagramChannel.class)
                .option(ChannelOption.SO_BROADCAST, true)
                .handler(new LogEventEncoder(address)); //1

        this.file = file;
    }

    public void run() throws IOException {
        Channel ch = bootstrap.bind(0).syncUninterruptibly().channel(); //2
        System.out.println("LogEventBroadcaster running");
        long pointer = 0;
        for (;;) {
            long len = file.length(); 
            if (len < pointer) {
                // file was reset
                pointer = len; //3
            } else if (len > pointer) {
                // Content was added
                RandomAccessFile raf = new RandomAccessFile(file, "r");
                raf.seek(pointer);  //4
                String line;
                while ((line = raf.readLine()) != null) {
                    ch.writeAndFlush(new LogEvent(null, -1, file.getAbsolutePath(), line));  //5
                }
                pointer = raf.getFilePointer(); //6
                raf.close();
            }
            try {
                Thread.sleep(1000);  //7
            } catch (InterruptedException e) {
                Thread.interrupted();
                break;
            }
        }
    }

    public void stop() {
        group.shutdownGracefully();
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            throw new IllegalArgumentException();
        }

        LogEventBroadcaster broadcaster = new LogEventBroadcaster(new InetSocketAddress("255.255.255.255",
                Integer.parseInt(args[0])), new File(args[1]));  //8
        try {
            broadcaster.run();
        } finally {
            broadcaster.stop();
        }
    }
}
  1. 引導(dǎo) NioDatagramChannel 。為了使用廣播,我們?cè)O(shè)置 SO_BROADCAST 的 socket 選項(xiàng)
  2. 綁定管道。注意當(dāng)使用 Datagram Channel 時(shí),是沒有連接的
  3. 如果需要,可以設(shè)置文件的指針指向文件的最后字節(jié)
  4. 設(shè)置當(dāng)前文件的指針,這樣不會(huì)把舊的發(fā)出去
  5. 寫一個(gè) LogEvent 到管道用于保存文件名和文件實(shí)體。(我們期望每個(gè)日志實(shí)體是一行長(zhǎng)度)
  6. 存儲(chǔ)當(dāng)前文件的位置,這樣,我們可以稍后繼續(xù)
  7. 睡 1 秒。如果其他中斷退出循環(huán)就重新啟動(dòng)它。
  8. 構(gòu)造一個(gè)新的實(shí)例 LogEventBroadcaster 并啟動(dòng)它

這就是程序的完整的第一部分??梢允褂?"netcat" 程序查看程序的結(jié)果。在 UNIX/Linux 系統(tǒng),可以使用 "nc", 在 Windows 環(huán)境下,可以在 http://nmap.org/ncat找到

Netcat 是完美的第一個(gè)測(cè)試我們的應(yīng)用程序;它只是監(jiān)聽指定的端口上接收并打印所有數(shù)據(jù)到標(biāo)準(zhǔn)輸出。將其設(shè)置為在端口 9999 上監(jiān)聽 UDP 數(shù)據(jù)如下:

$ nc -l -u 9999

現(xiàn)在我們需要啟動(dòng) LogEventBroadcaster。清單13.4顯示了如何使用 mvn 編譯和運(yùn)行廣播器。pom的配置。pom.xml 配置指向一個(gè)文件/var/log/syslog(假設(shè)是UNIX / Linux環(huán)境)和端口設(shè)置為 9999。文件中的條目將通過 UDP 廣播到端口,在你開始 netcat 后打印到控制臺(tái)。

Listing 13.4 Compile and start the LogEventBroadcaster

$ mvn clean package exec:exec -Pchapter13-LogEventBroadcaster
[INFO] Scanning for projects...
[INFO]
[INFO] --------------------------------------------------------------------
[INFO] Building netty-in-action 0.1-SNAPSHOT
[INFO] --------------------------------------------------------------------
...
...
[INFO]
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ netty-in-action ---
[INFO] Building jar: /Users/norman/Documents/workspace-intellij/netty-in-actionprivate/
target/netty-in-action-0.1-SNAPSHOT.jar
[INFO]
[INFO] --- exec-maven-plugin:1.2.1:exec (default-cli) @ netty-in-action -
LogEventBroadcaster running

當(dāng)調(diào)用 mvn 時(shí),在系統(tǒng)屬性中改變文件和端口值,指定你想要的。清單13.5 設(shè)置日志文件 到 /var/log/mail.log 和端口 8888。

Listing 13.5 Compile and start the LogEventBroadcaster

$ mvn clean package exec:exec -Pchapter13-LogEventBroadcaster /
-Dlogfile=/var/log/mail.log -Dport=8888 -....
....
[INFO]
[INFO] --- exec-maven-plugin:1.2.1:exec (default-cli) @ netty-in-action -
LogEventBroadcaster running

當(dāng)看到 “LogEventBroadcaster running” 說明程序運(yùn)行成功了。

netcat 只用于測(cè)試,但不適合生產(chǎn)環(huán)境中使用。


以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)