Samza 事件循環(huán)

2018-08-22 17:53 更新

事件循環(huán)在任務之間編排讀取和處理消息,檢查點,窗口刷新指標。

默認情況下,Samza 在每個容器中使用單個線程來運行任務。這適合于 CPU 綁定的工作; 要獲得更多的 CPU 處理器,只需添加更多的容器。單線程執(zhí)行還簡化了共享任務狀態(tài)和資源管理。

對于 IO 綁定作業(yè),Samza 支持同步和異步任務的更細微的并行性。對于同步任務(StreamTask 和 WindowableTask),可以通過配置內(nèi)置線程池 job.container.thread.pool.size 來調(diào)度它們并行運行。這適合于阻塞 IO 任務方案。對于異步任務( AsyncStreamTask ),您可以進行異步 IO 調(diào)用,并在完成后觸發(fā)回調(diào)。Samza 提供的最佳并行度在一個任務內(nèi),由task.max.concurrency 配置。

最新版本的 Samza 是線程安全的。您可以安全地訪問任務線程中的鍵值存儲,寫入消息和檢查點偏移量的作業(yè)狀態(tài)。如果您在任務之間共享其他數(shù)據(jù),例如全局變量或靜態(tài)數(shù)據(jù),則如果可以通過多個線程并發(fā)訪問數(shù)據(jù),例如 StreamTask 在配置的線程池中運行多個線程,則不會線程安全。對于任務中的狀態(tài)(如成員變量),Samza 保證進程,窗口和提交的相互排他性,因此這些操作之間不會有并發(fā)的修改,任何來自一個操作的狀態(tài)變化都將完全可見。

事件循環(huán)內(nèi)部

容器可能有多個SystemConsumers用于消耗來自不同輸入系統(tǒng)的消息。每個 SystemConsumer 在其自己的線程上讀取消息,但將消息寫入共享進程內(nèi)消息隊列。容器使用此隊列將所有消息匯總到事件循環(huán)中。

事件循環(huán)如下:

  1. 從傳入消息隊列中選擇一條消息;
  2. 安排適當?shù)?a href="http://www.o2fo.com/samza/samza-9eup28lu.html" target="_blank">任務實例來處理消息;
  3. 如果實現(xiàn) WindowableTask,則任務實例上的 Schedule window()運行,窗口計時器已被觸發(fā);
  4. 將 process()和 window()調(diào)用的任何輸出發(fā)送到相應的 SystemProducer ;
  5. 為 提交間隔 已過的任務寫入檢查點并刷新狀態(tài)存儲。
  6. 如果所有任務實例忙于處理未完成的消息,窗口或檢查點,則阻止。

容器在循環(huán)中進行,直到它被關閉。

同步任務與異步任務的語義

事件循環(huán)的語義在運行同步任務和異步任務時有所不同:

  • 對于同步任務(StreamTask 和 WindowableTask),默認情況下,process()和 window()將在單個主線程上運行。您可以將 job.container.thread.pool.size 配置為大于 1,并且事件循環(huán)將process()和 window()在線程池中運行。
  • 對于異步任務(AsyncStreamTask),processAsync()將始終在單個線程中調(diào)用,而回調(diào)可以從不同的用戶線程觸發(fā)。

在這兩種情況下,任務中的默認并發(fā)性為1,這意味著每個任務處理中最多只有一個未完成的消息。這保證主題分區(qū)中的按順序消息處理。您可以通過將 task.max.concurrency 配置為大于1來進一步增加它。這允許多個未完成的消息由任務并行處理。此選項會增加任務中的并行性,但可能導致無序處理和完成。

在上述任何一種情況下,以下語義都得到保證(對于事件發(fā)生的語義,請參閱此處):

  • 如果 task.max.concurrency = 1,則任務中的每個消息進程完成都將保證發(fā)生,在下一次調(diào)用同一任務的 process()/ processAsync()之前。如果 task.max.concurrency> 1,則不存在這種情況 - 在約束之前,用戶應該同步對任務中的任何共享/全局變量的訪問。
  • 當沒有對 process()/ processAsync()的調(diào)用進行掛起時,將調(diào)用 WindowableTask.window(),并且在完成之前不會調(diào)度新的 process()/ processAsync()調(diào)用。因此,保證所有以前的 process()/ processAsync()調(diào)用在調(diào)用 WindowableTask.window()之前發(fā)生。在任何后續(xù) process()/ processAsync()調(diào)用之前,將保證對WindowableTask.window()的調(diào)用發(fā)生。Samza 引擎負責確保及時調(diào)用該窗口。
  • 檢查點保證僅覆蓋完全處理的事件。只有當沒有待處理的進程()/ processAsync()或WindowableTask.window()調(diào)用時才會發(fā)生。所有之前的調(diào)用發(fā)生在檢查點和檢查點發(fā)生之前 - 在所有后續(xù)調(diào)用之前。

更多詳細信息和示例可以在Samza Async API和多線程用戶指南中找到

生命周期

開發(fā)人員可以掛接到 SamzaContainer 的生命周期中的唯一方法是通過標準的 InitableTask,ClosableTask,StreamTask / AsyncStreamTask 和 WindowableTask。在需要添加可插入邏輯以包裝 StreamTask 的情況下,StreamTask 可以被另一個處理自定義邏輯的 StreamTask 實現(xiàn)包裝,然后再調(diào)用到包裝的 StreamTask 中。

一個具體的例子是一組 StreamTask,它們都希望在其 process()方法中共享相同的 try / catch 邏輯。可以實現(xiàn) StreamTask,它包裝原始 StreamTasks,并使用適當?shù)?try / catch 邏輯圍繞原始 process()調(diào)用。

指標  ?

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號