Netty利用EventLoop實(shí)現(xiàn)調(diào)度任務(wù)執(zhí)行

2018-08-08 11:05 更新

我們每隔一段時(shí)間就需要調(diào)度任務(wù)執(zhí)行,或許你想要注冊一個(gè)任務(wù)在客戶端完成連接5分鐘后執(zhí)行,一個(gè)比較常見的用例是將一個(gè)信息發(fā)送給遠(yuǎn)端,看下遠(yuǎn)端是否有反應(yīng),如果沒有就可以關(guān)閉通道(連接)并且釋放資源。

本節(jié)的內(nèi)容是介紹如何使用強(qiáng)大的 EventLoop 實(shí)現(xiàn)任務(wù)調(diào)度,還會簡單介紹 Java API的任務(wù)調(diào)度,以方便和 Netty 比較加深理解。

使用普通的 Java API 調(diào)度任務(wù)

在 Java 中使用 JDK 提供的 ScheduledExecutorService 實(shí)現(xiàn)任務(wù)調(diào)度。使用 Executors 提供的靜態(tài)方法創(chuàng)建 ScheduledExecutorService,有如下方法

Table 15.1 java.util.concurrent.Executors-Static methods to create a ScheduledExecutorService

方法描述
newScheduledThreadPool(int corePoolSize) newScheduledThreadPool(int corePoolSize,ThreadFactorythreadFactory)創(chuàng)建一個(gè)新的

ScheduledThreadExecutorService 用于調(diào)度命令來延遲或者周期性的執(zhí)行。 corePoolSize 用于計(jì)算線程的數(shù)量 newSingleThreadScheduledExecutor() newSingleThreadScheduledExecutor(ThreadFact orythreadFactory) | 新建一個(gè) ScheduledThreadExecutorService 可以用于調(diào)度命令來延遲或者周期性的執(zhí)行。它將使用一個(gè)線程來執(zhí)行調(diào)度的任務(wù)

下面的 ScheduledExecutorService 調(diào)度任務(wù) 60 執(zhí)行一次

Listing 15.4 Schedule task with a ScheduledExecutorService

ScheduledExecutorService executor = Executors
        .newScheduledThreadPool(10); //1

ScheduledFuture<?> future = executor.schedule(
        new Runnable() { //2
            @Override
            public void run() {
                System.out.println("Now it is 60 seconds later");  //3
            }
        }, 60, TimeUnit.SECONDS);  //4
// do something
//

executor.shutdown();  //5
  1. 新建 ScheduledExecutorService 使用10個(gè)線程
  2. 新建 runnable 調(diào)度執(zhí)行
  3. 稍后運(yùn)行
  4. 調(diào)度任務(wù)60秒后執(zhí)行
  5. 關(guān)閉 ScheduledExecutorService 來釋放任務(wù)完成的資源

使用 EventLoop 調(diào)度任務(wù)

使用 ScheduledExecutorService 工作的很好,但是有局限性,比如在一個(gè)額外的線程中執(zhí)行任務(wù)。如果需要執(zhí)行很多任務(wù),資源使用就會很嚴(yán)重;對于像 Netty 這樣的高性能的網(wǎng)絡(luò)框架來說,嚴(yán)重的資源使用是不能接受的。Netty 對這個(gè)問題提供了很好的方法。

Netty 允許使用 EventLoop 調(diào)度任務(wù)分配到通道,如下面代碼:

Listing 15.5 Schedule task with EventLoop

Channel ch = null; // Get reference to channel
ScheduledFuture<?> future = ch.eventLoop().schedule(
        new Runnable() {
            @Override
            public void run() {
                System.out.println("Now its 60 seconds later");
            }
        }, 60, TimeUnit.SECONDS);
  1. 新建 runnable 用于執(zhí)行調(diào)度
  2. 稍后執(zhí)行
  3. 調(diào)度任務(wù)60秒后運(yùn)行

如果想任務(wù)每隔多少秒執(zhí)行一次,看下面代碼:

Listing 15.6 Schedule a fixed task with the EventLoop

Channel ch = null; // Get reference to channel
ScheduledFuture<?> future = ch.eventLoop().scheduleAtFixedRate(
        new Runnable() {
            @Override
            public void run() {
                System.out.println("Run every 60 seconds");
            }
        }, 60, 60, TimeUnit.SECONDS);
  1. 新建 runnable 用于執(zhí)行調(diào)度
  2. 將運(yùn)行直到 ScheduledFuture 被取消
  3. 調(diào)度任務(wù)60秒運(yùn)行

取消操作,可以使用 ScheduledFuture 返回每個(gè)異步操作。 ScheduledFuture 提供一個(gè)方法用于取消一個(gè)調(diào)度了的任務(wù)或者檢查它的狀態(tài)。一個(gè)簡單的取消操作如下:

ScheduledFuture<?> future = ch.eventLoop()
.scheduleAtFixedRate(..); //1
// Some other code that runs...
future.cancel(false); //2
  1. 調(diào)度任務(wù)并獲取返回的 ScheduledFuture
  2. 取消任務(wù),阻止它再次運(yùn)行

調(diào)度的內(nèi)部實(shí)現(xiàn)

Netty 內(nèi)部實(shí)現(xiàn)其實(shí)是基于George Varghese 提出的 “Hashed and hierarchical timing wheels: Data structures to efficiently implement timer facility(散列和分層定時(shí)輪:數(shù)據(jù)結(jié)構(gòu)有效實(shí)現(xiàn)定時(shí)器)”。這種實(shí)現(xiàn)只保證一個(gè)近似執(zhí)行,也就是說任務(wù)的執(zhí)行可能不是100%準(zhǔn)確;在實(shí)踐中,這已經(jīng)被證明是一個(gè)可容忍的限制,不影響多數(shù)應(yīng)用程序。所以,定時(shí)執(zhí)行任務(wù)不可能100%準(zhǔn)確的按時(shí)執(zhí)行。

為了更好的理解它是如何工作,我們可以這樣認(rèn)為:

  • 在指定的延遲時(shí)間后調(diào)度任務(wù);
  • 任務(wù)被插入到 EventLoop 的 Schedule-Task-Queue(調(diào)度任務(wù)隊(duì)列);
  • 如果任務(wù)需要馬上執(zhí)行,EventLoop 檢查每個(gè)運(yùn)行;
  • 如果有一個(gè)任務(wù)要執(zhí)行,EventLoop 將立刻執(zhí)行它,并從隊(duì)列中刪除;
  • EventLoop 等待下一次運(yùn)行,從第4步開始一遍又一遍的重復(fù)。

因?yàn)檫@樣的實(shí)現(xiàn)計(jì)劃執(zhí)行不可能100%正確,對于多數(shù)用例不可能100%準(zhǔn)備的執(zhí)行計(jì)劃任務(wù);在 Netty 中,這樣的工作幾乎沒有資源開銷。

但是如果需要更準(zhǔn)確的執(zhí)行呢?很容易,你需要使用ScheduledExecutorService 的另一個(gè)實(shí)現(xiàn),這不是 Netty 的內(nèi)容。記住,如果不遵循 Netty 的線程模型協(xié)議,你將需要自己同步并發(fā)訪問。


以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號