Samza附錄十一 Samza配置參考

2018-08-23 15:08 更新

下表列出了可以包含在Samza作業(yè)配置文件中的所有標(biāo)準(zhǔn)屬性。

下表中加粗顯示的 system-name 是您自己的變量名的占位符。

名稱默認(rèn)描述
Samza工作配置
job.factory.class必需:用于運(yùn)行此作業(yè)作業(yè)工廠該值是一個(gè)完全限定的Java類名,必須實(shí)現(xiàn) StreamJobFactory。Samza 有三個(gè)實(shí)現(xiàn):
org.apache.samza.job.local.ThreadJobFactory
使用線程在您的本地機(jī)器上運(yùn)行您的工作。這僅適用于開發(fā),而不適用于生產(chǎn)部署。
org.apache.samza.job.local.ProcessJobFactory
在您的本地機(jī)器上運(yùn)行作業(yè)作為子進(jìn)程。還可以指定可選的命令生成器屬性。這僅適用于開發(fā),而不適用于生產(chǎn)部署。
org.apache.samza.job.yarn.YarnJobFactory
在YARN網(wǎng)格上運(yùn)行您的工作。有關(guān)YARN特定配置,請參見下文。
job.name必需:您的工作的名稱。該名稱出現(xiàn)在 Samza 儀表板上,用于從其他工作的檢查點(diǎn)分離此工作的檢查點(diǎn)。
job.id1如果你同時(shí)運(yùn)行幾個(gè)你的工作實(shí)例,你需要給每個(gè)執(zhí)行一個(gè)不同的job.id這很重要,否則這些工作將覆蓋彼此的檢查點(diǎn),也可能以其他方式相互干擾。
job.coordinator.system必需:用于創(chuàng)建和維護(hù)協(xié)調(diào)器流 system-name。 
job.default.system system-name 訪問系統(tǒng)沒有明確配置用于其中的任何輸入或輸出流。此屬性用于輸入和輸出流,而job.coordinator.system 適用于 samza 元數(shù)據(jù)流。
job.coordinator.replication.factor
3如果您使用 Kafka 進(jìn)行協(xié)調(diào)器流,那么您希望協(xié)調(diào)器主題復(fù)制其持久性的 Kafka 節(jié)點(diǎn)的數(shù)量。
job.coordinator.segment.bytes
26214400如果您使用 Kafka 系統(tǒng)進(jìn)行協(xié)調(diào)器流,則這是用于協(xié)調(diào)器主題日志段的段大小。保持這個(gè)數(shù)字很小是有用的,因?yàn)樗黾恿薑afka 垃圾收集舊消息的頻率。
job.coordinator.monitor-partition-change
如果您使用 Kafka 進(jìn)行協(xié)調(diào)器流,則此配置可使作業(yè)協(xié)調(diào)器檢測 Kafka 輸入主題中的分區(qū)計(jì)數(shù)差異。檢測時(shí),它會更新system-name 格式的 Gauge 指標(biāo)。stream-name .partitionCount,表示分區(qū)計(jì)數(shù)與初始狀態(tài)的差異。請注意,目前此功能僅適用于基于卡夫卡的系統(tǒng)。
job.coordinator.monitor-partition-change.frequency.ms
300000應(yīng)檢測輸入流分區(qū)計(jì)數(shù)變化的頻率。由于分區(qū)增加不是常見事件,所以可以將此檢查調(diào)整為相當(dāng)?shù)偷摹?/font>
job.config.rewriter.rewriter-name.class
您可以選擇定義配置重寫器,有可能在作業(yè)啟動前動態(tài)修改作業(yè)配置。例如,這可以用于從外部配置管理系統(tǒng)中拉取配置,或者用于在運(yùn)行時(shí)動態(tài)地確定輸入流集合。該屬性的值是一個(gè)完全限定的Java類名,它必須實(shí)現(xiàn) ConfigRewriter。默認(rèn)情況下,Samza與這些重寫器一起發(fā)貨:
org.apache.samza.config.RegExTopicGenerator
從Kafka消費(fèi)時(shí),可以使用符合某些正則表達(dá)式的所有Kafka主題(而不是明確列出每個(gè)主題)。此重寫器具有其他配置
org.apache.samza.config.EnvironmentConfigRewriter
該重寫器采用以SAMZA_為前綴的環(huán)境變量, 并將其添加到配置中,覆蓋之前存在的值。鍵較低,下劃線轉(zhuǎn)換為點(diǎn)。
job.config.rewriters如果您已定義配置重寫器,則需要按照應(yīng)用順序?qū)⑺鼈兞性谶@里。此屬性的值為以逗號分隔的重寫器名稱令牌列表 。
job.systemstreampartition.grouper.factory
org.apache.samza.container.grouper.stream.GroupByPartitionFactory
用于確定如何將SystemStreamPartition輸入組合在一起以在單個(gè)StreamTask實(shí)例中處理的工廠類。工廠必須實(shí)現(xiàn)SystemStreamPartitionGrouperFactory接口。一旦設(shè)置了這個(gè)配置,就不能改變,因?yàn)檫@樣做可能會違反狀態(tài)語義,并導(dǎo)致數(shù)據(jù)丟失。
org.apache.samza.container.grouper.stream.GroupByPartitionFactory
組根據(jù)分組號輸入流分區(qū)。此分組導(dǎo)致單個(gè)StreamTask處理具有分區(qū)0的所有輸入流之間的單個(gè)分區(qū)(例如分區(qū)0)的所有消息。因此,默認(rèn)情況下,您為具有相同分區(qū)號的所有輸入分區(qū)獲得一個(gè)StreamTask。使用此策略,如果兩個(gè)輸入流具有分區(qū)0,則來自兩個(gè)分區(qū)的消息將被路由到單個(gè)StreamTask。此分區(qū)策略對于加入和聚合流很有用。
org.apache.samza.container.grouper.stream.GroupBySystemStreamPartitionFactory
將每個(gè)SystemStreamPartition分配給其自己唯一的StreamTask。GroupBySystemStreamPartitionFactory在您希望增加并行度(更多容器)的情況下很有用,并且不關(guān)心共享分區(qū)或聯(lián)接分區(qū),因?yàn)樗试S在Samza容器之間分配更多數(shù)量的StreamTasks。
job.systemstreampartition.matcher.class
如果要啟用靜態(tài)分區(qū)分配,則這是必需的配置。此屬性的值是實(shí)現(xiàn)該接口的完全限定的 Java 類名稱 org.apache.samza.system.SystemStreamPartitionMatcherSamza 配有兩個(gè)匹配課:
org.apache.samza.system.RangeSystemStreamPartitionMatcher
此類使用逗號分隔的范圍列表來確定哪個(gè)分區(qū)匹配,并因此靜態(tài)分配給作業(yè)。例如“2,3,1-2”,對所有指定的系統(tǒng)和流(Kafka 的情況下的主題)靜態(tài)分配1,2和3。對于配置驗(yàn)證,逗號分隔列表中的每個(gè)元素都很符合以下正則表達(dá)式之一:
  • "(\\d+)" 
  • "(\\d+-\\d+)"
JobConfig.SSP_MATCHER_CLASS_RANGE常數(shù)有這個(gè)類的規(guī)范名稱。
org.apache.samza.system.RegexSystemStreamPartitionMatcher
此類使用標(biāo)準(zhǔn)的Java支持的正則表達(dá)式來確定哪個(gè)分區(qū)匹配,從而靜態(tài)分配給作業(yè)。例如“[1-2]”,靜態(tài)地為所有指定的系統(tǒng)和流(Kafka的情況下的主題)為作業(yè)分配1和2
JobConfig.SSP_MATCHER_CLASS_REGEX常數(shù)有這個(gè)類的規(guī)范名稱。
job.systemstreampartition.matcher.config.range
如果job.systemstreampartition.matcher.class指定,并且該屬性的值是 org.apache.samza.system.RangeSystemStreamPartitionMatcher,則此屬性是 必需的配置。指定一個(gè)逗號分隔的范圍列表,以確定哪個(gè)分區(qū)匹配,從而靜態(tài)分配給作業(yè)。例如“2,3,11-20”,對于所有指定的系統(tǒng)和流(在Kafka的情況下的主題),為該作業(yè)靜態(tài)分配2,3和11到20。像“19”這樣的 singel 配置值也是有效的。這個(gè)靜態(tài)分配分區(qū)19。對于 config 驗(yàn)證,逗號分隔列表中的每個(gè)元素都很符合以下正則表達(dá)式之一:
  • "(\\d+)" 
  • "(\\d+-\\d+)"
