Samza 事件循環(huán)

2018-08-22 17:53 更新

事件循環(huán)在任務(wù)之間編排讀取和處理消息,檢查點(diǎn),窗口刷新指標(biāo)。

默認(rèn)情況下,Samza 在每個(gè)容器中使用單個(gè)線程來(lái)運(yùn)行任務(wù)。這適合于 CPU 綁定的工作; 要獲得更多的 CPU 處理器,只需添加更多的容器。單線程執(zhí)行還簡(jiǎn)化了共享任務(wù)狀態(tài)和資源管理。

對(duì)于 IO 綁定作業(yè),Samza 支持同步和異步任務(wù)的更細(xì)微的并行性。對(duì)于同步任務(wù)(StreamTask 和 WindowableTask),可以通過(guò)配置內(nèi)置線程池 job.container.thread.pool.size 來(lái)調(diào)度它們并行運(yùn)行。這適合于阻塞 IO 任務(wù)方案。對(duì)于異步任務(wù)( AsyncStreamTask ),您可以進(jìn)行異步 IO 調(diào)用,并在完成后觸發(fā)回調(diào)。Samza 提供的最佳并行度在一個(gè)任務(wù)內(nèi),由task.max.concurrency 配置。

最新版本的 Samza 是線程安全的。您可以安全地訪問(wèn)任務(wù)線程中的鍵值存儲(chǔ),寫(xiě)入消息和檢查點(diǎn)偏移量的作業(yè)狀態(tài)。如果您在任務(wù)之間共享其他數(shù)據(jù),例如全局變量或靜態(tài)數(shù)據(jù),則如果可以通過(guò)多個(gè)線程并發(fā)訪問(wèn)數(shù)據(jù),例如 StreamTask 在配置的線程池中運(yùn)行多個(gè)線程,則不會(huì)線程安全。對(duì)于任務(wù)中的狀態(tài)(如成員變量),Samza 保證進(jìn)程,窗口和提交的相互排他性,因此這些操作之間不會(huì)有并發(fā)的修改,任何來(lái)自一個(gè)操作的狀態(tài)變化都將完全可見(jiàn)。

事件循環(huán)內(nèi)部

容器可能有多個(gè)SystemConsumers用于消耗來(lái)自不同輸入系統(tǒng)的消息。每個(gè) SystemConsumer 在其自己的線程上讀取消息,但將消息寫(xiě)入共享進(jìn)程內(nèi)消息隊(duì)列。容器使用此隊(duì)列將所有消息匯總到事件循環(huán)中。

事件循環(huán)如下:

  1. 從傳入消息隊(duì)列中選擇一條消息;
  2. 安排適當(dāng)?shù)?a href="http://o2fo.com/samza/samza-9eup28lu.html" target="_blank">任務(wù)實(shí)例來(lái)處理消息;
  3. 如果實(shí)現(xiàn) WindowableTask,則任務(wù)實(shí)例上的 Schedule window()運(yùn)行,窗口計(jì)時(shí)器已被觸發(fā);
  4. 將 process()和 window()調(diào)用的任何輸出發(fā)送到相應(yīng)的 SystemProducer ;
  5. 為 提交間隔 已過(guò)的任務(wù)寫(xiě)入檢查點(diǎn)并刷新?tīng)顟B(tài)存儲(chǔ)。
  6. 如果所有任務(wù)實(shí)例忙于處理未完成的消息,窗口或檢查點(diǎn),則阻止。

容器在循環(huán)中進(jìn)行,直到它被關(guān)閉。

同步任務(wù)與異步任務(wù)的語(yǔ)義

事件循環(huán)的語(yǔ)義在運(yùn)行同步任務(wù)和異步任務(wù)時(shí)有所不同:

  • 對(duì)于同步任務(wù)(StreamTask 和 WindowableTask),默認(rèn)情況下,process()和 window()將在單個(gè)主線程上運(yùn)行。您可以將 job.container.thread.pool.size 配置為大于 1,并且事件循環(huán)將process()和 window()在線程池中運(yùn)行。
  • 對(duì)于異步任務(wù)(AsyncStreamTask),processAsync()將始終在單個(gè)線程中調(diào)用,而回調(diào)可以從不同的用戶線程觸發(fā)。

在這兩種情況下,任務(wù)中的默認(rèn)并發(fā)性為1,這意味著每個(gè)任務(wù)處理中最多只有一個(gè)未完成的消息。這保證主題分區(qū)中的按順序消息處理。您可以通過(guò)將 task.max.concurrency 配置為大于1來(lái)進(jìn)一步增加它。這允許多個(gè)未完成的消息由任務(wù)并行處理。此選項(xiàng)會(huì)增加任務(wù)中的并行性,但可能導(dǎo)致無(wú)序處理和完成。

在上述任何一種情況下,以下語(yǔ)義都得到保證(對(duì)于事件發(fā)生的語(yǔ)義,請(qǐng)參閱此處):

  • 如果 task.max.concurrency = 1,則任務(wù)中的每個(gè)消息進(jìn)程完成都將保證發(fā)生,在下一次調(diào)用同一任務(wù)的 process()/ processAsync()之前。如果 task.max.concurrency> 1,則不存在這種情況 - 在約束之前,用戶應(yīng)該同步對(duì)任務(wù)中的任何共享/全局變量的訪問(wèn)。
  • 當(dāng)沒(méi)有對(duì) process()/ processAsync()的調(diào)用進(jìn)行掛起時(shí),將調(diào)用 WindowableTask.window(),并且在完成之前不會(huì)調(diào)度新的 process()/ processAsync()調(diào)用。因此,保證所有以前的 process()/ processAsync()調(diào)用在調(diào)用 WindowableTask.window()之前發(fā)生。在任何后續(xù) process()/ processAsync()調(diào)用之前,將保證對(duì)WindowableTask.window()的調(diào)用發(fā)生。Samza 引擎負(fù)責(zé)確保及時(shí)調(diào)用該窗口。
  • 檢查點(diǎn)保證僅覆蓋完全處理的事件。只有當(dāng)沒(méi)有待處理的進(jìn)程()/ processAsync()或WindowableTask.window()調(diào)用時(shí)才會(huì)發(fā)生。所有之前的調(diào)用發(fā)生在檢查點(diǎn)和檢查點(diǎn)發(fā)生之前 - 在所有后續(xù)調(diào)用之前。

更多詳細(xì)信息和示例可以在Samza Async API和多線程用戶指南中找到。

生命周期

開(kāi)發(fā)人員可以掛接到 SamzaContainer 的生命周期中的唯一方法是通過(guò)標(biāo)準(zhǔn)的 InitableTask,ClosableTask,StreamTask / AsyncStreamTask 和 WindowableTask。在需要添加可插入邏輯以包裝 StreamTask 的情況下,StreamTask 可以被另一個(gè)處理自定義邏輯的 StreamTask 實(shí)現(xiàn)包裝,然后再調(diào)用到包裝的 StreamTask 中。

一個(gè)具體的例子是一組 StreamTask,它們都希望在其 process()方法中共享相同的 try / catch 邏輯。可以實(shí)現(xiàn) StreamTask,它包裝原始 StreamTasks,并使用適當(dāng)?shù)?try / catch 邏輯圍繞原始 process()調(diào)用。

指標(biāo)  ?

以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)