在 Samza 中,容器是在一組機(jī)器上運(yùn)行的物理并行單元。每個(gè)容器本質(zhì)上是一個(gè)運(yùn)行一個(gè)或多個(gè)流任務(wù)的進(jìn)程。每個(gè)任務(wù)實(shí)例消耗輸入流的一個(gè)或多個(gè)分區(qū),并與其自己的持久數(shù)據(jù)存儲(chǔ)相關(guān)聯(lián)。
我們將一個(gè)狀態(tài) Samza 作業(yè)定義為在其實(shí)現(xiàn)中使用鍵值存儲(chǔ)的 Samza 作業(yè)以及關(guān)聯(lián)的 changelog 流。在有狀態(tài)的 samza 作業(yè)中,可以將任務(wù)配置為使用多個(gè)存儲(chǔ)。對(duì)于每個(gè)存儲(chǔ),在任務(wù)實(shí)例和數(shù)據(jù)存儲(chǔ)之間存在1:1映射。由于向 Yarn 群集中的機(jī)器分配容器完全由 Yarn 分配,所以 Samza 不保證將容器(因此與其關(guān)聯(lián)的任務(wù))部署在同一臺(tái)機(jī)器上。容器可以在以下任何情況下進(jìn)行洗牌:
在任何上述情況下,任務(wù)的共用數(shù)據(jù)需要在每次容器啟動(dòng)時(shí)恢復(fù)。每次恢復(fù)數(shù)據(jù)可能是昂貴的,特別是對(duì)于具有大數(shù)據(jù)集的應(yīng)用程序。這種行為會(huì)使工作的啟動(dòng)時(shí)間減慢,使得該作業(yè)不再“接近實(shí)時(shí)”。此外,如果多個(gè)有狀態(tài)的 samza 作業(yè)在群集中同時(shí)重啟,并且共享相同的 changelog 系統(tǒng),則可以快速地使更改日志系統(tǒng)的網(wǎng)絡(luò)飽和并導(dǎo)致 DDoS。
例如,考慮執(zhí)行流表連接的 Samza 作業(yè)。通常,這樣的工作要求數(shù)據(jù)集在開(kāi)始處理輸入流之前在所有處理器上可用。數(shù)據(jù)集通常是大型(order> 1TB)只讀數(shù)據(jù),將用于連接或添加屬性到傳入的消息。該作業(yè)可以通過(guò)直接從遠(yuǎn)程存儲(chǔ)或更改日志流的數(shù)據(jù)填充來(lái)初始化該緩存。每次重新啟動(dòng)容器時(shí),都會(huì)發(fā)生此緩存初始化。這在工作啟動(dòng)期間會(huì)導(dǎo)致嚴(yán)重的延遲。
那么解決方案就是簡(jiǎn)單地將狀態(tài)存儲(chǔ)器保存在容器進(jìn)程正在執(zhí)行的機(jī)器上,并在每次重新啟動(dòng)作業(yè)時(shí)為容器重新分配相同的主機(jī),以便重新使用持久化狀態(tài)。因此,Samza 在作業(yè)重新啟動(dòng)時(shí)將容器分配到同一臺(tái)機(jī)器的能力稱為主機(jī)密切關(guān)系。Samza 利用主機(jī)關(guān)聯(lián)來(lái)增強(qiáng)我們對(duì)當(dāng)?shù)貒?guó)家重用的支持。
當(dāng) Yarn 中部署有狀態(tài)的 Samza 作業(yè)時(shí),任務(wù)的狀態(tài)存儲(chǔ)位于 Yarn 應(yīng)用嘗試的當(dāng)前工作目錄中。
container_working_dir=${yarn.nodemanager.local-dirs}/usercache/${user}/appcache/application_${appid}/container_${contid}/
# Data Stores
ls ${container_working_dir}/state/${store-name}/${task_name}/
這允許節(jié)點(diǎn)管理器(NM)DeletionService 在應(yīng)用程序完成或失敗后清理工作目錄。為了重新使用本地狀態(tài)存儲(chǔ),狀態(tài)存儲(chǔ)需要保持在 NM 的刪除服務(wù)范圍之外。集群管理員應(yīng)將此位置設(shè)置為紗線中的環(huán)境變量 LOGGED_STORE_BASE_DIR。
每次任務(wù)提交時(shí),Samza 將最后一個(gè)實(shí)例化的偏移量從更改日志流中寫(xiě)入磁盤(pán)上的檢查文件。這也是在容器關(guān)閉時(shí)完成的。因此,存在與每個(gè)狀態(tài)存儲(chǔ)的 changelog 分區(qū)相關(guān)聯(lián)的 OFFSET文件,其由容器中的任務(wù)消耗。
${LOGGED_STORE_BASE_DIR}/${job.name}-${job.id}/${store.name}/${task.name}/OFFSET
現(xiàn)在,在 OFFSET 文件存在之后,當(dāng)容器在同一臺(tái)機(jī)器上重新啟動(dòng)時(shí),Samza 容器:
這大大降低了容器啟動(dòng)時(shí)的狀態(tài)恢復(fù)時(shí)間,因?yàn)槲覀儾辉購(gòu)娜罩玖鞯拈_(kāi)始消耗。如果 OFFSET 文件不存在,它將創(chuàng)建狀態(tài)存儲(chǔ)并從更改日志中的最舊偏移量中消耗以重新創(chuàng)建狀態(tài)。由于 OFFSET 文件在刷新存儲(chǔ)之后每次提交時(shí)都會(huì)寫(xiě)入,因此記錄的偏移量將保證與存儲(chǔ)的當(dāng)前內(nèi)容或某些較舊的內(nèi)容對(duì)應(yīng),但不會(huì)更新。這給予狀態(tài)恢復(fù)至少一次語(yǔ)義。因此,日志條目必須是冪等的。
有必要定期清理計(jì)算機(jī)上的未使用或孤立狀態(tài)存儲(chǔ),以管理磁盤(pán)空間。此功能正在SAMZA-656中進(jìn)行。
為了重新使用本地狀態(tài),Samza 必須從資源管理器(RM)成功地聲明特定的主機(jī)。為了支持這一點(diǎn),Samza 容器在每次啟動(dòng)成功時(shí)將其地點(diǎn)信息寫(xiě)入協(xié)調(diào)器流?,F(xiàn)在,Samza應(yīng)用程序主(AM)可以通過(guò)作業(yè)協(xié)調(diào)器(JC)識(shí)別最后一個(gè)已知的容器主機(jī),并且應(yīng)用程序不再與容器位置無(wú)關(guān)。在容器故障(由于上述引用的任何原因)中,AM包括ResourceRequest中預(yù)期資源的主機(jī)名。
請(qǐng)注意,Yarn 群集必須配置為使用公平調(diào)度程序,并啟用連續(xù)調(diào)度。通過(guò)連續(xù)調(diào)度,調(diào)度程序不斷地遍歷集群中的所有節(jié)點(diǎn),而不是依賴于節(jié)點(diǎn)的心跳,并且在放寬局部性之前,根據(jù)每個(gè)節(jié)點(diǎn)的先前已知狀態(tài)來(lái)調(diào)度工作。因此,在配置的延遲之后,調(diào)度器處理輕松的位置。這種方法可以被認(rèn)為是“ 盡力而為的粘性 ”策略,因?yàn)樗?qǐng)求的節(jié)點(diǎn)可能沒(méi)有運(yùn)行或在請(qǐng)求時(shí)沒(méi)有足夠的資源(即使數(shù)據(jù)存儲(chǔ)中的狀態(tài)可能被持續(xù))。有關(guān)選擇Fair Scheduler的更多詳細(xì)信息,請(qǐng)參閱設(shè)計(jì)文檔。
<property>
<name>yarn.resourcemanager.scheduler.class</name>
<description>The class to use as the resource scheduler.</description>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
</property>
<property>
<name>yarn.scheduler.fair.continuous-scheduling-enabled</name>
<description>Enable Continuous Scheduling of Resource Requests</description>
<value>true</value>
</property>
<property>
<name>yarn.scheduler.fair.locality-delay-node-ms</name>
<description>Delay time in milliseconds before relaxing locality at node-level</description>
<value>1000</value> <!-- Should be tuned per requirement -->
</property>
<property>
<name>yarn.scheduler.fair.locality-delay-rack-ms</name>
<description>Delay time in milliseconds before relaxing locality at rack-level</description>
<value>1000</value> <!-- Should be tuned per requirement -->
</property>
<property>
<name>yarn.nodemanager.sleep-delay-before-sigkill.ms</name>
<description>No. of ms to wait between sending a SIGTERM and SIGKILL to a container</description>
<value>600000</value> <!-- Set it to 10min to allow enough time for clean shutdown of containers -->
</property>
任何有狀態(tài)的 Samza 工作都可以通過(guò)設(shè)置 yarn.samza.host-affinity.enabled 為 true 來(lái)利用此功能來(lái)減少其狀態(tài)存儲(chǔ)的平均恢復(fù)時(shí)間(MTTR)。
yarn.samza.host-affinity.enabled=true # Default: false
為無(wú)狀態(tài) Samza 作業(yè)啟用此功能不應(yīng)對(duì)作業(yè)產(chǎn)生任何不利影響。
正如您所觀察到的,由于 Yarn 群集中的 varibale 載入分布,主機(jī)關(guān)聯(lián)性無(wú)法一直保證。因此,這是 Samza 提供的盡力而為的政策。但是,某些情況值得一提的是,這些保證可能難以實(shí)現(xiàn)或不適用。
更多建議: