Samza 主機(jī)關(guān)聯(lián)和 YARN

2018-08-22 18:18 更新

在 Samza 中,容器是在一組機(jī)器上運(yùn)行的物理并行單元。每個(gè)容器本質(zhì)上是一個(gè)運(yùn)行一個(gè)或多個(gè)流任務(wù)的進(jìn)程。每個(gè)任務(wù)實(shí)例消耗輸入流的一個(gè)或多個(gè)分區(qū),并與其自己的持久數(shù)據(jù)存儲相關(guān)聯(lián)。

我們將一個(gè)狀態(tài) Samza 作業(yè)定義為在其實(shí)現(xiàn)中使用鍵值存儲的 Samza 作業(yè)以及關(guān)聯(lián)的 changelog 流。在有狀態(tài)的 samza 作業(yè)中,可以將任務(wù)配置為使用多個(gè)存儲。對于每個(gè)存儲,在任務(wù)實(shí)例和數(shù)據(jù)存儲之間存在1:1映射。由于向 Yarn 群集中的機(jī)器分配容器完全由 Yarn 分配,所以 Samza 不保證將容器(因此與其關(guān)聯(lián)的任務(wù))部署在同一臺機(jī)器上。容器可以在以下任何情況下進(jìn)行洗牌:

  1. 當(dāng)通過指向 yarn.package.path 新的包路徑升級作業(yè)并重新提交。
  2. 當(dāng)工作只是由 Yarn 或用戶重新啟動時(shí)。
  3. 容器故障或提前引發(fā) SamzaAppMaster 重新分配到另一個(gè)可用資源時(shí)。

在任何上述情況下,任務(wù)的共用數(shù)據(jù)需要在每次容器啟動時(shí)恢復(fù)。每次恢復(fù)數(shù)據(jù)可能是昂貴的,特別是對于具有大數(shù)據(jù)集的應(yīng)用程序。這種行為會使工作的啟動時(shí)間減慢,使得該作業(yè)不再“接近實(shí)時(shí)”。此外,如果多個(gè)有狀態(tài)的 samza 作業(yè)在群集中同時(shí)重啟,并且共享相同的 changelog 系統(tǒng),則可以快速地使更改日志系統(tǒng)的網(wǎng)絡(luò)飽和并導(dǎo)致 DDoS。

例如,考慮執(zhí)行流表連接的 Samza 作業(yè)。通常,這樣的工作要求數(shù)據(jù)集在開始處理輸入流之前在所有處理器上可用。數(shù)據(jù)集通常是大型(order> 1TB)只讀數(shù)據(jù),將用于連接或添加屬性到傳入的消息。該作業(yè)可以通過直接從遠(yuǎn)程存儲或更改日志流的數(shù)據(jù)填充來初始化該緩存。每次重新啟動容器時(shí),都會發(fā)生此緩存初始化。這在工作啟動期間會導(dǎo)致嚴(yán)重的延遲。

那么解決方案就是簡單地將狀態(tài)存儲器保存在容器進(jìn)程正在執(zhí)行的機(jī)器上,并在每次重新啟動作業(yè)時(shí)為容器重新分配相同的主機(jī),以便重新使用持久化狀態(tài)。因此,Samza 在作業(yè)重新啟動時(shí)將容器分配到同一臺機(jī)器的能力稱為主機(jī)密切關(guān)系。Samza 利用主機(jī)關(guān)聯(lián)來增強(qiáng)我們對當(dāng)?shù)貒抑赜玫闹С帧?/p>

它是如何工作的?

當(dāng) Yarn 中部署有狀態(tài)的 Samza 作業(yè)時(shí),任務(wù)的狀態(tài)存儲位于 Yarn 應(yīng)用嘗試的當(dāng)前工作目錄中。

container_working_dir=${yarn.nodemanager.local-dirs}/usercache/${user}/appcache/application_${appid}/container_${contid}/

# Data Stores
ls ${container_working_dir}/state/${store-name}/${task_name}/

這允許節(jié)點(diǎn)管理器(NM)DeletionService 在應(yīng)用程序完成或失敗后清理工作目錄。為了重新使用本地狀態(tài)存儲,狀態(tài)存儲需要保持在 NM 的刪除服務(wù)范圍之外。集群管理員應(yīng)將此位置設(shè)置為紗線中的環(huán)境變量 LOGGED_STORE_BASE_DIR。


