W3Cschool
恭喜您成為首批注冊(cè)用戶(hù)
獲得88經(jīng)驗(yàn)值獎(jiǎng)勵(lì)
有時(shí),流處理作業(yè)需要定期執(zhí)行某些操作,無(wú)論作業(yè)正在處理多少個(gè)傳入消息。例如,假設(shè)您要報(bào)告每分鐘的頁(yè)面瀏覽次數(shù)。為此,您每次看到頁(yè)面查看事件時(shí)都會(huì)增加一個(gè)計(jì)數(shù)器。每分鐘一次,將當(dāng)前計(jì)數(shù)器值發(fā)送到輸出流,并將計(jì)數(shù)器復(fù)位為零。
Samza 的窗口功能為任務(wù)以常規(guī)時(shí)間間隔提供了一種方式,例如每分鐘一次。要啟用窗口,您只需要在作業(yè)配置中設(shè)置一個(gè)屬性:
# Call the window() method every 60 seconds
task.window.ms=60000
接下來(lái),您的流任務(wù)需要實(shí)現(xiàn)WindowableTask接口。此接口定義了一個(gè)由 Samza 在您配置的常規(guī)間隔中調(diào)用的 window()方法。
例如,您將如何實(shí)現(xiàn)基本的每分鐘事件計(jì)數(shù)器:
public class EventCounterTask implements StreamTask, WindowableTask {
public static final SystemStream OUTPUT_STREAM =
new SystemStream("kafka", "events-per-minute");
private int eventsSeen = 0;
public void process(IncomingMessageEnvelope envelope,
MessageCollector collector,
TaskCoordinator coordinator) {
eventsSeen++;
}
public void window(MessageCollector collector,
TaskCoordinator coordinator) {
collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, eventsSeen));
eventsSeen = 0;
}
}
如果需要向輸出流發(fā)送消息,可以使用傳遞給 window()方法的MessageCollector對(duì)象。請(qǐng)僅使用該 MessageCollector 對(duì)象發(fā)送消息,并且不要在調(diào)用 window()之外使用它。
請(qǐng)注意,Samza 使用單線(xiàn)程執(zhí)行,因此 window()調(diào)用永遠(yuǎn)不會(huì)與 process()調(diào)用同時(shí)發(fā)生。這樣做的優(yōu)點(diǎn)在于您不需要擔(dān)心代碼中的線(xiàn)程安全性(不需要同步任何內(nèi)容),但如果您的process()方法需要很長(zhǎng)時(shí)間才能返回,則 window()調(diào)用可能會(huì)被延遲的缺點(diǎn)。
Copyright©2021 w3cschool編程獅|閩ICP備15016281號(hào)-3|閩公網(wǎng)安備35020302033924號(hào)
違法和不良信息舉報(bào)電話(huà):173-0602-2364|舉報(bào)郵箱:jubao@eeedong.com
掃描二維碼
下載編程獅App
編程獅公眾號(hào)
聯(lián)系方式:
更多建議: