Samza窗口功能

2018-08-22 17:45 更新

有時,流處理作業(yè)需要定期執(zhí)行某些操作,無論作業(yè)正在處理多少個傳入消息。例如,假設(shè)您要報(bào)告每分鐘的頁面瀏覽次數(shù)。為此,您每次看到頁面查看事件時都會增加一個計(jì)數(shù)器。每分鐘一次,將當(dāng)前計(jì)數(shù)器值發(fā)送到輸出流,并將計(jì)數(shù)器復(fù)位為零。

Samza 的窗口功能為任務(wù)以常規(guī)時間間隔提供了一種方式,例如每分鐘一次。要啟用窗口,您只需要在作業(yè)配置中設(shè)置一個屬性:

# Call the window() method every 60 seconds
task.window.ms=60000

接下來,您的流任務(wù)需要實(shí)現(xiàn)WindowableTask接口。此接口定義了一個由 Samza 在您配置的常規(guī)間隔中調(diào)用的 window()方法。

例如,您將如何實(shí)現(xiàn)基本的每分鐘事件計(jì)數(shù)器:

public class EventCounterTask implements StreamTask, WindowableTask {

  public static final SystemStream OUTPUT_STREAM =
    new SystemStream("kafka", "events-per-minute");

  private int eventsSeen = 0;

  public void process(IncomingMessageEnvelope envelope,
                      MessageCollector collector,
                      TaskCoordinator coordinator) {
    eventsSeen++;
  }

  public void window(MessageCollector collector,
                     TaskCoordinator coordinator) {
    collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, eventsSeen));
    eventsSeen = 0;
  }
}

如果需要向輸出流發(fā)送消息,可以使用傳遞給 window()方法的MessageCollector對象。請僅使用該 MessageCollector 對象發(fā)送消息,并且不要在調(diào)用 window()之外使用它。

請注意,Samza 使用單線程執(zhí)行,因此 window()調(diào)用永遠(yuǎn)不會與 process()調(diào)用同時發(fā)生。這樣做的優(yōu)點(diǎn)在于您不需要擔(dān)心代碼中的線程安全性(不需要同步任何內(nèi)容),但如果您的process()方法需要很長時間才能返回,則 window()調(diào)用可能會被延遲的缺點(diǎn)。

協(xié)調(diào)器流  ?

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號