job.systemstreampartition.matcher.config.regex
如果job.systemstreampartition.matcher.class指定,并且該屬性的值是 org.apache.samza.system.RegexSystemStreamPartitionMatcher,則此屬性是 必需的配置。該值應(yīng)該是一個(gè)有效的 Java 支持的正則表達(dá)式。例如 “[1-2]”,將所有指定的系統(tǒng)和流(Kakfa 的情況下的主題)的分區(qū)1和2靜態(tài)分配給作業(yè)。
job.systemstreampartition.matcher.config.job.factory.regex
此配置可用于指定 Java 支持的正則表達(dá)式,以匹配StreamJobFactory 應(yīng)啟用靜態(tài)分區(qū)分配的正則表達(dá)式。此配置使分區(qū)分配功能也可用于自定義StreamJobFactory。

此配置默認(rèn)為以下值: "org\\.apache\\.samza\\.job\\.local(.*ProcessJobFactory|.*ThreadJobFactory)"當(dāng)job.factory.class設(shè)置為 org.apache.samza.job.local.ProcessJobFactory時(shí),啟用靜態(tài)分區(qū)分配org.apache.samza.job.local.ThreadJobFactory.

job.checkpoint.validation.enabled
此設(shè)置控制作業(yè)是否應(yīng)該失?。╰rue)或只是警告(false),以防止檢查點(diǎn)分區(qū)號驗(yàn)證失敗。
注意:此配置需要謹(jǐn)慎使用。在檢查點(diǎn)自動創(chuàng)建錯(cuò)誤數(shù)量的分區(qū)之后,它應(yīng)該僅用作解決方法。
job.security.manager.factory這是用于創(chuàng)建適當(dāng)?shù)?nbsp;SecurityManager 的 factory 類,用于在安全環(huán)境中運(yùn)行時(shí)處理 Samza 容器的安全性,例如使用Kerberos 進(jìn)行的紗線。默認(rèn)情況下:
org.apache.samza.job.yarn.SamzaYarnSecurityManagerFactory
支持Samza容器在Kerberos啟用的紗線群集中正常運(yùn)行。Samza容器一旦啟動,就會創(chuàng)建一個(gè)SamzaContainerSecurityManager。SamzaContainerSecurityManager在其單獨(dú)的線程上運(yùn)行,并以由 yarn.token.renewal.interval.seconds 指定的間隔更新用戶的委托。有關(guān)詳細(xì)信息,請參閱YARN安全。
job.container.count1要求運(yùn)行您的工作的YARN容器的數(shù)量。這是控制作業(yè)規(guī)模(分配計(jì)算資源)的主要參數(shù):為了增加處理的并行性,您需要增加容器數(shù)量。最小值是一個(gè)容器,容器的最大數(shù)量是任務(wù)實(shí)例的數(shù)量(通常是輸入流分區(qū) 數(shù)量)。任務(wù)實(shí)例均勻分布在您指定的容器數(shù)量上。
job.container.single.thread.mode如果設(shè)置為 true,samza 將回退到傳統(tǒng)的單線程事件循環(huán)。默認(rèn)值為 false,這將啟用多線程執(zhí)行。
job.container.thread.pool.size如果已配置,容器線程池將用于并行運(yùn)行每個(gè)任務(wù)的同步操作。操作包括 StreamTask.process(),WindowableTask.window()和內(nèi)部的 Task.commit()。請注意,線程池不適用于AsyncStremTask.processAsync()。大小應(yīng)該始終大于零。如果未配置,則所有任務(wù)操作都將在單個(gè)線程中運(yùn)行。
job.host-affinity.enabled此屬性指示是否啟用主機(jī)關(guān)聯(lián)。主機(jī)關(guān)聯(lián)是指 Samza 每次部署作業(yè)時(shí)在同一臺主機(jī)上請求和分配容器的能力。當(dāng)啟用主機(jī)密切關(guān)系時(shí),Samza 做出了“盡力而為”來遵守主機(jī)關(guān)聯(lián)約束。屬性 cluster-manager.container.request.timeout.ms 確定在對主機(jī)相關(guān)性約束進(jìn)行取消優(yōu)先級排序并將容器分配給任何可用資源之前等待多長時(shí)間。 請注意:啟用連續(xù)調(diào)度后,該功能將經(jīng)過 yarn 中的 FairScheduler 測試。
job.changelog.system此屬性指定一個(gè)用于 changelog 的默認(rèn)系統(tǒng),它將與 stores.store-name.changelog config中指定的流一起使用 。您可以通過在 stores.store-name.changelog 中指定系統(tǒng)和流來覆蓋此系統(tǒng) 
job.coordinator.factory用于工作協(xié)調(diào)的類。目前可用的值為:
org.apache.samza.standalone.PassthroughJobCoordinatorFactory
固定分區(qū)映射。沒有 Zoookeeper。
org.apache.samza.zk.ZkJobCoordinatorFactory
基于 Zookeeper 的協(xié)調(diào)。
僅適用于非集群管理的應(yīng)用程序。
基于Zookeeper的作業(yè)配置
job.coordinator.zk.connect對于使用基于 Zookeeper 協(xié)調(diào)的應(yīng)用程序而言是必需的。Zookeeper 坐標(biāo)(在“host:port [/ znode]”格式中)用于協(xié)調(diào)。
job.coordinator.zk.session.timeout.ms30000所有 ZK 連接的 Zookeeper 會話超時(shí)以毫秒為單位。會話超時(shí)控制在不能與 ZK 服務(wù)器通話之前,zk 客戶端在拋出異常之前等待多久。
job.coordinator.zk.connection.timeout.ms60000Zookeeper 連接超時(shí)(毫秒)。Zk 連接超時(shí)控制客戶端在放棄之前嘗試連接到 ZK 服務(wù)器的時(shí)間。
job.coordinator.zk.consensus.timeout.ms40000每個(gè)處理器將等待所有處理器在回滾之前報(bào)告接受新作業(yè)模型的時(shí)間。
job.debounce.time.ms2000在注冊處理器更改之前,Leader 處理器需要等待多久才能重新計(jì)算 JobModel。
任務(wù)配置
task.class必需:從輸入流處理傳入消息的 Java 類的完全限定名稱。這個(gè)類必須實(shí)現(xiàn) StreamTask  AsyncStreamTask,并且可任選地實(shí)現(xiàn) InitableTask ClosableTask  
WindowableTask。該類將被實(shí)例化幾次,每次輸入流分區(qū)一次 。
task.inputs必需:以此作業(yè)使用的流的逗號分隔列表。每個(gè)流都以system-name 格式給出 stream-name 。舉例來說,如果你有一個(gè)叫做輸入系統(tǒng)my-kafka,并要消耗2個(gè) Kafka 主題叫做PageViewEventUserActivityEvent,那么你就設(shè)置 task.inputs=my-kafka.PageViewEvent, my-kafka.UserActivityEvent
task.window.ms-1如果 task.class 實(shí)現(xiàn) WindowableTask,它可以定期接收一個(gè) 加窗回調(diào)此屬性指定window()調(diào)用之間的時(shí)間,以毫秒為單位。如果數(shù)字為負(fù)數(shù)(默認(rèn)值),則不會調(diào)用window()。請注意,Samza是 單線程的,所以window()調(diào)用永遠(yuǎn)不會與處理消息同時(shí)發(fā)生。如果在window()調(diào)用到期時(shí)正在處理消息,則在完成當(dāng)前消息的處理之后發(fā)生window()調(diào)用。
task.checkpoint.factory要啟用 檢查點(diǎn),必須將此屬性設(shè)置為實(shí)現(xiàn)CheckpointManagerFactory 的 Java 類的完全限定名稱 這不是必需的,但建議大多數(shù)工作。如果您沒有配置檢查點(diǎn),并且作業(yè)或容器重新啟動,它不記得它已經(jīng)處理了哪些消息。沒有檢查點(diǎn),消費(fèi)者行為由 ... samza.offset.default 決定設(shè)置,默認(rèn)情況下將跳過在容器重新啟動時(shí)發(fā)布的任何消息。檢查點(diǎn)允許作業(yè)啟動它以前停止的位置。默認(rèn)情況下,Samza 有兩個(gè)檢查點(diǎn)管理員:
org.apache.samza.checkpoint.file.FileSystemCheckpointManagerFactory
將檢查點(diǎn)寫入本地文件系統(tǒng)上的文件。您可以使用 task.checkpoint.path 屬性配置文件路徑。如果您的作業(yè)始終運(yùn)行在同一臺機(jī)器上,這是一個(gè)簡單的選項(xiàng)。在多機(jī)集群上,這將需要安裝網(wǎng)絡(luò)文件系統(tǒng)。
org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
將檢查點(diǎn)寫入 Kafka 集群上的專用主題。如果您已經(jīng)使用Kafka輸入或輸出流,則這是推薦的選項(xiàng)。使用 task.checkpoint.system 屬性來配置用于檢查點(diǎn)的Kafka 集群。
task.commit.ms60000如果配置了 task.checkpoint.factory,則此屬性確定檢查點(diǎn)的寫入頻率。值是檢查點(diǎn)之間的時(shí)間,以毫秒為單位。檢查點(diǎn)的頻率會影響故障恢復(fù):如果容器意外失?。ɡ纾捎诒罎⒒驒C(jī)器故障)而重新啟動,則會在最后一個(gè)檢查點(diǎn)恢復(fù)處理。從失敗的容器上的最后一個(gè)檢查點(diǎn)處理的任何消息將被再次處理。檢查點(diǎn)更頻繁地減少可能被處理兩次的消息數(shù)量,而且還使用更多的資源。
task.command.class
org.apache.samza.job.ShellCommandBuilder
確定 容器 的命令行和環(huán)境變量的Java類的完全限定名稱。它必須是 CommandBuilder 的子類 。默認(rèn)為task.command.class=org.apache.samza.job.ShellCommandBuilder。
task.opts任何 JVM 選項(xiàng)在執(zhí)行Samza容器時(shí)都包含在命令行中。例如,這可以用于設(shè)置JVM堆大小,調(diào)整垃圾回收器或啟用 遠(yuǎn)程調(diào)試。運(yùn)行時(shí)無法使用ThreadJobFactory。任何您放入的東西 task.opts都將直接轉(zhuǎn)發(fā)到命令行,作為JVM調(diào)用的一部分。
例: task.opts=-XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC
task.java.homeSamza 容器的 JAVA_HOME 路徑。通過設(shè)置此屬性,可以使用與集群的 Java 版本不同的 java 版本。記住設(shè)置yarn.am.java.home好。
例: task.java.home=/usr/java/jdk1.8.0_05
task.executebin/ run-container.sh啟動 Samza 容器的命令。腳本必須包含在 作業(yè)包中。通常不需要定制這個(gè)。
task.chooser.class
org.apache.samza.system.chooser.RoundRobinChooserFactory
該屬性可以選擇設(shè)置為覆蓋默認(rèn) 消息選擇器,該消息選擇器確定處理來自多個(gè)輸入流的消息的順序。此屬性的值是實(shí)現(xiàn)MessageChooserFactory 的 Java 類的完全限定名稱 。
task.drop.deserialization.errors這個(gè)屬性是定義系統(tǒng)如何??處理反序列化失敗的情況。如果設(shè)置為 true,系統(tǒng)將跳過錯(cuò)誤消息并繼續(xù)運(yùn)行。如果設(shè)置為false,則系統(tǒng)將拋出異常并使容器失敗。默認(rèn)值為 false。
task.drop.serialization.errors此屬性用于定義系統(tǒng)如何??處理序列化失敗情況。如果設(shè)置為true,系統(tǒng)將丟棄錯(cuò)誤消息并繼續(xù)運(yùn)行。如果設(shè)置為 false,則系統(tǒng)將拋出異常并使容器失敗。默認(rèn)值為 false。
task.log4j.system指定 StreamAppender 的系統(tǒng)名稱。如果配置中未指定此屬性,則 Samza 將拋出異常。(參見 Stream Log4j Appender
例: task.log4j.system=kafka
task.log4j.location.info.enabled定義是否在 Log4j StreamAppender 消息中包含 log4j 的LocationInfo 數(shù)據(jù)。LocationInfo 包括寫入日志消息的文件,類和行等信息。此設(shè)置僅在使用Log4j流附加程序時(shí)有效。(參見Stream Log4j Appender
例: task.log4j.location.info.enabled=true
task.poll.interval.msSamza 的容器在兩個(gè)條件下輪詢更多的消息。當(dāng)任何輸入SystemStreamPartition 不存在任何剩余的緩沖消息時(shí),第一個(gè)條件出現(xiàn)。第二個(gè)條件出現(xiàn)當(dāng)一些輸入SystemStreamPartitions 有空緩沖區(qū),但有些不具有。在后一種情況下,定義輪詢間隔以確定刷新空的SystemStreamPartition 緩沖區(qū)的頻率。默認(rèn)情況下,此間隔為 50ms,這意味著任何空的 SystemStreamPartition 緩沖區(qū)至少每 50ms 刷新一次。這里的值越大意味著空的SystemStreamPartition 將不會更頻繁地刷新,這意味著更多的延遲被引入,但是將使用更少的 CPU 和網(wǎng)絡(luò)。
task.ignored.exceptions此屬性指定在任務(wù)processwindow 方法中拋出哪些異常應(yīng)忽略。被忽略的例外應(yīng)該是一個(gè)逗號分隔的例外的完全限定類名的列表,或 *忽略所有異常。
task.shutdown.ms5000此屬性控制 Samza 容器等待有序關(guān)閉任務(wù)實(shí)例的時(shí)間。
task.name.grouper.factory
org.apache.samza.container.grouper.task.GroupByContainerCountFactory
確定將生成 TaskNameGrouper 的工廠類的 Java 類的完全限定名稱。如果屬性不存在,則默認(rèn)配置值為task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerCountFactory
用戶可以指定 TaskNameGrouperFactory 的自定義實(shí)現(xiàn),其中實(shí)現(xiàn)用于對任務(wù)進(jìn)行分組的自定義邏輯。

注意:對于非集群應(yīng)用程序(使用協(xié)調(diào)服務(wù)的應(yīng)用程序),必須使用 org.apache.samza.container.grouper.task.GroupByContainerIdsFactory

task.broadcast.inputs此屬性指定所有任務(wù)應(yīng)消耗的分區(qū)。你把這里的systemStreamPartitions 發(fā)送到所有的任務(wù)。
格式:system-name.stream-namepartitionId 或 system-name.stream-name#[ startingPartitionId - endingPartitionId ]
例: task.broadcast.inputs=mySystem.broadcastStream#[0-2], mySystem.broadcastStream#0
task.max.concurrency1每個(gè)任務(wù)一次處理的最大數(shù)量的未完成的消息,它適用于StreamTask 和 AsyncStreamTask。值可以是:
1
每個(gè)任務(wù)一次處理一個(gè)消息。下一條消息將等到當(dāng)前消息進(jìn)程完成。這確保了嚴(yán)格的按順序處理。
>1
允許每個(gè)任務(wù)一次處理多個(gè)未完成的消息。完成可能是無序的。此選項(xiàng)增加任務(wù)中的并行性,但可能導(dǎo)致無序處理。
task.callback.timeout.ms此屬性僅適用于 AsyncStreamTask。它定義從processAsync()到回調(diào)的最大時(shí)間間隔被觸發(fā)。當(dāng)超時(shí)發(fā)生時(shí),它將拋出一個(gè) TaskCallbackTimeoutException 并關(guān)閉該容器。默認(rèn)是沒有超時(shí)。
task.consumer.batch.size1如果設(shè)置為正整數(shù),則任務(wù)將嘗試使用 每個(gè)輸入流中給定數(shù)量的消息的批次,而不是從每個(gè)單獨(dú)消息的所有輸入流中消耗循環(huán)。在某些情況下,設(shè)置此屬性可以提高性能。
系統(tǒng)
systems.system-name.samza.factory
必需:提供系統(tǒng)的 Java 類的全限定名稱 。系統(tǒng)可以提供您可以在 Samza 作業(yè)中使用的輸入流,也可以輸出可以寫入的輸出流,或同時(shí)提供兩者。對系統(tǒng)的要求非常靈活 - 它可以連接到消息代理,或讀取和寫入文件,或使用數(shù)據(jù)庫或其他任何東西。該類必須實(shí)現(xiàn) SystemFactory。Samza 具有以下實(shí)現(xiàn):
org.apache.samza.system.kafka.KafkaSystemFactory
連接到一組 Kafka 代理,允許 Kafka 主題作為 Samza中的流使用,允許將消息發(fā)布到 Kafka 主題,并允許將Kafka 用于檢查點(diǎn)
org.apache.samza.system.filereader.FileReaderSystemFactory
從本地文件系統(tǒng)上的文件讀取數(shù)據(jù)(流名稱是要讀取的文件的路徑)。該文件以 ASCII 格式讀取,并被視為以newline(\n)字符分隔的消息流。任務(wù)可以將文件的每一行作為java.lang.String對象使用。該系統(tǒng)不提供輸出流。
systems.system-name.default.stream*
與系統(tǒng)關(guān)聯(lián)的任何流的一組默認(rèn)屬性。例如,如果配置了“systems.kafka-system.default.stream.replication.factor”= 2,則在kafka 系統(tǒng)上創(chuàng)建的每個(gè) Kafka 流將具有2的復(fù)制因子,除非在流中顯式覆蓋該屬性范圍使用流屬性。
systems.system-name.
default.stream.samza.key.serde
該 SERDE 將用于反序列化 上輸入流的消息,和序列化上輸出流消息。此屬性定義系統(tǒng)中所有流的serde。請參閱 stream-scoped 屬性來定義單個(gè)流的 serde。如果兩者都被定義,流級定義優(yōu)先。此屬性的值必須是使用 serializers.registry 注冊的 serde-namesystems.system-name.*。如果未設(shè)置此屬性,消息將在輸入流消費(fèi)者,任務(wù)和輸出流生成器之間未修改傳遞。
systems.system-name.
default.stream.samza.msg.serde
將用于反序列化輸入流上的消息 的 serde,并串行化輸出流上消息此屬性定義系統(tǒng)中所有流的serde。請參閱 stream-scoped 屬性來定義單個(gè)流的 serde。如果兩者都被定義,流級定義優(yōu)先。此屬性的值必須是使用serializers.registry 注冊的 serde-name *。如果未設(shè)置此屬性,消息將在輸入流消費(fèi)者,任務(wù)和輸出流生成器之間未修改傳遞。
systems.system-name.
default.stream.samza.offset.default
 即將到來如果容器在沒有檢查點(diǎn)的情況下啟動,則該屬性確定輸入流中我們應(yīng)該開始使用的位置。該值必須為 OffsetType,為以下之一:
upcoming
開始處理作業(yè)啟動后發(fā)布的消息。作業(yè)未運(yùn)行時(shí)發(fā)布的任何消息都不會被處理。
oldest
開始處理系統(tǒng)中最早的可用消息,并重新處理整個(gè)可用的消息歷史記錄。
此屬性適用于系統(tǒng)中的所有流。要將其設(shè)置為單個(gè)流,請參閱  streams.stream-id
samza.offset.default 如果兩者都被定義,流級定義優(yōu)先。
systems.system-name.samza.key.serde
這不支持 systems.system-name.default.stream.samza.key.serde
systems.system-name.streams.stream-name.
samza.key.serde
這不支持 streams.stream-id.samza.key.serde.
systems.system-name.samza.msg.serde
這不支持 systems.system-name.default.stream.samza.msg.serde
systems.system-name.streams.stream-name.
samza.msg.serde
這不支持 streams.stream-id.samza.msg.serde
systems.system-name.samza.offset.default
即將到來這不支持 systems.system-name.default.stream.samza.offset.default.
systems.system-name.streams.stream-name
samza.offset.default
這不支持 streams.stream-id.samza.offset.default
systems.system-name.streams.stream-name
samza.reset.offset
這不支持 streams.stream-id.samza.reset.offset.
systems.system-name.streams.stream-name
samza.priority
-1這不支持 streams.stream-id.samza.priority.
systems.system-name.streams.stream-name
samza.bootstrap
這不支持 streams.stream-id.samza.bootstrap.
streams.stream-id.samza.system
將訪問此流的系統(tǒng)的 system-name。此屬性將流綁定到由屬性系統(tǒng)定義的系統(tǒng)之一。system-name.samza.factory。
如果未指定此屬性,它將從 job.default.system 繼承。
streams.stream-id.samza.physical.name
系統(tǒng)上將訪問此流的物理名稱。這是與 Samza 用來識別流的邏輯名稱的 stream-id 相反的。物理名稱可以是 Kafka 主題名稱,HDFS 文件 URN 或任何其他系統(tǒng)特定的標(biāo)識符。
 streams.stream-id.samza.key.serde
該 SERDE 將用于反序列化 上輸入流的消息,和序列化上輸出流消息。此屬性定義單個(gè)流的serde。請參閱 system-scoped 屬性以定義系統(tǒng)中所有流的 serde。如果兩者都被定義,流級定義優(yōu)先。此屬性的值必須是使用 serializers.registry 注冊的 serde-name.*如果未設(shè)置此屬性,消息將在輸入流消費(fèi)者,任務(wù)和輸出流生成器之間未修改傳遞。
streams.stream-id.samza.msg.serde
將用于反 序列化輸入流上的消息的 serde,并串行化輸出流上消息。此屬性定義單個(gè)流的serde。請參閱 system-scoped 屬性以定義系統(tǒng)中所有流的serde。如果兩者都被定義,流級定義優(yōu)先。此屬性的值必須是使用serializers.registry 注冊的 serde-name .*。如果未設(shè)置此屬性,消息將在輸入流消費(fèi)者,任務(wù)和輸出流生成器之間未修改傳遞。
streams.stream-id.samza.offset.default
即將到來如果容器在沒有檢查點(diǎn)的情況下啟動,則該屬性確定輸入流中我們應(yīng)該開始使用的位置。該值必須為 OffsetType,為以下之一:
upcoming
開始處理作業(yè)啟動后發(fā)布的消息。作業(yè)未運(yùn)行時(shí)發(fā)布的任何消息都不會被處理。
oldest
開始處理系統(tǒng)中最早的可用消息,并重新處理整個(gè)可用的消息歷史記錄。
此屬性適用于單個(gè)流。要為系統(tǒng)中的所有流設(shè)置它,請參閱 systems.system-name ;samza.offset.default 如果兩者都被定義,流級定義優(yōu)先。
streams.stream-id.samza.reset.offset
如果設(shè)置為true,當(dāng)Samza容器啟動時(shí),它將忽略該特定輸入流的任何 檢查點(diǎn)偏移量因此,其行為由samza.offset.default設(shè)置決定。請注意,每次啟動容器時(shí),復(fù)位都將生效,這可能是每次重新啟動作業(yè)時(shí),或者如果容器發(fā)生故障并由框架重新啟動,則會更頻繁。
streams.stream-id.samza.priority
-1如果一個(gè)或多個(gè)流具有優(yōu)先級設(shè)置(任何正整數(shù)),則將以比其他流更高的優(yōu)先級處理它們。您可以將多個(gè)流設(shè)置為相同的優(yōu)先級,或通過為較高優(yōu)先級的流分配更多的數(shù)字來定義多個(gè)優(yōu)先級。如果較高優(yōu)先級的流具有可用的消息,則它們將始終被處理; 來自較低優(yōu)先級流的消息僅在高優(yōu)先級輸入沒有新消息時(shí)被處理。
streams.stream-id.samza.bootstrap
如果設(shè)置為true,該流將被作為streams.自舉流處理。這意味著每次 Samza 容器啟動時(shí),在處理任何其他流的消息之前,該流將被完全消耗。
streams.stream-id.*流的任何屬性。這些通常是系統(tǒng)特定的,可以由系統(tǒng)用于流創(chuàng)建或驗(yàn)證。請注意,其他屬性前綴為 samza。 將它們區(qū)分為不是系統(tǒng)特定的 Samza 屬性。
串行器/解串器(Serdes)
serializers.registry.serde-name .class
使用此屬性注冊序列化器/解串器,它定義了將應(yīng)用對象編碼為字節(jié)數(shù)組(用于流中的消息以及持久存儲中的數(shù)據(jù))的方式。你可以給你所需要的任何 serde-serde 名字,并在系統(tǒng)的屬性中引用該名稱 systems.*.samza.key.serde, 
 streams.*.samza.key.serde,streams.*.samza.msg.serde, stores.*.key.serde  systems.*.samza.msg.serde,streams.*.samza.key.serde,
streams.*.samza.key.serde, streams.*.samza.msg.serde,stores.*.key.serde  stores.*.msg.serde。此屬性的值是實(shí)現(xiàn) SerdeFactory 的 Java 類的完全限定名稱 。Samza 有幾個(gè) serdes:
org.apache.samza.serializers.ByteSerdeFactory
通過未解碼字節(jié)數(shù)組的無操作序列。
org.apache.samza.serializers.IntegerSerdeFactory
java.lang.Integer對象編碼為二進(jìn)制(4字節(jié)固定長度大端編碼)。
org.apache.samza.serializers.StringSerdeFactory
java.lang.String對象編碼為 UTF-8。
org.apache.samza.serializers.JsonSerdeFactory
編碼的嵌套結(jié)構(gòu)java.util.Mapjava.util.List等等作為 JSON。
org.apache.samza.serializers.LongSerdeFactory
編碼java.lang.Long為二進(jìn)制(8字節(jié)固定長度大碼編碼)。
org.apache.samza.serializers.DoubleSerdeFactory
編碼java.lang.Double為 binray(8字節(jié)雙精度浮點(diǎn))。
org.apache.samza.serializers.MetricsSnapshotSerdeFactory
org.apache.samza.metrics.reporter.MetricsSnapshot對象(用于報(bào)告指標(biāo)編碼為 JSON。
org.apache.samza.serializers.KafkaSerdeFactory
適配器,可以將現(xiàn)有的kafka.serializer.Encoder kafka.serializer.Decoder實(shí)現(xiàn)用作Samza serdes。設(shè)置serializers.registry。serde-name .encoder和serializers.registry。serde-name .decoder到適當(dāng)?shù)念惷?/font>
將文件系統(tǒng)用于檢查點(diǎn)
(本節(jié)適用于您已設(shè)置 task.checkpoint.factory = org.apache.samza.checkpoint.file.FileSystemCheckpointManagerFactory
task.checkpoint.path如果將文件系統(tǒng)用于檢查點(diǎn),則為必需。將其設(shè)置為本地文件系統(tǒng)上應(yīng)存儲檢查點(diǎn)文件的路徑。
使用 Elasticsearch 輸出流
(適用于您已設(shè)置 systems.*.samza.factory= org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory
systems.system-name.client.factory
必需:彈性搜索客戶端工廠用于連接到彈性搜索集群。Samza具有以下實(shí)現(xiàn):
org.apache.samza.system.elasticsearch.client.TransportClientFactory
創(chuàng)建一個(gè)遠(yuǎn)程連接到集群的TransportClient,而不加入它。這需要設(shè)置傳輸主機(jī)和端口屬性。
org.apache.samza.system.elasticsearch.client.NodeClientFactory
創(chuàng)建通過加入它連接到集群的Node客戶端。默認(rèn)情況下,這將使用zen發(fā)現(xiàn)來查找集群,但可以配置其他方法。
systems.system-name.index.request.factory
org.apache.samza.system.elasticsearch.indexrequest.
DefaultIndexRequestFactory
索引請求工廠將Samza OutgoingMessageEnvelope轉(zhuǎn)換為IndexRequest以發(fā)送到彈性搜索。默認(rèn)的IndexRequestFactory的行為如下:
Stream name
流名稱的格式為{index-name} / {type-name},其映射到彈性搜索索引和類型。
Message id
如果消息有一個(gè)密鑰,則將其設(shè)置為文檔ID,否則Elasticsearch將為每個(gè)文檔生成一個(gè)密鑰。
Partition id
如果分區(qū)密鑰被設(shè)置,那么它被用作彈性搜索路由密鑰。
Message
該消息必須是直接傳遞給Elasticsearch的一個(gè)byte [],或者傳遞給Elasticsearch客戶端的Map,該客戶端將其連接成JSON String。目前不支持Samza serdes。
systems.system-name.client.transport.host
需要TransportClientFactory

TransportClientFactory 連接到的主機(jī)名。

systems.system-name.client.transport.port
需要TransportClientFactory

TransportClientFactory 連接到的端口。

systems.system-name.client.elasticsearch.*
任何 Elasticsearch 客戶端設(shè)置都可以在這里使用。它們都將傳遞給傳輸和節(jié)點(diǎn)客戶端。您想要提供的一些常見設(shè)置是。
systems.system-name.client.elasticsearch.cluster.name
客戶端連接到的彈性搜索集群的名稱。
systems.system-name.client.elasticsearch.client.transport.sniff
如果設(shè)置為,true則傳輸客戶端將發(fā)現(xiàn)并保持所有集群節(jié)點(diǎn)的最新狀態(tài)。這用于重試上的負(fù)載平衡和故障切換。
systems.system-name.bulk.flush.max.actions
1000沖洗前要緩沖的最大消息數(shù)。
systems.system-name.bulk.flush.max.size.mb
沖洗前緩沖的消息的最大聚合大小。
systems.system-name.bulk.flush.interval.ms
決不緩沖消息應(yīng)該被刷新多久。
使用 Kafka 輸入流,輸出流和檢查點(diǎn)
(適用于您已設(shè)置 systems.*.samza.factory = org.apache.samza.system.kafka.KafkaSystemFactory
systems.system-name.consumer.zookeeper.connect可以找到有關(guān) Kafka 群集信息的一個(gè)或多個(gè) Zookeeper 節(jié)點(diǎn)的主機(jī)名和端口。這是以逗號分隔的hostname:port列表給出的 ,例如zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181。如果集群信息位于 Zookeeper 命名空間的某個(gè)子路徑,則需要將路徑包含在主機(jī)名列表的末尾,例如:zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181/clusters/my-kafka
systems.system-name.consumer.auto.offset.reset最大此設(shè)置確定使用者如何嘗試讀取超出當(dāng)前有效范圍的偏移量會發(fā)生什么。如果主題不存在,或者如果檢查點(diǎn)比代理所保留的最大消息歷史更早,則可能會發(fā)生這種情況。這個(gè)屬性不要與 systems.*.samza.offset.default 混淆,它確定如果沒有檢查點(diǎn)會發(fā)生什么。以下是以下的有效值auto.offset.reset
smallest
開始消耗在代理上可用的最?。ㄗ钆f的)偏移量(盡可能多的消息歷史記錄)。
largest
開始消耗在代理上可用的最大(最新)偏移量(跳過任務(wù)未運(yùn)行時(shí)發(fā)布的任何消息)。
還要別的嗎
拋出異常,拒絕啟動工作。
systems.system-name.consumer.*
任何 Kafka 使用者配置 都可以包含在這里。例如,要更改套接字超時(shí),可以設(shè)置系統(tǒng)。systemname .consumer.socket.timeout.ms。(沒有必要配置,group.id或者client.id是由Samza自動配置,而且沒有必要設(shè)置, auto.commit.enable因?yàn)镾amza 有自己的檢查點(diǎn)機(jī)制)
systems.system-name.producer.bootstrap.servers注意 此變量之前已定義為“producer.metadata.broker.list”,該版本已被棄用。 
Kafka 代理正在運(yùn)行的網(wǎng)絡(luò)端點(diǎn)列表。例如,以逗號分隔的hostname:port列表給出 kafka1.example.com:9092,kafka2.example.com:9092,kafka3.example.com:9092不需要列出集群中的每個(gè)單個(gè)Kafka節(jié)點(diǎn):Samza使用此屬性來發(fā)現(xiàn)哪些主題和分區(qū)托管在哪個(gè)代理上。即使您只是從卡夫卡消費(fèi)而不是寫信給這個(gè)屬性,因?yàn)?Samza 使用它來發(fā)現(xiàn)關(guān)于正在消費(fèi)的流的元數(shù)據(jù)。
systems.system-name.producer.producer.*

這里可以包含 任何 Kafka 生產(chǎn)者配置例如,要更改請求超時(shí),可以設(shè)置系統(tǒng)。system-name .producer.timeout.ms。(沒有必要配置,client.id因?yàn)樗怯蒘amza自動配置的。)
systems.system-name.samza.fetch.threshold50000當(dāng)從 Kafka 消耗流時(shí),Samza 容器為傳入的消息維護(hù)一個(gè)內(nèi)存中的緩沖區(qū),以增加吞吐量(流任務(wù)可以繼續(xù)處理緩沖的消息,而從 Kafka 獲取新消息)。此參數(shù)確定了我們旨在緩沖容器所消耗的所有流分區(qū)的消息數(shù)。例如,如果一個(gè)容器消耗50個(gè)分區(qū),它將默認(rèn)緩存每個(gè)分區(qū)的1000個(gè)消息。當(dāng)緩沖消息的數(shù)量低于該閾值時(shí),Samza 從 Kafka 代理獲取更多消息來補(bǔ)充緩沖區(qū)。增加此參數(shù)可以增加作業(yè)“
systems.system-name.samza.fetch.threshold.bytes-1當(dāng)從 Kafka 消耗流時(shí),Samza 容器為傳入的消息維護(hù)一個(gè)內(nèi)存中的緩沖區(qū),以增加吞吐量(流任務(wù)可以繼續(xù)處理緩沖的消息,而從 Kafka 獲取新消息)。此參數(shù)確定了我們旨在基于字節(jié)在容器所消耗的所有流分區(qū)之間緩沖的消息的總大小。定義作為整體的緩沖預(yù)取消息使用的字節(jié)數(shù)。基于此計(jì)算單個(gè)系統(tǒng)/流/分區(qū)的字節(jié)。這將獲取整個(gè)消息,因此這個(gè)字節(jié)限制是一個(gè)軟的,并且實(shí)際使用可以是給定流的分區(qū)中的最大消息的字節(jié)數(shù)限制+大小。如果此屬性的值為大于 0,那么這將優(yōu)先于系統(tǒng)。system-name.samza.fetch.threshold。
例如, 如果 fetchThresholdBytes 設(shè)置為100000個(gè)字節(jié), 并且注冊了 50 SystemStreamPartitions, 則每個(gè)分區(qū)的閾值為 (100000/2)/50 = 1000 字節(jié)。由于這是一個(gè)軟限制, 實(shí)際使用可以是1000字節(jié) + 最大消息的大小。一旦 SystemStreamPartition 的緩沖消息字節(jié)數(shù)降至 1000 以下, 就會執(zhí)行回遷請求以獲取更多數(shù)據(jù)。增加此參數(shù)將減少當(dāng)隊(duì)列被消息耗盡和新消息排隊(duì)時(shí)之間的延遲, 但也會導(dǎo)致內(nèi)存使用量增加, 因?yàn)閷⒂懈嗟南⒈4嬖趦?nèi)存中。默認(rèn)值為-1, 表示不使用此方法。
task.checkpoint.system如果您使用Kafka檢查點(diǎn)(task.checkpoint.factory = org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory),則此屬性是必需的。您必須將其設(shè)置為 Kafka 系統(tǒng)系統(tǒng)名稱。該系統(tǒng)中的流名稱(主題名稱)將根據(jù)作業(yè)名稱和ID自動確定:( 在作業(yè)名稱中帶下劃線,ID由連字符替換)。 __samza_checkpoint_${job.name}_${job.id}
task.checkpoint.replication.factor
3如果您使用 Kafka 作為檢查點(diǎn),那么您希望將檢查點(diǎn)主題復(fù)制到其持久性的 Kafka 節(jié)點(diǎn)的數(shù)量。
task.checkpoint.segment.bytes26214400如果您使用 Kafka 作為檢查點(diǎn),則這是用于檢查點(diǎn)主題的日志段的段大小。保持這個(gè)數(shù)字很小是有用的,因?yàn)樗黾恿丝ǚ蚩ɡ占f檢查點(diǎn)的頻率。
stores.store-name .changelog.replication.factorstores.default.changelog.replication.factor該屬性定義要用于更改日志流的副本數(shù)。
stores.default.changelog.replication.factor2此屬性定義要用于更改日志流的副本的默認(rèn)數(shù)量。
stores.store-name .changelog.kafka.topic-level-property該屬性允許您為要創(chuàng)建的更改日志主題指定主題級別設(shè)置。例如,您可以將清理策略指定為“stores.mystore.changelog.cleanup.policy = delete”。有關(guān)更多主題級配置,請參閱http://kafka.apache.org/documentation.html#configuration。
使用與正則表達(dá)式匹配的所有 Kafka 主題
(本節(jié)適用于已設(shè)置  job.config.rewriter.*.class= org.apache.samza.config.RegExTopicGenerator
job.config.rewriter.rewriter-name.system將此屬性設(shè)置為要從其中消費(fèi)所有匹配主題的 Kafka 系統(tǒng)system-name。
job.config.rewriter.rewriter-name.regex一個(gè)正則表達(dá)式,指定要在 Kafka 系統(tǒng)中使用哪些主題 job.config.rewriter.*.system。除了使用 task.inputs 指定的任何主題之外,與此正則表達(dá)式匹配的任何主題都將被使用
job.config.rewriter.rewriter-name.config.*在此命名空間中指定的任何屬性將應(yīng)用于與job.config.rewriter.*.regex中的正則表達(dá)式匹配的流的配置 。例如,您可以設(shè)置job.config.rewriter.*.config.samza.msg.serde為匹配流中的郵件配置解串器,這相當(dāng)于為 正則表達(dá)式匹配的每個(gè)主題設(shè)置 sytems.*.streams.*.samza.msg.serde。
存儲和國家管理
stores.store-name.factory這個(gè)屬性定義了一個(gè)商店,Samza的有效狀態(tài)流處理機(jī)制 。你可以給一家商店的任何商店的名字,除了默認(rèn)(在商店名稱 默認(rèn)保留定義默認(rèn)存儲參數(shù)),并使用該名稱在您的流任務(wù)中獲得的存儲的引用(在你的任務(wù)的 init()方法調(diào)用 TaskContext.getStore())。此屬性的值是實(shí)現(xiàn)的Java類的完全限定名稱 StorageEngineFactory。Samza目前隨附一個(gè)存儲引擎實(shí)現(xiàn):
org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
具有鍵值接口的磁盤存儲引擎,采用 RocksDB 實(shí)現(xiàn) 。它支持快速隨機(jī)訪問讀取和寫入,以及鍵上的范圍查詢。RocksDB可以配置各種附加的調(diào)諧參數(shù)
stores.store-name.key.serde如果存儲引擎預(yù)期存儲中的密鑰是簡單的字節(jié)數(shù)組,則該 serde 允許流任務(wù)使用另一對象類型作為密鑰訪問存儲。此屬性的值必須是使用 serializers.registry 注冊 serde-name.*如果沒有設(shè)置該屬性,鍵傳遞未修改存儲引擎(和 更新日志流,如果合適的話)。
stores.store-name.msg.serde如果存儲引擎預(yù)期商店中的值是簡單的字節(jié)數(shù)組,則該 serde允許流任務(wù)使用另一個(gè)對象類型作為值訪問存儲。此屬性的值必須是使用 serializers.registry 注冊 serde-name.*。如果未設(shè)置此屬性,則將值修改為存儲引擎(如果適用),則更改日志流。
stores.store-name.changelogSamza 商店是集裝箱本地的。如果容器發(fā)生故障,則商店的內(nèi)容將丟失。為了防止數(shù)據(jù)丟失,您需要將此屬性設(shè)置為配置更改日志流:Samza 然后確保對存儲的寫入將復(fù)制到此流,并在故障后從該流恢復(fù)存儲。該屬性的值以 system-name.stream-name的形式給出。“系統(tǒng)名稱”部分是可選的。如果省略,則必須在 job.changelog.system 中指定系統(tǒng)配置。任何輸出流都可以用作更改日志,但是您必須確保只有一個(gè)作業(yè)可以寫入給定的更改日志流(作業(yè)的每個(gè)實(shí)例,每個(gè)存儲需要自己的更改日志流)。
使用 RocksDB 進(jìn)行鍵值存儲
(本節(jié)適用于您設(shè)置了 stores.*.factory= org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
stores.store-name.write.batch.size500為了獲得更好的寫入性能,存儲引擎將緩沖區(qū)寫入并將其應(yīng)用到底層存儲中。如果快速連續(xù)寫入相同的密鑰多次,則該緩沖區(qū)也會對相同的密鑰進(jìn)行重復(fù)數(shù)據(jù)刪除。此屬性設(shè)置為每個(gè)任務(wù)實(shí)例中應(yīng)保存在此內(nèi)存緩沖區(qū)中的鍵/值對數(shù)。數(shù)量不能大于 stores.*.object.cache.size
stores.store-name.object.cache.size1000Samza在經(jīng)常訪問的對象之前為RocksDB提供了額外的緩存。緩存序列化對象的RocksDB塊緩存(stores.*.container.cache.size.bytes相反,此緩存包含反序列化對象(避免緩存命中中的反序列化開銷)。此屬性確定每個(gè)任務(wù)實(shí)例在Samza緩存中保留的對象數(shù)。同樣的緩存也用于寫入緩沖(請參閱 stores.*.write.batch.size)。值為 0 將禁用所有緩存和批處理。
stores.store-name.container.cache.size.bytes104857600RocksDB的塊緩存大?。ㄒ宰止?jié)為單位),每個(gè)容器。如果一個(gè)容器內(nèi)有若干個(gè)任務(wù)實(shí)例,則每個(gè)容器中都有一個(gè)比例份額。請注意,這是一個(gè)非堆內(nèi)存分配,因此容器的總內(nèi)存使用量是最大的JVM堆大小加上此緩存的大小。
stores.store-name.container.write.buffer.size.bytes33554432在每個(gè)容器寫入磁盤之前,RocksDB用于緩沖寫入的內(nèi)存量(以字節(jié)為單位)。如果一個(gè)容器內(nèi)有若干個(gè)任務(wù)實(shí)例,則每個(gè)容器中都有一個(gè)比例份額。此設(shè)置還可確定RocksDB的段文件的大小。
stores.store-name.rocksdb.compression瞬間此屬性控制 RocksDB 是否應(yīng)壓縮磁盤上的數(shù)據(jù)和塊緩存中的數(shù)據(jù)。以下值有效:
snappy
使用 Snappy 編解碼器壓縮數(shù)據(jù)。
bzip2
使用 bzip2 編解碼器壓縮數(shù)據(jù)。
zlib
使用 zlib 編解碼器壓縮數(shù)據(jù)
lz4
使用 lz4 編解碼器壓縮數(shù)據(jù)。
lz4hc
使用 lz4hc(高壓縮)編解碼器壓縮數(shù)據(jù)
none
不壓縮數(shù)據(jù)。
stores.store-name.rocksdb.block.size.bytes4096如果啟用了壓縮,RocksDB將這個(gè)許多未壓縮字節(jié)分組到一個(gè)壓縮塊中。您可能不需要更改此屬性。
stores.store-name.rocksdb.ttl.ms商店的生存時(shí)間。請注意,它不是嚴(yán)格的TTL限制(僅在壓實(shí)后才被刪除)。請謹(jǐn)慎打開帶有和不帶TTL的數(shù)據(jù)庫,因?yàn)樗赡軙p壞數(shù)據(jù)庫。使用前請務(wù)必閱讀約束。
stores.store-name.rocksdb.compaction.style普遍該屬性控制RocksDB在壓縮其級別時(shí)將采用的壓縮樣式。以下值有效:
universal
使用 universal 壓實(shí)。
fifo
使用 FIFO 壓縮。
level
使用 RocksDB 的標(biāo)準(zhǔn)壓實(shí)。
stores.store-name.rocksdb.num.write.buffers3配置 RocksDB 存儲使用的寫緩沖區(qū)數(shù)。這允許 RocksDB 繼續(xù)對其他緩沖區(qū)進(jìn)行寫入,即使給定的寫入緩沖區(qū)被刷新到磁盤。
stores.store-name.rocksdb.max.log.file.size.bytes67108864RocksDB LOG 文件在旋轉(zhuǎn)之前的最大大小(以字節(jié)為單位)。
stores.store-name.rocksdb.keep.log.file.num2RocksDB LOG 文件的數(shù)量(包括旋轉(zhuǎn) LOG.old.*文件)要保留。
與集群管理一起運(yùn)行 Samza
cluster-manager.container.memory.mb1024每個(gè)容器的工作需要從集群管理器請求多少內(nèi)存(兆字節(jié))。 cluster-manager.container.cpu.cores 一起,此屬性確定集群管理器在一臺計(jì)算機(jī)上運(yùn)行的容器數(shù)量。如果容器超過這個(gè)限制,它將被殺死,所以重要的是容器的實(shí)際記憶使用量仍然低于極限。使用的內(nèi)存量通常是JVM堆大?。ㄅ渲脼?nbsp;task.opts),加上任何非堆內(nèi)存分配的大?。ɡ?span> store.*.container.cache.size.bytes),另外還有一個(gè)安全余量以允許 JVM 開銷。
cluster-manager.container.cpu.cores1每個(gè)容器的工作要求的CPU核心數(shù)量。集群中的每個(gè)節(jié)點(diǎn)都有一定數(shù)量的可用CPU核心,所以這個(gè)數(shù)量(以及 cluster-manager.container.memory.mb)決定了一臺機(jī)器上可以運(yùn)行多少個(gè)容器。
cluster-manager.container.retry.count8如果容器出現(xiàn)故障,Samza 會自動重新啟動。但是,如果容器在啟動后不久就會發(fā)生故障,這表示更深層的問題,所以我們應(yīng)該殺死這個(gè)工作,而不是無限期地重試。此屬性確定快速連續(xù)重新啟動故障容器的最大次數(shù)(時(shí)間段配置為 cluster-manager.container.retry.window.ms)。作業(yè)中的每個(gè)容器都單獨(dú)計(jì)數(shù)。如果此屬性設(shè)置為0,任何失敗的容器立即導(dǎo)致整個(gè)作業(yè)失敗。如果設(shè)置為負(fù)數(shù),則重試次數(shù)沒有限制。
cluster-manager.container.retry.window.ms300000此屬性確定容器在放棄并失敗之前允許失敗的頻率。如果同一個(gè)容器的故障超過 cluster-manager.container.retry.count 次數(shù),并且故障之間的時(shí)間小于此屬性 cluster-manager.container.retry.window.ms(以毫秒為單位),則我們將失敗。如果故障之間的時(shí)間大于,則我們將重新啟動容器的次數(shù)沒有限制cluster-manager.container.retry.window.ms。
cluster-manager.jobcoordinator.jmx.enabled確定是否應(yīng)該在作業(yè)的JobCoordinator上啟動JMX服務(wù)器。truefalse)。
cluster-manager.allocator.sleep.ms3600容器分配器線程負(fù)責(zé)將請求與分配的容器進(jìn)行匹配。此線程的睡眠間隔使用此屬性進(jìn)行配置。
cluster-manager.container.request.timeout.ms5000分配器線程會定期檢查容器請求和分配的容器的狀態(tài),以確定容器對已分配資源的分配。此屬性確定容器請求被認(rèn)為已過期/超時(shí)之前的毫秒數(shù)。當(dāng)請求過期時(shí),它將被分配給集群管理器返回的任何可用容器。
在 YARN 集群上運(yùn)行您的工作
(本節(jié)適用于您設(shè)置了 job.factory.class= org.apache.samza.job.yarn.YarnJobFactory
yarn.package.pathYARN作業(yè)必需:可以從中下載作業(yè)包的 URL,例如http://hdfs://URL。作業(yè)包是一個(gè)具有特定目錄結(jié)構(gòu)的 .tar.gz 文件 。
yarn.container.memory.mb1024這不支持 cluster-manager.container.memory.mb
yarn.container.cpu.cores1這不支持 cluster-manager.container.cpu.cores
yarn.container.retry.count8這不支持 cluster-manager.container.retry.count
yarn.container.retry.window.ms300000這不支持 cluster-manager.container.retry.window.ms
yarn.am.container.memory.mb1024在紗線中運(yùn)行的每個(gè)Samza工作都有一個(gè)特殊的容器,即管理執(zhí)行作業(yè) ApplicationMaster(AM)。此屬性確定從YARN請求運(yùn)行ApplicationMaster的內(nèi)存(兆字節(jié))。
yarn.am.opts任何JVM選項(xiàng)在執(zhí)行Samza ApplicationMaster時(shí)都包含在命令行中 。例如,這可以用于設(shè)置 JVM 堆大小,調(diào)整垃圾回收器或啟用遠(yuǎn)程調(diào)試。
yarn.am.java.homeSamza AM 的 JAVA_HOME 路徑。通過設(shè)置此屬性,可以使用與集群的Java版本不同的java版本。記住設(shè)置task.java.home好。
例: yarn.am.java.home=/usr/java/jdk1.8.0_05
yarn.am.poll.interval.ms1000Samza ApplicationMaster會將定期的心跳發(fā)送到Y(jié)ARN ResourceManager以確認(rèn)它是否存活。此屬性確定心跳之間的時(shí)間(以毫秒為單位)。
yarn.am.jmx.enabled這不支持 cluster-manager.jobcoordinator.jmx.enabled
yarn.allocator.sleep.ms3600這不支持 cluster-manager.allocator.sleep.ms
yarn.samza.host-affinity.enabled這不支持 job.host-affinity.enabled
yarn.container.request.timeout.ms5000這不支持 cluster-manager.container.request.timeout.ms
yarn.queue確定哪個(gè) YARN 隊(duì)列將用于 Samza 作業(yè)。
yarn.kerberos.principal當(dāng)在啟用 Kerberos 的 YARN 群集上運(yùn)行時(shí),Samza 作業(yè)的主體用于向 KDC 進(jìn)行身份驗(yàn)證。
yarn.kerberos.keytab包含由主體指定的keytab的文件的完整路徑,由yarn.kerberos.principal 指定keytab 文件上傳到 HDFS上每個(gè)應(yīng)用程序唯一的登臺目錄,然后應(yīng)用程序主機(jī)使用密鑰表和主體定期登錄以重新創(chuàng)建委托令。
yarn.token.renewal.interval.seconds應(yīng)用程序主人重新認(rèn)證和更新委托令牌的時(shí)間間隔。此值應(yīng)小于授權(quán)令牌在有效期限之前在 hadoop namenode 上有效的時(shí)間長度。
yarn.resources.resource-name.path資源名稱資源 本地化的路徑路徑中的方案(例如http,ftp,hdsf,文件等)應(yīng)在YARN core-site.xml中配置為fs。<scheme> .impl并與FileSystem相關(guān)聯(lián)。如果已定義,則在Samza作業(yè)運(yùn)行之前,該資源將被本地化在Samza應(yīng)用程序目錄中。
yarn.resources.resource-name.local.nameresource-name本地化后資源的新本地名稱。此配置僅適用于yarn.resources.resource-name.path 已配置的時(shí)候。
yarn.resources.resource-name.local.typeFILE
本地化后資源的類型。它可以是 ARCHIVE(歸檔目錄),F(xiàn)ILE 或 PATTERN(使用模式從歸檔中提取的條目)。此配置僅適用于 yarn.resources.resource-name.path 已配置的時(shí)候。
yarn.resources.resource-name.local.visibilityAPPLICATION
本地化后資源的可見性。它可以是 PUBLIC(對所有人都可見),PRIVATE(對于與此應(yīng)用程序相同的帳戶用戶的所有Samza應(yīng)用程序可見)或APPLICATION(僅對該Samza應(yīng)用程序可見)。此配置僅適用于 yarn.resources.resource-name.path 已配置的時(shí)候。
指標(biāo)
metrics.reporter.reporter-name.class

Samza 自動跟蹤各種衡量指標(biāo),這些指標(biāo)對于監(jiān)控作業(yè)的健康狀況非常有用,您還可以跟蹤自己的指標(biāo)。使用此屬性,您可以定義任意數(shù)量的指標(biāo)記錄器,將指標(biāo)發(fā)送到您選擇的系統(tǒng)(用于繪圖,警報(bào)等)。你給每個(gè)記者一個(gè)任意的記者名字為了使記者能夠參考記者名字, 指標(biāo)為準(zhǔn)。此屬性的值是實(shí)現(xiàn) MetricsReporterFactory 的Java類的完全限定名稱 默認(rèn)情況下,Samza會附帶這些實(shí)現(xiàn):
org.apache.samza.metrics.reporter.JmxReporterFactory
有了這個(gè)記者,每個(gè)容器都會以 JMX MBeans 的形式公布自己的指標(biāo)。JMX 服務(wù)器在隨機(jī)端口上啟動,以避免在同一機(jī)器上運(yùn)行的容器之間發(fā)生沖突。
org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
記者將每個(gè)分鐘的所有度量值的最新值作為消息發(fā)送到輸出流。輸出流配置有 metrics.reporter.*.stream ,它可以使用Samza支持的任何系統(tǒng)。
metrics.reporters如果您已經(jīng)使用 metrics.reporter.*.class 定義了任何指標(biāo)記錄器 ,那么您需要將它們列在這里才能啟用它們。該屬性的值是以逗號分隔的記者名稱令牌列表。
metrics.reporter.reporter-name.stream
如果您已經(jīng)注冊了 metric metricsreports.export.*.class = org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory,那么您需要設(shè)置此屬性來配置要發(fā)送度量數(shù)據(jù)的輸出流。流以系統(tǒng)名稱的形式給出 流名稱,系統(tǒng)必須在作業(yè)配置中定義。將許多不同的作業(yè)發(fā)布到相同的指標(biāo)流可以很好。Samza定義了一個(gè)簡單的 JSON編碼 用于度量; 為了使用此編碼,您還需要為度量流配置 serde:
  • stream.*.samza.msg.serde= metrics-serde(將星號替換為指標(biāo)流 流名稱
  • serializers.registry.metrics-serde.class = org.apache.samza.serializers.MetricsSnapshotSerdeFactory (注冊下的SERDE SERDE名 metrics-serde
metrics.reporter.reporter-name.interval如果您已經(jīng)注冊了指標(biāo)報(bào)告器 metricss.reporter.*.class = org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory,您可以使用此屬性來配置記者報(bào)告注冊的指標(biāo)的頻率。此屬性的值應(yīng)為連續(xù)指標(biāo)報(bào)告間隔的長度。該值以秒為單位,應(yīng)為正整數(shù)值。此屬性是可選的,默認(rèn)設(shè)置為60,這意味著每60秒報(bào)告一次。
寫入HDFS
systems.system-name.producer.hdfs.writer.classorg.apache.samza.system.hdfs.writer.BinarySequenceFileHdfsWriter這個(gè) HDFS Producer 系統(tǒng)應(yīng)該使用的 HdfsWriter 的完全合格的類名
systems.system-name.
producer.hdfs.compression.type
沒有用于使用壓縮類型的人類可讀標(biāo)簽,例如“gzip”“snappy”等。根據(jù) HdfsWriter 實(shí)現(xiàn)的性質(zhì),此標(biāo)簽將被不同地(或忽略)解釋。
systems.system-name.
producer.hdfs.base.output.dir
/user/USERNAME/SYSTEMNAMEHDFS 的基本輸出目錄寫入。默認(rèn)為運(yùn)行作業(yè)的用戶的主目錄,后跟 job.properties 文件中定義的此HdfsSystemProducer 的 systemName。
systems.system-name.
producer.hdfs.bucketer.class
org.apache.samza.system.hdfs.writer.JobNameDateTimeBucketer
用于管理 HDFS 路徑和文件名的 Bucketer 實(shí)現(xiàn)的完全限定類名稱。用于按時(shí)間批量寫入或其他類似的分區(qū)方法。
systems.system-name.
producer.hdfs.bucketer.date.path.format
適用于 HDFS 路徑的日期格式(使用 Java 的SimpleDataFormat 語法),可以配置輸出文件的基于時(shí)間的功能。
systems.system-name.
producer.hdfs.write.batch.size.bytes
268435456在剪切新文件之前,要寫入每個(gè) HDFS 輸出文件的傳出消息的字節(jié)數(shù)。如果沒有設(shè)置,默認(rèn)為 256MB。
systems.system-name.
producer.hdfs.write.batch.size.records
262144在剪切新文件之前要寫入每個(gè) HDFS 輸出文件的傳出消息的數(shù)量。如果未設(shè)置,默認(rèn)為 262144。
從HDFS閱讀
systems.system-name.consumer.bufferCapacity10hdfs 用戶緩沖區(qū)的容量 - 用于存儲消息的阻塞隊(duì)列。較大的緩沖區(qū)容量通常會導(dǎo)致更好的吞吐量,但會消耗更多的內(nèi)存。
systems.system-name.consumer.numMaxRetries10在容器發(fā)生故障之前從 HDFS 獲取消息失敗時(shí)的重試嘗試次數(shù)。
systems.system-name.
partitioner.defaultPartitioner.whitelist
*目錄分區(qū)器使用的白名單,以 Java Pattern 樣式在 hdfs 目錄中選擇文件。
systems.system-name.
partitioner.defaultPartitioner.blacklist
目錄分區(qū)器使用的黑名單,以 Java Pattern 樣式過濾 hdfs 目錄中的不需要的文件。
systems.system-name.
partitioner.defaultPartitioner.groupPattern
目錄分區(qū)用于高級分區(qū)的組模式。高級分區(qū)超出了每個(gè)文件是分區(qū)的基本假設(shè)。使用高級分區(qū),您可以任意地將文件分組到分區(qū)。例如,如果您有一組文件為:[part-01-a.avro,part-01-b.avro,part-02-a.avro,part-02-b.avro,part-03-a。 avro],
你想組織分區(qū):(part-01-a.avro,part-01-b.avro),(part-02-a.avro,part-02-b.avro),(part- 03-a.avro),其中中間的數(shù)字作為“組標(biāo)識符”,您可以將此屬性設(shè)置為 “part- [id] - .*”(注意 “[id]” 是一個(gè)保留的在這里,即你必須把它作為 “[id]”)。
分區(qū)器將將此模式應(yīng)用于所有文件名,并提取“組標(biāo)識符”(“[id]” 在模式中),然后使用“組標(biāo)識符”將文件分組到分區(qū)。查看更多詳細(xì)信息 
HdfsSystemConsumer設(shè)計(jì)文檔(這是個(gè)下載鏈接)
systems.system-name.consumer.readeravro用于不同事件格式的文件讀取器類型(avro,plain,json等)。“avro” 現(xiàn)在只支持類型。
systems.system-name.stagingDirectory用于存儲分區(qū)描述的分段目錄。默認(rèn)情況下(如果不是由用戶設(shè)置),該值將在內(nèi)部從 “yarn.job.staging.directory” 繼承。默認(rèn)值通常足夠好,除非您明確使用單獨(dú)的位置。


以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號