在 Samza 中,容器是在一組機(jī)器上運(yùn)行的物理并行單元。每個(gè)容器本質(zhì)上是一個(gè)運(yùn)行一個(gè)或多個(gè)流任務(wù)的進(jìn)程。每個(gè)任務(wù)實(shí)例消耗輸入流的一個(gè)或多個(gè)分區(qū),并與其自己的持久數(shù)據(jù)存儲相關(guān)聯(lián)。
我們將一個(gè)狀態(tài) Samza 作業(yè)定義為在其實(shí)現(xiàn)中使用鍵值存儲的 Samza 作業(yè)以及關(guān)聯(lián)的 changelog 流。在有狀態(tài)的 samza 作業(yè)中,可以將任務(wù)配置為使用多個(gè)存儲。對于每個(gè)存儲,在任務(wù)實(shí)例和數(shù)據(jù)存儲之間存在1:1映射。由于向 Yarn 群集中的機(jī)器分配容器完全由 Yarn 分配,所以 Samza 不保證將容器(因此與其關(guān)聯(lián)的任務(wù))部署在同一臺機(jī)器上。容器可以在以下任何情況下進(jìn)行洗牌:
在任何上述情況下,任務(wù)的共用數(shù)據(jù)需要在每次容器啟動時(shí)恢復(fù)。每次恢復(fù)數(shù)據(jù)可能是昂貴的,特別是對于具有大數(shù)據(jù)集的應(yīng)用程序。這種行為會使工作的啟動時(shí)間減慢,使得該作業(yè)不再“接近實(shí)時(shí)”。此外,如果多個(gè)有狀態(tài)的 samza 作業(yè)在群集中同時(shí)重啟,并且共享相同的 changelog 系統(tǒng),則可以快速地使更改日志系統(tǒng)的網(wǎng)絡(luò)飽和并導(dǎo)致 DDoS。
例如,考慮執(zhí)行流表連接的 Samza 作業(yè)。通常,這樣的工作要求數(shù)據(jù)集在開始處理輸入流之前在所有處理器上可用。數(shù)據(jù)集通常是大型(order> 1TB)只讀數(shù)據(jù),將用于連接或添加屬性到傳入的消息。該作業(yè)可以通過直接從遠(yuǎn)程存儲或更改日志流的數(shù)據(jù)填充來初始化該緩存。每次重新啟動容器時(shí),都會發(fā)生此緩存初始化。這在工作啟動期間會導(dǎo)致嚴(yán)重的延遲。
那么解決方案就是簡單地將狀態(tài)存儲器保存在容器進(jìn)程正在執(zhí)行的機(jī)器上,并在每次重新啟動作業(yè)時(shí)為容器重新分配相同的主機(jī),以便重新使用持久化狀態(tài)。因此,Samza 在作業(yè)重新啟動時(shí)將容器分配到同一臺機(jī)器的能力稱為主機(jī)密切關(guān)系。Samza 利用主機(jī)關(guān)聯(lián)來增強(qiáng)我們對當(dāng)?shù)貒抑赜玫闹С帧?/p>
當(dāng) Yarn 中部署有狀態(tài)的 Samza 作業(yè)時(shí),任務(wù)的狀態(tài)存儲位于 Yarn 應(yīng)用嘗試的當(dāng)前工作目錄中。
container_working_dir=${yarn.nodemanager.local-dirs}/usercache/${user}/appcache/application_${appid}/container_${contid}/
# Data Stores
ls ${container_working_dir}/state/${store-name}/${task_name}/
這允許節(jié)點(diǎn)管理器(NM)DeletionService 在應(yīng)用程序完成或失敗后清理工作目錄。為了重新使用本地狀態(tài)存儲,狀態(tài)存儲需要保持在 NM 的刪除服務(wù)范圍之外。集群管理員應(yīng)將此位置設(shè)置為紗線中的環(huán)境變量 LOGGED_STORE_BASE_DIR。
每次任務(wù)提交時(shí),Samza 將最后一個(gè)實(shí)例化的偏移量從更改日志流中寫入磁盤上的檢查文件。這也是在容器關(guān)閉時(shí)完成的。因此,存在與每個(gè)狀態(tài)存儲的 changelog 分區(qū)相關(guān)聯(lián)的 OFFSET文件,其由容器中的任務(wù)消耗。
${LOGGED_STORE_BASE_DIR}/${job.name}-${job.id}/${store.name}/${task.name}/OFFSET
現(xiàn)在,在 OFFSET 文件存在之后,當(dāng)容器在同一臺機(jī)器上重新啟動時(shí),Samza 容器:
這大大降低了容器啟動時(shí)的狀態(tài)恢復(fù)時(shí)間,因?yàn)槲覀儾辉購娜罩玖鞯拈_始消耗。如果 OFFSET 文件不存在,它將創(chuàng)建狀態(tài)存儲并從更改日志中的最舊偏移量中消耗以重新創(chuàng)建狀態(tài)。由于 OFFSET 文件在刷新存儲之后每次提交時(shí)都會寫入,因此記錄的偏移量將保證與存儲的當(dāng)前內(nèi)容或某些較舊的內(nèi)容對應(yīng),但不會更新。這給予狀態(tài)恢復(fù)至少一次語義。因此,日志條目必須是冪等的。
有必要定期清理計(jì)算機(jī)上的未使用或孤立狀態(tài)存儲,以管理磁盤空間。此功能正在SAMZA-656中進(jìn)行。
為了重新使用本地狀態(tài),Samza 必須從資源管理器(RM)成功地聲明特定的主機(jī)。為了支持這一點(diǎn),Samza 容器在每次啟動成功時(shí)將其地點(diǎn)信息寫入協(xié)調(diào)器流?,F(xiàn)在,Samza應(yīng)用程序主(AM)可以通過作業(yè)協(xié)調(diào)器(JC)識別最后一個(gè)已知的容器主機(jī),并且應(yīng)用程序不再與容器位置無關(guān)。在容器故障(由于上述引用的任何原因)中,AM包括ResourceRequest中預(yù)期資源的主機(jī)名。
請注意,Yarn 群集必須配置為使用公平調(diào)度程序,并啟用連續(xù)調(diào)度。通過連續(xù)調(diào)度,調(diào)度程序不斷地遍歷集群中的所有節(jié)點(diǎn),而不是依賴于節(jié)點(diǎn)的心跳,并且在放寬局部性之前,根據(jù)每個(gè)節(jié)點(diǎn)的先前已知狀態(tài)來調(diào)度工作。因此,在配置的延遲之后,調(diào)度器處理輕松的位置。這種方法可以被認(rèn)為是“ 盡力而為的粘性 ”策略,因?yàn)樗埱蟮墓?jié)點(diǎn)可能沒有運(yùn)行或在請求時(shí)沒有足夠的資源(即使數(shù)據(jù)存儲中的狀態(tài)可能被持續(xù))。有關(guān)選擇Fair Scheduler的更多詳細(xì)信息,請參閱設(shè)計(jì)文檔。
<property>
<name>yarn.resourcemanager.scheduler.class</name>
<description>The class to use as the resource scheduler.</description>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
</property>
<property>
<name>yarn.scheduler.fair.continuous-scheduling-enabled</name>
<description>Enable Continuous Scheduling of Resource Requests</description>
<value>true</value>
</property>
<property>
<name>yarn.scheduler.fair.locality-delay-node-ms</name>
<description>Delay time in milliseconds before relaxing locality at node-level</description>
<value>1000</value> <!-- Should be tuned per requirement -->
</property>
<property>
<name>yarn.scheduler.fair.locality-delay-rack-ms</name>
<description>Delay time in milliseconds before relaxing locality at rack-level</description>
<value>1000</value> <!-- Should be tuned per requirement -->
</property>
<property>
<name>yarn.nodemanager.sleep-delay-before-sigkill.ms</name>
<description>No. of ms to wait between sending a SIGTERM and SIGKILL to a container</description>
<value>600000</value> <!-- Set it to 10min to allow enough time for clean shutdown of containers -->
</property>
任何有狀態(tài)的 Samza 工作都可以通過設(shè)置 yarn.samza.host-affinity.enabled 為 true 來利用此功能來減少其狀態(tài)存儲的平均恢復(fù)時(shí)間(MTTR)。
yarn.samza.host-affinity.enabled=true # Default: false
為無狀態(tài) Samza 作業(yè)啟用此功能不應(yīng)對作業(yè)產(chǎn)生任何不利影響。
正如您所觀察到的,由于 Yarn 群集中的 varibale 載入分布,主機(jī)關(guān)聯(lián)性無法一直保證。因此,這是 Samza 提供的盡力而為的政策。但是,某些情況值得一提的是,這些保證可能難以實(shí)現(xiàn)或不適用。
更多建議: