Samza 協(xié)調(diào)器流

2018-08-22 17:48 更新

Samza 的工作完全由工作配置驅(qū)動。因此,工作配置往往相當大。為了輕松地序列化這樣大的配置并將其持續(xù)執(zhí)行,Samza 在提交作業(yè)時將所有配置寫入一個稱為協(xié)調(diào)器流的持久流。

協(xié)調(diào)器流是配置被寫入的單分區(qū)流。它具有與 Samza 中可配置的任何輸入流相同的特征 - 有序,可重放和容錯。流將包含三種主要類型的消息:

  1. 作業(yè)配置消息
  2. 任務(wù) changelog 分區(qū)分配消息
  3. 集裝箱地點信息

協(xié)調(diào)器流命名

命名約定與創(chuàng)建的檢查點主題非常相似。

"__samza_coordinator_%s_%s" format (jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-"))

協(xié)調(diào)器流消息模型

協(xié)調(diào)器流消息被建模為鍵/值對。關(guān)鍵是明確定義的字段列表:版本,類型和密鑰。該值是一張地圖。值映射有一些預(yù)定義字段(如時間戳,主機等),這些字段對所有消息都是通用的。

CoordinatorStreamMessage 的完整結(jié)構(gòu)是:

key => ["<version-number>", "<message-type>", "<key>"]

message => {
    "host": "<hostname>",
    "username": "<username>",
    "source": "<source-for-this-message>",
    "timestamp": <timestamp-of-the-message>,
    "values": { }
}

消息本質(zhì)上是串行化的,并通過電線作為 JSON Blob 發(fā)送。因此,為了使序列化正常工作,沒有任何不必要的空格是非常重要的。上述 JSON blob 中的空白區(qū)域僅顯示為可讀性。

最重要的字段是類型,鍵值和值:

  • type - 定義消息的種類
  • 鍵 - 定義與值相關(guān)聯(lián)的鍵
  • 值映射 - 基于每個消息類型定義,并定義與該類型相關(guān)聯(lián)的一組值

當前支持的協(xié)調(diào)器流消息如下所示:

信息類型價值觀地圖
配置消息
(適用于所有構(gòu)型
中列出的選項的配置
設(shè)置配置<配置名稱>'value'=> <config-value>
任務(wù) - 更改日志分配消息設(shè)置更新日志TaskName >'partition'=> <Changelog-Partition-Id>
集裝箱地點信息設(shè)置容器主機分配<容器-ID>'hostname'=> <HostName>

協(xié)調(diào)器流寫入器

Samza 提供了一個命令行工具,將 Job Configuration 消息寫入?yún)f(xié)調(diào)器流。該工具可以使用如下:

samza-example/target/bin/run-coordinator-stream-writer.sh \
  --config-path=file:///path/to/job/config.properties \
  --type set-config \
  --key job.container.count \
  --value 8

工作協(xié)調(diào)器

作業(yè)協(xié)調(diào)器每次在作業(yè)啟動時從協(xié)調(diào)器流引導(dǎo)配置。它會定期跟蹤寫入?yún)f(xié)調(diào)器流的任何新數(shù)據(jù),并更新作業(yè)模型。

作業(yè)模型是用于表示 Samza 作業(yè)的數(shù)據(jù)模型,它還包含作業(yè)配置。Samza 作業(yè)的層次結(jié)構(gòu)具有容器,每個容器都有任務(wù),封裝在作業(yè)模型中,以及相關(guān)信息,如容器ID,任務(wù)名稱,分區(qū)信息等。

作業(yè)協(xié)調(diào)器通過 HTTP 服務(wù)公開作業(yè)模型和作業(yè)配置。作業(yè)協(xié)調(diào)器的 HTTP 服務(wù)的 URL 作為環(huán)境變量作為容器啟動時傳遞給 Samza Containers。容器可以編寫元信息,例如 locality - 容器運行的機器的主機名。但是,他們將通過HTTP服務(wù)查詢作業(yè)協(xié)調(diào)器來讀取作業(yè)模型和配置。

因此,Job Coorindator 是具有整個工作狀態(tài)的最新視圖的單一組件。這是非常有用的,因為它允許我們在將來擴展作業(yè)協(xié)調(diào)器的功能來管理作業(yè)的生命周期(例如啟動/停止容器,修改任務(wù)分配等)。

工作協(xié)調(diào)器可用性

作業(yè)協(xié)調(diào)器駐留在與 Samza 應(yīng)用程序主機相同的容器中。因此,作業(yè)協(xié)調(diào)器的可用性與紗線群集中應(yīng)用程序主機(AM)的可用性有關(guān)。只有在協(xié)調(diào)器流初始化工作協(xié)調(diào)器之后才能啟動Samza 容器。在穩(wěn)定狀態(tài)下,當 Samza 容器出現(xiàn)時,應(yīng)該能夠從 Job Coordinator 中讀取 JobModel,而不會超時。

協(xié)調(diào)器流模型的優(yōu)點

將配置寫入持久的流程為 Samza 打開了大門:

  1. 刪除作業(yè)配置上的大小限制
  2. 使用標準數(shù)據(jù)模型和通信接口向工作容器提供與作業(yè)相關(guān)的配置和元數(shù)據(jù)(有關(guān)詳細信息,請參閱作業(yè)協(xié)調(diào)器
  3. 某些配置只能設(shè)置一次。在將來的部署中更改它們將重置整個作業(yè)的狀態(tài),因為它可能會將輸入分區(qū)重新洗牌到容器。例如,在有狀態(tài)的Samza作業(yè)上更改SystemStreamPartitionGrouper會將單個更改日志分區(qū)中的不同StreamTask的狀態(tài)混合起來。沒有持續(xù)配置,沒有簡單的方法來檢查作業(yè)的當前配置是否有效。
  4. 可以通過寫入?yún)f(xié)調(diào)器流動態(tài)更改作業(yè)配置。這可以使需要作業(yè)的功能與配置更改(例如主機相關(guān)性,自動縮放,動態(tài)重新配置等)無關(guān)。
  5. 提供統(tǒng)一的工作狀態(tài)視圖,使Samza能夠更強大的控制容器控制的方式(有關(guān)詳細信息,請參閱作業(yè)協(xié)調(diào)器
  6. 使得 Job Coordinator 未來的設(shè)計失敗,因為它作為當前工作狀態(tài)的真實來源

對于可以利用此型號的其他有趣功能,請參考設(shè)計文檔。

事件循環(huán) ?

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號