W3Cschool
恭喜您成為首批注冊用戶
獲得88經(jīng)驗(yàn)值獎勵
有時,流處理作業(yè)需要定期執(zhí)行某些操作,無論作業(yè)正在處理多少個傳入消息。例如,假設(shè)您要報(bào)告每分鐘的頁面瀏覽次數(shù)。為此,您每次看到頁面查看事件時都會增加一個計(jì)數(shù)器。每分鐘一次,將當(dāng)前計(jì)數(shù)器值發(fā)送到輸出流,并將計(jì)數(shù)器復(fù)位為零。
Samza 的窗口功能為任務(wù)以常規(guī)時間間隔提供了一種方式,例如每分鐘一次。要啟用窗口,您只需要在作業(yè)配置中設(shè)置一個屬性:
# Call the window() method every 60 seconds
task.window.ms=60000
接下來,您的流任務(wù)需要實(shí)現(xiàn)WindowableTask接口。此接口定義了一個由 Samza 在您配置的常規(guī)間隔中調(diào)用的 window()方法。
例如,您將如何實(shí)現(xiàn)基本的每分鐘事件計(jì)數(shù)器:
public class EventCounterTask implements StreamTask, WindowableTask {
public static final SystemStream OUTPUT_STREAM =
new SystemStream("kafka", "events-per-minute");
private int eventsSeen = 0;
public void process(IncomingMessageEnvelope envelope,
MessageCollector collector,
TaskCoordinator coordinator) {
eventsSeen++;
}
public void window(MessageCollector collector,
TaskCoordinator coordinator) {
collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, eventsSeen));
eventsSeen = 0;
}
}
如果需要向輸出流發(fā)送消息,可以使用傳遞給 window()方法的MessageCollector對象。請僅使用該 MessageCollector 對象發(fā)送消息,并且不要在調(diào)用 window()之外使用它。
請注意,Samza 使用單線程執(zhí)行,因此 window()調(diào)用永遠(yuǎn)不會與 process()調(diào)用同時發(fā)生。這樣做的優(yōu)點(diǎn)在于您不需要擔(dān)心代碼中的線程安全性(不需要同步任何內(nèi)容),但如果您的process()方法需要很長時間才能返回,則 window()調(diào)用可能會被延遲的缺點(diǎn)。
Copyright©2021 w3cschool編程獅|閩ICP備15016281號-3|閩公網(wǎng)安備35020302033924號
違法和不良信息舉報(bào)電話:173-0602-2364|舉報(bào)郵箱:jubao@eeedong.com
掃描二維碼
下載編程獅App
編程獅公眾號
聯(lián)系方式:
更多建議: