Samza 協(xié)調(diào)器流

2018-08-22 17:48 更新

Samza 的工作完全由工作配置驅(qū)動(dòng)。因此,工作配置往往相當(dāng)大。為了輕松地序列化這樣大的配置并將其持續(xù)執(zhí)行,Samza 在提交作業(yè)時(shí)將所有配置寫入一個(gè)稱為協(xié)調(diào)器流的持久流。

協(xié)調(diào)器流是配置被寫入的單分區(qū)流。它具有與 Samza 中可配置的任何輸入流相同的特征 - 有序,可重放和容錯(cuò)。流將包含三種主要類型的消息:

  1. 作業(yè)配置消息
  2. 任務(wù) changelog 分區(qū)分配消息
  3. 集裝箱地點(diǎn)信息

協(xié)調(diào)器流命名

命名約定與創(chuàng)建的檢查點(diǎn)主題非常相似。

"__samza_coordinator_%s_%s" format (jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-"))

協(xié)調(diào)器流消息模型

協(xié)調(diào)器流消息被建模為鍵/值對(duì)。關(guān)鍵是明確定義的字段列表:版本,類型和密鑰。該值是一張地圖。值映射有一些預(yù)定義字段(如時(shí)間戳,主機(jī)等),這些字段對(duì)所有消息都是通用的。

CoordinatorStreamMessage 的完整結(jié)構(gòu)是:

key => ["<version-number>", "<message-type>", "<key>"]

message => {
    "host": "<hostname>",
    "username": "<username>",
    "source": "<source-for-this-message>",
    "timestamp": <timestamp-of-the-message>,
    "values": { }
}

消息本質(zhì)上是串行化的,并通過電線作為 JSON Blob 發(fā)送。因此,為了使序列化正常工作,沒有任何不必要的空格是非常重要的。上述 JSON blob 中的空白區(qū)域僅顯示為可讀性。

最重要的字段是類型,鍵值和值:

  • type - 定義消息的種類
  • 鍵 - 定義與值相關(guān)聯(lián)的鍵
  • 值映射 - 基于每個(gè)消息類型定義,并定義與該類型相關(guān)聯(lián)的一組值

當(dāng)前支持的協(xié)調(diào)器流消息如下所示:

信息類型價(jià)值觀地圖
配置消息
(適用于所有構(gòu)型
中列出的選項(xiàng)的配置
設(shè)置配置<配置名稱>'value'=> <config-value>
任務(wù) - 更改日志分配消息設(shè)置更新日志TaskName >'partition'=> <Changelog-Partition-Id>
集裝箱地點(diǎn)信息設(shè)置容器主機(jī)分配<容器-ID>'hostname'=> <HostName>

協(xié)調(diào)器流寫入器

Samza 提供了一個(gè)命令行工具,將 Job Configuration 消息寫入?yún)f(xié)調(diào)器流。該工具可以使用如下:

samza-example/target/bin/run-coordinator-stream-writer.sh \
  --config-path=file:///path/to/job/config.properties \
  --type set-config \
  --key job.container.count \
  --value 8

工作協(xié)調(diào)器

作業(yè)協(xié)調(diào)器每次在作業(yè)啟動(dòng)時(shí)從協(xié)調(diào)器流引導(dǎo)配置。它會(huì)定期跟蹤寫入?yún)f(xié)調(diào)器流的任何新數(shù)據(jù),并更新作業(yè)模型。

作業(yè)模型是用于表示 Samza 作業(yè)的數(shù)據(jù)模型,它還包含作業(yè)配置。Samza 作業(yè)的層次結(jié)構(gòu)具有容器,每個(gè)容器都有任務(wù),封裝在作業(yè)模型中,以及相關(guān)信息,如容器ID,任務(wù)名稱,分區(qū)信息等。

作業(yè)協(xié)調(diào)器通過 HTTP 服務(wù)公開作業(yè)模型和作業(yè)配置。作業(yè)協(xié)調(diào)器的 HTTP 服務(wù)的 URL 作為環(huán)境變量作為容器啟動(dòng)時(shí)傳遞給 Samza Containers。容器可以編寫元信息,例如 locality - 容器運(yùn)行的機(jī)器的主機(jī)名。但是,他們將通過HTTP服務(wù)查詢作業(yè)協(xié)調(diào)器來讀取作業(yè)模型和配置。

因此,Job Coorindator 是具有整個(gè)工作狀態(tài)的最新視圖的單一組件。這是非常有用的,因?yàn)樗试S我們?cè)趯頂U(kuò)展作業(yè)協(xié)調(diào)器的功能來管理作業(yè)的生命周期(例如啟動(dòng)/停止容器,修改任務(wù)分配等)。

工作協(xié)調(diào)器可用性

作業(yè)協(xié)調(diào)器駐留在與 Samza 應(yīng)用程序主機(jī)相同的容器中。因此,作業(yè)協(xié)調(diào)器的可用性與紗線群集中應(yīng)用程序主機(jī)(AM)的可用性有關(guān)。只有在協(xié)調(diào)器流初始化工作協(xié)調(diào)器之后才能啟動(dòng)Samza 容器。在穩(wěn)定狀態(tài)下,當(dāng) Samza 容器出現(xiàn)時(shí),應(yīng)該能夠從 Job Coordinator 中讀取 JobModel,而不會(huì)超時(shí)。

協(xié)調(diào)器流模型的優(yōu)點(diǎn)

將配置寫入持久的流程為 Samza 打開了大門:

  1. 刪除作業(yè)配置上的大小限制
  2. 使用標(biāo)準(zhǔn)數(shù)據(jù)模型和通信接口向工作容器提供與作業(yè)相關(guān)的配置和元數(shù)據(jù)(有關(guān)詳細(xì)信息,請(qǐng)參閱作業(yè)協(xié)調(diào)器
  3. 某些配置只能設(shè)置一次。在將來的部署中更改它們將重置整個(gè)作業(yè)的狀態(tài),因?yàn)樗赡軙?huì)將輸入分區(qū)重新洗牌到容器。例如,在有狀態(tài)的Samza作業(yè)上更改SystemStreamPartitionGrouper會(huì)將單個(gè)更改日志分區(qū)中的不同StreamTask的狀態(tài)混合起來。沒有持續(xù)配置,沒有簡(jiǎn)單的方法來檢查作業(yè)的當(dāng)前配置是否有效。
  4. 可以通過寫入?yún)f(xié)調(diào)器流動(dòng)態(tài)更改作業(yè)配置。這可以使需要作業(yè)的功能與配置更改(例如主機(jī)相關(guān)性,自動(dòng)縮放,動(dòng)態(tài)重新配置等)無關(guān)。
  5. 提供統(tǒng)一的工作狀態(tài)視圖,使Samza能夠更強(qiáng)大的控制容器控制的方式(有關(guān)詳細(xì)信息,請(qǐng)參閱作業(yè)協(xié)調(diào)器
  6. 使得 Job Coordinator 未來的設(shè)計(jì)失敗,因?yàn)樗鳛楫?dāng)前工作狀態(tài)的真實(shí)來源

對(duì)于可以利用此型號(hào)的其他有趣功能,請(qǐng)參考設(shè)計(jì)文檔。

事件循環(huán) ?

以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)