每次任務(wù)提交時(shí),Samza 將最后一個(gè)實(shí)例化的偏移量從更改日志流中寫入磁盤上的檢查文件。這也是在容器關(guān)閉時(shí)完成的。因此,存在與每個(gè)狀態(tài)存儲的 changelog 分區(qū)相關(guān)聯(lián)的 OFFSET文件,其由容器中的任務(wù)消耗。

${LOGGED_STORE_BASE_DIR}/${job.name}-${job.id}/${store.name}/${task.name}/OFFSET

現(xiàn)在,在 OFFSET 文件存在之后,當(dāng)容器在同一臺機(jī)器上重新啟動時(shí),Samza 容器:

  1. 打開磁盤上的持久存儲
  2. 讀取 OFFSET 文件
  3. 從 OFFSET 值恢復(fù)狀態(tài)存儲

這大大降低了容器啟動時(shí)的狀態(tài)恢復(fù)時(shí)間,因?yàn)槲覀儾辉購娜罩玖鞯拈_始消耗。如果 OFFSET 文件不存在,它將創(chuàng)建狀態(tài)存儲并從更改日志中的最舊偏移量中消耗以重新創(chuàng)建狀態(tài)。由于 OFFSET 文件在刷新存儲之后每次提交時(shí)都會寫入,因此記錄的偏移量將保證與存儲的當(dāng)前內(nèi)容或某些較舊的內(nèi)容對應(yīng),但不會更新。這給予狀態(tài)恢復(fù)至少一次語義。因此,日志條目必須是冪等的。

有必要定期清理計(jì)算機(jī)上的未使用或孤立狀態(tài)存儲,以管理磁盤空間。此功能正在SAMZA-656中進(jìn)行

為了重新使用本地狀態(tài),Samza 必須從資源管理器(RM)成功地聲明特定的主機(jī)。為了支持這一點(diǎn),Samza 容器在每次啟動成功時(shí)將其地點(diǎn)信息寫入協(xié)調(diào)器流?,F(xiàn)在,Samza應(yīng)用程序主(AM)可以通過作業(yè)協(xié)調(diào)器(JC)識別最后一個(gè)已知的容器主機(jī),并且應(yīng)用程序不再與容器位置無關(guān)。在容器故障(由于上述引用的任何原因)中,AM包括ResourceRequest中預(yù)期資源的主機(jī)名。

請注意,Yarn 群集必須配置為使用公平調(diào)度程序,并啟用連續(xù)調(diào)度。通過連續(xù)調(diào)度,調(diào)度程序不斷地遍歷集群中的所有節(jié)點(diǎn),而不是依賴于節(jié)點(diǎn)的心跳,并且在放寬局部性之前,根據(jù)每個(gè)節(jié)點(diǎn)的先前已知狀態(tài)來調(diào)度工作。因此,在配置的延遲之后,調(diào)度器處理輕松的位置。這種方法可以被認(rèn)為是“ 盡力而為的粘性 ”策略,因?yàn)樗埱蟮墓?jié)點(diǎn)可能沒有運(yùn)行或在請求時(shí)沒有足夠的資源(即使數(shù)據(jù)存儲中的狀態(tài)可能被持續(xù))。有關(guān)選擇Fair Scheduler的更多詳細(xì)信息,請參閱設(shè)計(jì)文檔。

