介紹SamzaContainer

2018-08-22 17:22 更新

SamzaContainer 負責管理一個或多個 StreamTask 實例的啟動,執(zhí)行和關(guān)閉。每個 SamzaContainer 通常作為一個獨立的 Java 虛擬機運行。Samza 工作可以由幾個可能運行在不同機器上的 SamzaContainers 組成。

當 SamzaContainer 啟動時,它執(zhí)行以下操作:

  1. 獲取消耗的每個輸入流分區(qū)的上次檢查點偏移量
  2. 為其消耗的每個輸入流分區(qū)創(chuàng)建一個“閱??讀器”線程
  3. 開始指標報告員報告指標
  4. 啟動一個檢查點計時器,以便每隔一段時間保存任務(wù)的輸入流偏移量
  5. 啟動窗口計時器以觸發(fā)您的任務(wù)窗口方法(如果已定義)
  6. 為每個輸入流分區(qū)實例化并初始化一次StreamTask
  7. 啟動一個事件循環(huán),從輸入流讀取器線程接收消息,并將它們提供給您的StreamTasks
  8. 在每個步驟中通知生命周期偵聽器

我們從中間開始,通過 StreamTask 的實例化。本文檔的以下部分涵蓋了其他步驟。

任務(wù)和分區(qū)

當容器啟動時,它會創(chuàng)建您編寫的任務(wù)類的實例。如果任務(wù)類實現(xiàn)了InitableTask接口,SamzaContainer 也將調(diào)用 init()方法。

/** Implement this if you want a callback when your task starts up. */
public interface InitableTask {
  void init(Config config, TaskContext context);
}

默認情況下,創(chuàng)建任務(wù)類的實例數(shù)取決于作業(yè)輸入流中的分區(qū)數(shù)。如果您的 Samza 作業(yè)有十個分區(qū),您的任務(wù)類將有十個實例:每個分區(qū)一個。第一個任務(wù)實例將接收所有分區(qū)的消息,第二個實例將接收分區(qū)二的所有消息,依此類推。

1502850330231923

輸入流中的分區(qū)數(shù)由您所消費的系統(tǒng)決定。例如,如果您的輸入系統(tǒng)是 Kafka,則可以在命令行中創(chuàng)建主題或在 Kafka 的服務(wù)器屬性文件中使用 num.partition 指定分區(qū)數(shù)。

如果 Samza 作業(yè)有多個輸入流,則 Samza 作業(yè)的任務(wù)實例數(shù)是所有輸入流中最大分區(qū)數(shù)。例如,如果 Samza 作業(yè)正在從 PageViewEvent(12個分區(qū))和 ServiceMetricEvent(14個分區(qū))讀取,則 Samza 作業(yè)將具有14個任務(wù)實例(編號為0到13)。任務(wù)實例12和13只接收來自 ServiceMetricEvent 的事件,因為沒有相應(yīng)的 PageViewEvent 分區(qū)。

使用這種將輸入流分配給任務(wù)實例的默認方法,Samza 正在以其分區(qū)作為鍵對輸入流上的分組操作進行有效的執(zhí)行。通過實施新的SystemStreamPartitionGrouper和工廠以及配置作業(yè)以通過 job.systemstreampartition.grouper.factory 配置值使用它來實現(xiàn)對輸入流分區(qū)進行分組的其他策略。

Samza 提供了上述討論的每個分區(qū)分片器以及 GroupBySystemStreamPartitionGrouper,它為每個輸入流分區(qū)提供一個單獨的任務(wù)類實例,有效地通過輸入流本身進行分組。這提供了可以使用多少個容器來處理這些輸入流的最大可擴展性,并且適用于不需要輸入流分組的非常高容量的作業(yè)。

考慮到上述 PageViewEvent 分區(qū)12路和 ServiceMetricEvent 分區(qū)方式的示例,GroupBySystemStreamPartitionGrouper 將創(chuàng)建12 + 14 = 26個任務(wù)實例,然后將分布在配置的容器數(shù)量上,如下所述。