配置YARN群集以支持主機(jī)關(guān)聯(lián)性

  1. 通過設(shè)置 LOGGED_STORE_BASE_DIR 在 yarn-env.sh 中的環(huán)境變量來啟用本地狀態(tài)重新使用 export LOGGEDSTOREBASE_DIR=<path-for-state-stores> 。如果沒有這種配置,狀態(tài)存儲不會在容器關(guān)閉時(shí)持久存在。這將有效地意味著您不會重新使用本地狀態(tài),因此主機(jī)關(guān)聯(lián)成為一個(gè)模擬操作。
  2. 使用公平調(diào)度器配置 Yarn,并在 yarn-site.xml 中啟用連續(xù)調(diào)度
    <property>
    <name>yarn.resourcemanager.scheduler.class</name>
    <description>The class to use as the resource scheduler.</description>
    <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
    </property>
    <property>
    <name>yarn.scheduler.fair.continuous-scheduling-enabled</name>
    <description>Enable Continuous Scheduling of Resource Requests</description>
    <value>true</value>
    </property>
    <property>
    <name>yarn.scheduler.fair.locality-delay-node-ms</name>
    <description>Delay time in milliseconds before relaxing locality at node-level</description>
    <value>1000</value>  <!-- Should be tuned per requirement -->
    </property>
    <property>
    <name>yarn.scheduler.fair.locality-delay-rack-ms</name>
    <description>Delay time in milliseconds before relaxing locality at rack-level</description>
    <value>1000</value> <!-- Should be tuned per requirement -->
    </property>
  3. 將紗線節(jié)點(diǎn)管理器SIGTERM配置為SIGKILL超時(shí)為合理時(shí)間節(jié)點(diǎn)管理器將給予Samza Container足夠的時(shí)間在紗線站點(diǎn).xml中執(zhí)行干凈的關(guān)機(jī)
    <property>
    <name>yarn.nodemanager.sleep-delay-before-sigkill.ms</name>
    <description>No. of ms to wait between sending a SIGTERM and SIGKILL to a container</description>
    <value>600000</value> <!-- Set it to 10min to allow enough time for clean shutdown of containers -->
    </property>
  4. Yarn 機(jī)架識別功能不是必需的,不會改變 Samza 主機(jī)關(guān)聯(lián)的行為。但是,如果在集群中配置了機(jī)架識別,請確保 DNSToSwitchMapping 實(shí)現(xiàn)非常穩(wěn)健。任何故障都可能導(dǎo)致容器請求返回到 defaultRack。這將導(dǎo)致 ContainerRequests 與首選主機(jī)不匹配,這將降低主機(jī)關(guān)聯(lián)性。

配置 Samza 作業(yè)以使用Host Affinity

任何有狀態(tài)的 Samza 工作都可以通過設(shè)置 yarn.samza.host-affinity.enabled 為 true 來利用此功能來減少其狀態(tài)存儲的平均恢復(fù)時(shí)間(MTTR)。

yarn.samza.host-affinity.enabled=true  # Default: false

為無狀態(tài) Samza 作業(yè)啟用此功能不應(yīng)對作業(yè)產(chǎn)生任何不利影響。

主機(jī)關(guān)聯(lián)保證

正如您所觀察到的,由于 Yarn 群集中的 varibale 載入分布,主機(jī)關(guān)聯(lián)性無法一直保證。因此,這是 Samza 提供的盡力而為的政策。但是,某些情況值得一提的是,這些保證可能難以實(shí)現(xiàn)或不適用。

  1. 當(dāng)容器數(shù)量或容器任務(wù)分配在連續(xù)的應(yīng)用程序運(yùn)行時(shí)發(fā)生更改時(shí) - 我們可能能夠?yàn)榉謪^(qū)子集重新使用本地狀態(tài)。目前,作業(yè)協(xié)調(diào)員沒有任何邏輯來智能地處理容器中任務(wù)的分區(qū)。與容器的自動縮放相關(guān)的操作更多涉及。但是,通過任務(wù)容器映射,對于典型的容器計(jì)數(shù)調(diào)整,這將更有效。
  2. 當(dāng) SystemStreamPartitionGrouper 在連續(xù)的應(yīng)用程序運(yùn)行中發(fā)生變化時(shí) - 當(dāng)用于在容器之間分配分區(qū)的分組器邏輯發(fā)生變化時(shí),協(xié)調(diào)器流中的數(shù)據(jù)(對于 changelog-任務(wù)分區(qū)分配等)和數(shù)據(jù)存儲變?yōu)闊o效。因此,為了安全起見,我們應(yīng)該從協(xié)調(diào)器流中清除所有與狀態(tài)有關(guān)的數(shù)據(jù)。另一種方法是在重新啟動作業(yè)之前,在協(xié)調(diào)器流中覆蓋 Task-ChangelogPartition 分配消息和容器位置消息。

資源本地化  ?


以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號