請注意,一旦使用特定的 SystemStreamPartitionGrouper 啟動作業(yè),該作業(yè)正在使用狀態(tài)或檢查點,則不可能在后續(xù)作業(yè)啟動時更改該分組,因為在新的分組方法中以前的檢查點和狀態(tài)信息可能不正確。

容器和資源分配

雖然任務(wù)實例的數(shù)量是固定的 - 由輸入分區(qū)的數(shù)量確定 - 您可以配置要用于作業(yè)的容器數(shù)量。如果使用YARN,容器數(shù)決定了哪些CPU和內(nèi)存資源分配給您的作業(yè)。

如果輸入流上的數(shù)據(jù)量很小,那么只能使用一個 SamzaContainer 就足夠了。在這種情況下,Samza 仍會為每個輸入分區(qū)創(chuàng)建一個任務(wù)實例,但所有這些任務(wù)都在同一容器中運行。另一方面,您可以創(chuàng)建與分區(qū)一樣多的容器,Samza 將為每個容器分配一個任務(wù)實例。

每個 SamzaContainer 設(shè)計為使用一個CPU內(nèi)核,因此它使用單線程事件循環(huán)執(zhí)行。在 SamzaContainer 中創(chuàng)建自己的線程是不可取的。如果需要更多的并行性,請將您的工作配置為使用更多的容器。

您的作業(yè)中的任何狀態(tài)都屬于任務(wù)實例,而不是容器。這是 Samza 可擴展性的關(guān)鍵設(shè)計決策:隨著您的工作資源需求的增長和縮小,您可以簡單地增加或減少容器數(shù)量,但是任務(wù)實例的數(shù)量保持不變。當您向上或向下擴展時,每個任務(wù)實例仍然保持相同的狀態(tài)。任務(wù)實例可能從一個容器移動到另一個容器,并且由 Samza 管理的任何持久狀態(tài)將隨之移動。這樣就可以使作業(yè)的處理語義保持不變,即使您更改了作業(yè)的并行性。

加入多個輸入流

如果您的工作有多個輸入流,Samza 提供了一個簡單而強大的機制來加入來自不同流的數(shù)據(jù):每個任務(wù)實例都從每個輸入流的一個分區(qū)接收消息。例如,假設(shè)您有兩個輸入流A和B,每個具有四個分區(qū)。Samza 創(chuàng)建四個任務(wù)實例來處理它們,并按如下所示分配分區(qū):

任務(wù)實例消耗流分區(qū)
0流A分區(qū)0,流B分區(qū)0
1流A分區(qū)1,流B分區(qū)1
2流A分區(qū)2,流B分區(qū)2
3流A分區(qū)3,流B分區(qū)3

因此,如果您希望不同流中的兩個事件由同一個任務(wù)實例處理,則需要確保將其發(fā)送到相同的分區(qū)號。您可以通過在發(fā)送消息時使用相同的分區(qū)鍵來實現(xiàn)此目的。狀態(tài)管理部分詳細討論了連接流。

所有這一切都有一個警告:Samza 目前假設(shè)一個流的分區(qū)計數(shù)永遠不會改變。不支持分區(qū)拆分或重新分區(qū)。如果輸入流具有 N 個分區(qū),則預計它始終具有并且將始終具有N個分區(qū)。如果要重新分區(qū)流,則可以編寫從流中讀取消息的作業(yè),并將其寫入具有所需數(shù)量分區(qū)的新流。例如,您可以從 PageViewEvent 讀取消息,并將它們寫入 PageViewEventRepartition。

廣播流

0.10.0之后,Samza 支持廣播流。您可以通過附加哈希標記以及分區(qū)號或分區(qū)號范圍來將某些流中的分區(qū)分配給所有任務(wù)。例如,您希望所有任務(wù)可以從稱為廣播流-1的流中消耗分區(qū)0和1,并從稱為廣播流-2的流中分配2。您現(xiàn)在可以配置:

task.broadcast.inputs=yourSystem.broadcast-stream-1#[0-1], yourSystem.broadcast-stream-2#2

如果使用 “[]”,則指定分區(qū)的范圍。

流 ?

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號