下表列出了可以包含在Samza作業(yè)配置文件中的所有標(biāo)準(zhǔn)屬性。
下表中加粗顯示的 system-name 是您自己的變量名的占位符。
名稱 | 默認(rèn) | 描述 |
---|---|---|
Samza工作配置 | ||
job.factory.class | 必需:用于運(yùn)行此作業(yè)的作業(yè)工廠。該值是一個(gè)完全限定的Java類名,必須實(shí)現(xiàn) StreamJobFactory。Samza 有三個(gè)實(shí)現(xiàn):
| |
job.name | 必需:您的工作的名稱。該名稱出現(xiàn)在 Samza 儀表板上,用于從其他工作的檢查點(diǎn)分離此工作的檢查點(diǎn)。 | |
job.id | 1 | 如果你同時(shí)運(yùn)行幾個(gè)你的工作實(shí)例,你需要給每個(gè)執(zhí)行一個(gè)不同的job.id 。這很重要,否則這些工作將覆蓋彼此的檢查點(diǎn),也可能以其他方式相互干擾。 |
job.coordinator.system | 必需:用于創(chuàng)建和維護(hù)協(xié)調(diào)器流的 system-name。 | |
job.default.system | 該 system-name 訪問系統(tǒng)沒有明確配置用于其中的任何輸入或輸出流。此屬性用于輸入和輸出流,而job.coordinator.system 適用于 samza 元數(shù)據(jù)流。 | |
job.coordinator.replication.factor | 3 | 如果您使用 Kafka 進(jìn)行協(xié)調(diào)器流,那么您希望協(xié)調(diào)器主題復(fù)制其持久性的 Kafka 節(jié)點(diǎn)的數(shù)量。 |
job.coordinator.segment.bytes | 26214400 | 如果您使用 Kafka 系統(tǒng)進(jìn)行協(xié)調(diào)器流,則這是用于協(xié)調(diào)器主題日志段的段大小。保持這個(gè)數(shù)字很小是有用的,因?yàn)樗黾恿薑afka 垃圾收集舊消息的頻率。 |
job.coordinator.monitor-partition-change | 假 | 如果您使用 Kafka 進(jìn)行協(xié)調(diào)器流,則此配置可使作業(yè)協(xié)調(diào)器檢測 Kafka 輸入主題中的分區(qū)計(jì)數(shù)差異。檢測時(shí),它會更新system-name 格式的 Gauge 指標(biāo)。stream-name .partitionCount,表示分區(qū)計(jì)數(shù)與初始狀態(tài)的差異。請注意,目前此功能僅適用于基于卡夫卡的系統(tǒng)。 |
job.coordinator.monitor-partition-change.frequency.ms | 300000 | 應(yīng)檢測輸入流分區(qū)計(jì)數(shù)變化的頻率。由于分區(qū)增加不是常見事件,所以可以將此檢查調(diào)整為相當(dāng)?shù)偷摹?/font> |
job.config.rewriter.rewriter-name.class | 您可以選擇定義配置重寫器,有可能在作業(yè)啟動前動態(tài)修改作業(yè)配置。例如,這可以用于從外部配置管理系統(tǒng)中拉取配置,或者用于在運(yùn)行時(shí)動態(tài)地確定輸入流集合。該屬性的值是一個(gè)完全限定的Java類名,它必須實(shí)現(xiàn) ConfigRewriter。默認(rèn)情況下,Samza與這些重寫器一起發(fā)貨:
| |
job.config.rewriters | 如果您已定義配置重寫器,則需要按照應(yīng)用順序?qū)⑺鼈兞性谶@里。此屬性的值為以逗號分隔的重寫器名稱令牌列表 。 | |
job.systemstreampartition.grouper.factory | org.apache.samza.container.grouper.stream.GroupByPartitionFactory | 用于確定如何將SystemStreamPartition輸入組合在一起以在單個(gè)StreamTask實(shí)例中處理的工廠類。工廠必須實(shí)現(xiàn)SystemStreamPartitionGrouperFactory接口。一旦設(shè)置了這個(gè)配置,就不能改變,因?yàn)檫@樣做可能會違反狀態(tài)語義,并導(dǎo)致數(shù)據(jù)丟失。
|
job.systemstreampartition.matcher.class | 如果要啟用靜態(tài)分區(qū)分配,則這是必需的配置。此屬性的值是實(shí)現(xiàn)該接口的完全限定的 Java 類名稱 org.apache.samza.system.SystemStreamPartitionMatcher Samza 配有兩個(gè)匹配課:
| |
job.systemstreampartition.matcher.config.range | 如果job.systemstreampartition.matcher.class 指定,并且該屬性的值是 org.apache.samza.system.RangeSystemStreamPartitionMatcher ,則此屬性是 必需的配置。指定一個(gè)逗號分隔的范圍列表,以確定哪個(gè)分區(qū)匹配,從而靜態(tài)分配給作業(yè)。例如“2,3,11-20”,對于所有指定的系統(tǒng)和流(在Kafka的情況下的主題),為該作業(yè)靜態(tài)分配2,3和11到20。像“19”這樣的 singel 配置值也是有效的。這個(gè)靜態(tài)分配分區(qū)19。對于 config 驗(yàn)證,逗號分隔列表中的每個(gè)元素都很符合以下正則表達(dá)式之一:
| |
job.systemstreampartition.matcher.config.regex | 如果job.systemstreampartition.matcher.class 指定,并且該屬性的值是 org.apache.samza.system.RegexSystemStreamPartitionMatcher ,則此屬性是 必需的配置。該值應(yīng)該是一個(gè)有效的 Java 支持的正則表達(dá)式。例如 “[1-2]”,將所有指定的系統(tǒng)和流(Kakfa 的情況下的主題)的分區(qū)1和2靜態(tài)分配給作業(yè)。 | |
job.systemstreampartition.matcher.config.job.factory.regex | 此配置可用于指定 Java 支持的正則表達(dá)式,以匹配StreamJobFactory 應(yīng)啟用靜態(tài)分區(qū)分配的正則表達(dá)式。此配置使分區(qū)分配功能也可用于自定義StreamJobFactory 。此配置默認(rèn)為以下值: | |
job.checkpoint.validation.enabled | 真 | 此設(shè)置控制作業(yè)是否應(yīng)該失?。╰rue)或只是警告(false),以防止檢查點(diǎn)分區(qū)號驗(yàn)證失敗。 注意:此配置需要謹(jǐn)慎使用。在檢查點(diǎn)自動創(chuàng)建錯(cuò)誤數(shù)量的分區(qū)之后,它應(yīng)該僅用作解決方法。 |
job.security.manager.factory | 這是用于創(chuàng)建適當(dāng)?shù)?nbsp;SecurityManager 的 factory 類,用于在安全環(huán)境中運(yùn)行時(shí)處理 Samza 容器的安全性,例如使用Kerberos 進(jìn)行的紗線。默認(rèn)情況下: | |
job.container.count | 1 | 要求運(yùn)行您的工作的YARN容器的數(shù)量。這是控制作業(yè)規(guī)模(分配計(jì)算資源)的主要參數(shù):為了增加處理的并行性,您需要增加容器數(shù)量。最小值是一個(gè)容器,容器的最大數(shù)量是任務(wù)實(shí)例的數(shù)量(通常是輸入流分區(qū)的 數(shù)量)。任務(wù)實(shí)例均勻分布在您指定的容器數(shù)量上。 |
job.container.single.thread.mode | 假 | 如果設(shè)置為 true,samza 將回退到傳統(tǒng)的單線程事件循環(huán)。默認(rèn)值為 false,這將啟用多線程執(zhí)行。 |
job.container.thread.pool.size | 如果已配置,容器線程池將用于并行運(yùn)行每個(gè)任務(wù)的同步操作。操作包括 StreamTask.process(),WindowableTask.window()和內(nèi)部的 Task.commit()。請注意,線程池不適用于AsyncStremTask.processAsync()。大小應(yīng)該始終大于零。如果未配置,則所有任務(wù)操作都將在單個(gè)線程中運(yùn)行。 | |
job.host-affinity.enabled | 假 | 此屬性指示是否啟用主機(jī)關(guān)聯(lián)。主機(jī)關(guān)聯(lián)是指 Samza 每次部署作業(yè)時(shí)在同一臺主機(jī)上請求和分配容器的能力。當(dāng)啟用主機(jī)密切關(guān)系時(shí),Samza 做出了“盡力而為”來遵守主機(jī)關(guān)聯(lián)約束。屬性 cluster-manager.container.request.timeout.ms 確定在對主機(jī)相關(guān)性約束進(jìn)行取消優(yōu)先級排序并將容器分配給任何可用資源之前等待多長時(shí)間。 請注意:啟用連續(xù)調(diào)度后,該功能將經(jīng)過 yarn 中的 FairScheduler 測試。 |
job.changelog.system | 此屬性指定一個(gè)用于 changelog 的默認(rèn)系統(tǒng),它將與 stores.store-name.changelog config中指定的流一起使用 。您可以通過在 stores.store-name.changelog 中指定系統(tǒng)和流來覆蓋此系統(tǒng) 。 | |
job.coordinator.factory | 用于工作協(xié)調(diào)的類。目前可用的值為:
| |
基于Zookeeper的作業(yè)配置 | ||
job.coordinator.zk.connect | 對于使用基于 Zookeeper 協(xié)調(diào)的應(yīng)用程序而言是必需的。Zookeeper 坐標(biāo)(在“host:port [/ znode]”格式中)用于協(xié)調(diào)。 | |
job.coordinator.zk.session.timeout.ms | 30000 | 所有 ZK 連接的 Zookeeper 會話超時(shí)以毫秒為單位。會話超時(shí)控制在不能與 ZK 服務(wù)器通話之前,zk 客戶端在拋出異常之前等待多久。 |
job.coordinator.zk.connection.timeout.ms | 60000 | Zookeeper 連接超時(shí)(毫秒)。Zk 連接超時(shí)控制客戶端在放棄之前嘗試連接到 ZK 服務(wù)器的時(shí)間。 |
job.coordinator.zk.consensus.timeout.ms | 40000 | 每個(gè)處理器將等待所有處理器在回滾之前報(bào)告接受新作業(yè)模型的時(shí)間。 |
job.debounce.time.ms | 2000 | 在注冊處理器更改之前,Leader 處理器需要等待多久才能重新計(jì)算 JobModel。 |
任務(wù)配置 | ||
task.class | 必需:從輸入流處理傳入消息的 Java 類的完全限定名稱。這個(gè)類必須實(shí)現(xiàn) StreamTask 或 AsyncStreamTask,并且可任選地實(shí)現(xiàn) InitableTask, ClosableTask 和 WindowableTask。該類將被實(shí)例化幾次,每次輸入流分區(qū)一次 。 | |
task.inputs | 必需:以此作業(yè)使用的流的逗號分隔列表。每個(gè)流都以system-name 格式給出 stream-name 。舉例來說,如果你有一個(gè)叫做輸入系統(tǒng)my-kafka ,并要消耗2個(gè) Kafka 主題叫做PageViewEvent 和UserActivityEvent ,那么你就設(shè)置 task.inputs=my-kafka.PageViewEvent, my-kafka.UserActivityEvent 。 | |
task.window.ms | -1 | 如果 task.class 實(shí)現(xiàn) WindowableTask,它可以定期接收一個(gè) 加窗回調(diào)。此屬性指定window()調(diào)用之間的時(shí)間,以毫秒為單位。如果數(shù)字為負(fù)數(shù)(默認(rèn)值),則不會調(diào)用window()。請注意,Samza是 單線程的,所以window()調(diào)用永遠(yuǎn)不會與處理消息同時(shí)發(fā)生。如果在window()調(diào)用到期時(shí)正在處理消息,則在完成當(dāng)前消息的處理之后發(fā)生window()調(diào)用。 |
task.checkpoint.factory | 要啟用 檢查點(diǎn),必須將此屬性設(shè)置為實(shí)現(xiàn)CheckpointManagerFactory 的 Java 類的完全限定名稱 。這不是必需的,但建議大多數(shù)工作。如果您沒有配置檢查點(diǎn),并且作業(yè)或容器重新啟動,它不記得它已經(jīng)處理了哪些消息。沒有檢查點(diǎn),消費(fèi)者行為由 ... samza.offset.default 決定設(shè)置,默認(rèn)情況下將跳過在容器重新啟動時(shí)發(fā)布的任何消息。檢查點(diǎn)允許作業(yè)啟動它以前停止的位置。默認(rèn)情況下,Samza 有兩個(gè)檢查點(diǎn)管理員:
| |
task.commit.ms | 60000 | 如果配置了 task.checkpoint.factory,則此屬性確定檢查點(diǎn)的寫入頻率。值是檢查點(diǎn)之間的時(shí)間,以毫秒為單位。檢查點(diǎn)的頻率會影響故障恢復(fù):如果容器意外失?。ɡ纾捎诒罎⒒驒C(jī)器故障)而重新啟動,則會在最后一個(gè)檢查點(diǎn)恢復(fù)處理。從失敗的容器上的最后一個(gè)檢查點(diǎn)處理的任何消息將被再次處理。檢查點(diǎn)更頻繁地減少可能被處理兩次的消息數(shù)量,而且還使用更多的資源。 |
task.command.class | org.apache.samza.job.ShellCommandBuilder | 確定 容器 的命令行和環(huán)境變量的Java類的完全限定名稱。它必須是 CommandBuilder 的子類 。默認(rèn)為task.command.class=org.apache.samza.job.ShellCommandBuilder 。 |
task.opts | 任何 JVM 選項(xiàng)在執(zhí)行Samza容器時(shí)都包含在命令行中。例如,這可以用于設(shè)置JVM堆大小,調(diào)整垃圾回收器或啟用 遠(yuǎn)程調(diào)試。運(yùn)行時(shí)無法使用ThreadJobFactory 。任何您放入的東西 task.opts 都將直接轉(zhuǎn)發(fā)到命令行,作為JVM調(diào)用的一部分。
| |
task.java.home | Samza 容器的 JAVA_HOME 路徑。通過設(shè)置此屬性,可以使用與集群的 Java 版本不同的 java 版本。記住設(shè)置yarn.am.java.home 好。
| |
task.execute | bin/ run-container.sh | 啟動 Samza 容器的命令。腳本必須包含在 作業(yè)包中。通常不需要定制這個(gè)。 |
task.chooser.class | org.apache.samza.system.chooser.RoundRobinChooserFactory | 該屬性可以選擇設(shè)置為覆蓋默認(rèn) 消息選擇器,該消息選擇器確定處理來自多個(gè)輸入流的消息的順序。此屬性的值是實(shí)現(xiàn)MessageChooserFactory 的 Java 類的完全限定名稱 。 |
task.drop.deserialization.errors | 這個(gè)屬性是定義系統(tǒng)如何??處理反序列化失敗的情況。如果設(shè)置為 true,系統(tǒng)將跳過錯(cuò)誤消息并繼續(xù)運(yùn)行。如果設(shè)置為false,則系統(tǒng)將拋出異常并使容器失敗。默認(rèn)值為 false。 | |
task.drop.serialization.errors | 此屬性用于定義系統(tǒng)如何??處理序列化失敗情況。如果設(shè)置為true,系統(tǒng)將丟棄錯(cuò)誤消息并繼續(xù)運(yùn)行。如果設(shè)置為 false,則系統(tǒng)將拋出異常并使容器失敗。默認(rèn)值為 false。 | |
task.log4j.system | 指定 StreamAppender 的系統(tǒng)名稱。如果配置中未指定此屬性,則 Samza 將拋出異常。(參見 Stream Log4j Appender)
| |
task.log4j.location.info.enabled | 假 | 定義是否在 Log4j StreamAppender 消息中包含 log4j 的LocationInfo 數(shù)據(jù)。LocationInfo 包括寫入日志消息的文件,類和行等信息。此設(shè)置僅在使用Log4j流附加程序時(shí)有效。(參見Stream Log4j Appender)
|
task.poll.interval.ms | Samza 的容器在兩個(gè)條件下輪詢更多的消息。當(dāng)任何輸入SystemStreamPartition 不存在任何剩余的緩沖消息時(shí),第一個(gè)條件出現(xiàn)。第二個(gè)條件出現(xiàn)當(dāng)一些輸入SystemStreamPartitions 有空緩沖區(qū),但有些不具有。在后一種情況下,定義輪詢間隔以確定刷新空的SystemStreamPartition 緩沖區(qū)的頻率。默認(rèn)情況下,此間隔為 50ms,這意味著任何空的 SystemStreamPartition 緩沖區(qū)至少每 50ms 刷新一次。這里的值越大意味著空的SystemStreamPartition 將不會更頻繁地刷新,這意味著更多的延遲被引入,但是將使用更少的 CPU 和網(wǎng)絡(luò)。 | |
task.ignored.exceptions | 此屬性指定在任務(wù)process 或window 方法中拋出哪些異常應(yīng)忽略。被忽略的例外應(yīng)該是一個(gè)逗號分隔的例外的完全限定類名的列表,或 * 忽略所有異常。 | |
task.shutdown.ms | 5000 | 此屬性控制 Samza 容器等待有序關(guān)閉任務(wù)實(shí)例的時(shí)間。 |
task.name.grouper.factory | org.apache.samza.container.grouper.task.GroupByContainerCountFactory | 確定將生成 TaskNameGrouper 的工廠類的 Java 類的完全限定名稱。如果屬性不存在,則默認(rèn)配置值為task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerCountFactory 用戶可以指定 TaskNameGrouperFactory 的自定義實(shí)現(xiàn),其中實(shí)現(xiàn)用于對任務(wù)進(jìn)行分組的自定義邏輯。 注意:對于非集群應(yīng)用程序(使用協(xié)調(diào)服務(wù)的應(yīng)用程序),必須使用 org.apache.samza.container.grouper.task.GroupByContainerIdsFactory |
task.broadcast.inputs | 此屬性指定所有任務(wù)應(yīng)消耗的分區(qū)。你把這里的systemStreamPartitions 發(fā)送到所有的任務(wù)。
| |
task.max.concurrency | 1 | 每個(gè)任務(wù)一次處理的最大數(shù)量的未完成的消息,它適用于StreamTask 和 AsyncStreamTask。值可以是:
|
task.callback.timeout.ms | 此屬性僅適用于 AsyncStreamTask。它定義從processAsync()到回調(diào)的最大時(shí)間間隔被觸發(fā)。當(dāng)超時(shí)發(fā)生時(shí),它將拋出一個(gè) TaskCallbackTimeoutException 并關(guān)閉該容器。默認(rèn)是沒有超時(shí)。 | |
task.consumer.batch.size | 1 | 如果設(shè)置為正整數(shù),則任務(wù)將嘗試使用 每個(gè)輸入流中給定數(shù)量的消息的批次,而不是從每個(gè)單獨(dú)消息的所有輸入流中消耗循環(huán)。在某些情況下,設(shè)置此屬性可以提高性能。 |
系統(tǒng) | ||
systems.system-name.samza.factory | 必需:提供系統(tǒng)的 Java 類的全限定名稱 。系統(tǒng)可以提供您可以在 Samza 作業(yè)中使用的輸入流,也可以輸出可以寫入的輸出流,或同時(shí)提供兩者。對系統(tǒng)的要求非常靈活 - 它可以連接到消息代理,或讀取和寫入文件,或使用數(shù)據(jù)庫或其他任何東西。該類必須實(shí)現(xiàn) SystemFactory。Samza 具有以下實(shí)現(xiàn):
| |
systems.system-name.default.stream* | 與系統(tǒng)關(guān)聯(lián)的任何流的一組默認(rèn)屬性。例如,如果配置了“systems.kafka-system.default.stream.replication.factor”= 2,則在kafka 系統(tǒng)上創(chuàng)建的每個(gè) Kafka 流將具有2的復(fù)制因子,除非在流中顯式覆蓋該屬性范圍使用流屬性。 | |
systems.system-name. default.stream.samza.key.serde | 該 SERDE 將用于反序列化 鍵上輸入流的消息,和序列化鍵上輸出流消息。此屬性定義系統(tǒng)中所有流的serde。請參閱 stream-scoped 屬性來定義單個(gè)流的 serde。如果兩者都被定義,流級定義優(yōu)先。此屬性的值必須是使用 serializers.registry 注冊的 serde-namesystems.system-name.*。如果未設(shè)置此屬性,消息將在輸入流消費(fèi)者,任務(wù)和輸出流生成器之間未修改傳遞。 | |
systems.system-name. default.stream.samza.msg.serde | 將用于反序列化輸入流上的消息值 的 serde,并串行化輸出流上消息的值。此屬性定義系統(tǒng)中所有流的serde。請參閱 stream-scoped 屬性來定義單個(gè)流的 serde。如果兩者都被定義,流級定義優(yōu)先。此屬性的值必須是使用serializers.registry 注冊的 serde-name *。如果未設(shè)置此屬性,消息將在輸入流消費(fèi)者,任務(wù)和輸出流生成器之間未修改傳遞。 | |
systems.system-name. default.stream.samza.offset.default | 即將到來 | 如果容器在沒有檢查點(diǎn)的情況下啟動,則該屬性確定輸入流中我們應(yīng)該開始使用的位置。該值必須為 OffsetType,為以下之一:
samza.offset.default 如果兩者都被定義,流級定義優(yōu)先。 |
systems.system-name.samza.key.serde | 這不支持 systems.system-name.default.stream.samza.key.serde | |
systems.system-name.streams.stream-name. samza.key.serde | 這不支持 streams.stream-id.samza.key.serde. | |
systems.system-name.samza.msg.serde | 這不支持 systems.system-name.default.stream.samza.msg.serde | |
systems.system-name.streams.stream-name. samza.msg.serde | 這不支持 streams.stream-id.samza.msg.serde | |
systems.system-name.samza.offset.default | 即將到來 | 這不支持 systems.system-name.default.stream.samza.offset.default. |
systems.system-name.streams.stream-name samza.offset.default | 這不支持 streams.stream-id.samza.offset.default | |
systems.system-name.streams.stream-name samza.reset.offset | 假 | 這不支持 streams.stream-id.samza.reset.offset. |
systems.system-name.streams.stream-name samza.priority | -1 | 這不支持 streams.stream-id.samza.priority. |
systems.system-name.streams.stream-name samza.bootstrap | 假 | 這不支持 streams.stream-id.samza.bootstrap. |
流 | ||
streams.stream-id.samza.system | 將訪問此流的系統(tǒng)的 system-name。此屬性將流綁定到由屬性系統(tǒng)定義的系統(tǒng)之一。system-name.samza.factory。 如果未指定此屬性,它將從 job.default.system 繼承。 | |
streams.stream-id.samza.physical.name | 系統(tǒng)上將訪問此流的物理名稱。這是與 Samza 用來識別流的邏輯名稱的 stream-id 相反的。物理名稱可以是 Kafka 主題名稱,HDFS 文件 URN 或任何其他系統(tǒng)特定的標(biāo)識符。 | |
streams.stream-id.samza.key.serde | 該 SERDE 將用于反序列化 鍵上輸入流的消息,和序列化鍵上輸出流消息。此屬性定義單個(gè)流的serde。請參閱 system-scoped 屬性以定義系統(tǒng)中所有流的 serde。如果兩者都被定義,流級定義優(yōu)先。此屬性的值必須是使用 serializers.registry 注冊的 serde-name.*。如果未設(shè)置此屬性,消息將在輸入流消費(fèi)者,任務(wù)和輸出流生成器之間未修改傳遞。 | |
streams.stream-id.samza.msg.serde | 將用于反 序列化輸入流上的消息值的 serde,并串行化輸出流上消息的值。此屬性定義單個(gè)流的serde。請參閱 system-scoped 屬性以定義系統(tǒng)中所有流的serde。如果兩者都被定義,流級定義優(yōu)先。此屬性的值必須是使用serializers.registry 注冊的 serde-name .*。如果未設(shè)置此屬性,消息將在輸入流消費(fèi)者,任務(wù)和輸出流生成器之間未修改傳遞。 | |
streams.stream-id.samza.offset.default | 即將到來 | 如果容器在沒有檢查點(diǎn)的情況下啟動,則該屬性確定輸入流中我們應(yīng)該開始使用的位置。該值必須為 OffsetType,為以下之一:
|
streams.stream-id.samza.reset.offset | 假 | 如果設(shè)置為true ,當(dāng)Samza容器啟動時(shí),它將忽略該特定輸入流的任何 檢查點(diǎn)偏移量。因此,其行為由samza.offset.default 設(shè)置決定。請注意,每次啟動容器時(shí),復(fù)位都將生效,這可能是每次重新啟動作業(yè)時(shí),或者如果容器發(fā)生故障并由框架重新啟動,則會更頻繁。 |
streams.stream-id.samza.priority | -1 | 如果一個(gè)或多個(gè)流具有優(yōu)先級設(shè)置(任何正整數(shù)),則將以比其他流更高的優(yōu)先級處理它們。您可以將多個(gè)流設(shè)置為相同的優(yōu)先級,或通過為較高優(yōu)先級的流分配更多的數(shù)字來定義多個(gè)優(yōu)先級。如果較高優(yōu)先級的流具有可用的消息,則它們將始終被處理; 來自較低優(yōu)先級流的消息僅在高優(yōu)先級輸入沒有新消息時(shí)被處理。 |
streams.stream-id.samza.bootstrap | 假 | 如果設(shè)置為true ,該流將被作為streams.自舉流處理。這意味著每次 Samza 容器啟動時(shí),在處理任何其他流的消息之前,該流將被完全消耗。 |
streams.stream-id.* | 流的任何屬性。這些通常是系統(tǒng)特定的,可以由系統(tǒng)用于流創(chuàng)建或驗(yàn)證。請注意,其他屬性前綴為 samza。 將它們區(qū)分為不是系統(tǒng)特定的 Samza 屬性。 | |
串行器/解串器(Serdes) | ||
serializers.registry.serde-name .class | 使用此屬性注冊序列化器/解串器,它定義了將應(yīng)用對象編碼為字節(jié)數(shù)組(用于流中的消息以及持久存儲中的數(shù)據(jù))的方式。你可以給你所需要的任何 serde-serde 名字,并在系統(tǒng)的屬性中引用該名稱 systems.*.samza.key.serde, streams.*.samza.key.serde,streams.*.samza.msg.serde, stores.*.key.serde systems.*.samza.msg.serde,streams.*.samza.key.serde, streams.*.samza.key.serde, streams.*.samza.msg.serde,stores.*.key.serde 和 stores.*.msg.serde。此屬性的值是實(shí)現(xiàn) SerdeFactory 的 Java 類的完全限定名稱 。Samza 有幾個(gè) serdes:
| |
將文件系統(tǒng)用于檢查點(diǎn) (本節(jié)適用于您已設(shè)置 task.checkpoint.factory = org.apache.samza.checkpoint.file.FileSystemCheckpointManagerFactory ) | ||
task.checkpoint.path | 如果將文件系統(tǒng)用于檢查點(diǎn),則為必需。將其設(shè)置為本地文件系統(tǒng)上應(yīng)存儲檢查點(diǎn)文件的路徑。 | |
使用 Elasticsearch 輸出流 (適用于您已設(shè)置 systems.*.samza.factory = org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory ) | ||
systems.system-name.client.factory | 必需:彈性搜索客戶端工廠用于連接到彈性搜索集群。Samza具有以下實(shí)現(xiàn):
| |
systems.system-name.index.request.factory | org.apache.samza.system.elasticsearch.indexrequest. DefaultIndexRequestFactory | 索引請求工廠將Samza OutgoingMessageEnvelope轉(zhuǎn)換為IndexRequest以發(fā)送到彈性搜索。默認(rèn)的IndexRequestFactory的行為如下:
|
systems.system-name.client.transport.host | 需要的TransportClientFactory TransportClientFactory 連接到的主機(jī)名。 | |
systems.system-name.client.transport.port | 需要的TransportClientFactory TransportClientFactory 連接到的端口。 | |
systems.system-name.client.elasticsearch.* | 任何 Elasticsearch 客戶端設(shè)置都可以在這里使用。它們都將傳遞給傳輸和節(jié)點(diǎn)客戶端。您想要提供的一些常見設(shè)置是。
| |
systems.system-name.bulk.flush.max.actions | 1000 | 沖洗前要緩沖的最大消息數(shù)。 |
systems.system-name.bulk.flush.max.size.mb | 五 | 沖洗前緩沖的消息的最大聚合大小。 |
systems.system-name.bulk.flush.interval.ms | 決不 | 緩沖消息應(yīng)該被刷新多久。 |
使用 Kafka 輸入流,輸出流和檢查點(diǎn) (適用于您已設(shè)置 systems.*.samza.factory = org.apache.samza.system.kafka.KafkaSystemFactory ) | ||
systems.system-name.consumer.zookeeper.connect | 可以找到有關(guān) Kafka 群集信息的一個(gè)或多個(gè) Zookeeper 節(jié)點(diǎn)的主機(jī)名和端口。這是以逗號分隔的hostname:port 對列表給出的 ,例如zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181 。如果集群信息位于 Zookeeper 命名空間的某個(gè)子路徑,則需要將路徑包含在主機(jī)名列表的末尾,例如:zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181/clusters/my-kafka | |
systems.system-name.consumer.auto.offset.reset | 最大 | 此設(shè)置確定使用者如何嘗試讀取超出當(dāng)前有效范圍的偏移量會發(fā)生什么。如果主題不存在,或者如果檢查點(diǎn)比代理所保留的最大消息歷史更早,則可能會發(fā)生這種情況。這個(gè)屬性不要與 systems.*.samza.offset.default 混淆,它確定如果沒有檢查點(diǎn)會發(fā)生什么。以下是以下的有效值auto.offset.reset :
|
systems.system-name.consumer.* | 任何 Kafka 使用者配置 都可以包含在這里。例如,要更改套接字超時(shí),可以設(shè)置系統(tǒng)。systemname .consumer.socket.timeout.ms。(沒有必要配置,group.id 或者client.id 是由Samza自動配置,而且沒有必要設(shè)置, auto.commit.enable 因?yàn)镾amza 有自己的檢查點(diǎn)機(jī)制) | |
systems.system-name.producer.bootstrap.servers | 注意: 此變量之前已定義為“producer.metadata.broker.list”,該版本已被棄用。 Kafka 代理正在運(yùn)行的網(wǎng)絡(luò)端點(diǎn)列表。例如,以逗號分隔的 hostname:port 對列表給出 kafka1.example.com:9092,kafka2.example.com:9092,kafka3.example.com:9092 。不需要列出集群中的每個(gè)單個(gè)Kafka節(jié)點(diǎn):Samza使用此屬性來發(fā)現(xiàn)哪些主題和分區(qū)托管在哪個(gè)代理上。即使您只是從卡夫卡消費(fèi)而不是寫信給這個(gè)屬性,因?yàn)?Samza 使用它來發(fā)現(xiàn)關(guān)于正在消費(fèi)的流的元數(shù)據(jù)。 | |
systems.system-name.producer.producer.* | 這里可以包含 任何 Kafka 生產(chǎn)者配置。例如,要更改請求超時(shí),可以設(shè)置系統(tǒng)。system-name .producer.timeout.ms。(沒有必要配置,client.id 因?yàn)樗怯蒘amza自動配置的。) | |
systems.system-name.samza.fetch.threshold | 50000 | 當(dāng)從 Kafka 消耗流時(shí),Samza 容器為傳入的消息維護(hù)一個(gè)內(nèi)存中的緩沖區(qū),以增加吞吐量(流任務(wù)可以繼續(xù)處理緩沖的消息,而從 Kafka 獲取新消息)。此參數(shù)確定了我們旨在緩沖容器所消耗的所有流分區(qū)的消息數(shù)。例如,如果一個(gè)容器消耗50個(gè)分區(qū),它將默認(rèn)緩存每個(gè)分區(qū)的1000個(gè)消息。當(dāng)緩沖消息的數(shù)量低于該閾值時(shí),Samza 從 Kafka 代理獲取更多消息來補(bǔ)充緩沖區(qū)。增加此參數(shù)可以增加作業(yè)“ |
systems.system-name.samza.fetch.threshold.bytes | -1 | 當(dāng)從 Kafka 消耗流時(shí),Samza 容器為傳入的消息維護(hù)一個(gè)內(nèi)存中的緩沖區(qū),以增加吞吐量(流任務(wù)可以繼續(xù)處理緩沖的消息,而從 Kafka 獲取新消息)。此參數(shù)確定了我們旨在基于字節(jié)在容器所消耗的所有流分區(qū)之間緩沖的消息的總大小。定義作為整體的緩沖預(yù)取消息使用的字節(jié)數(shù)。基于此計(jì)算單個(gè)系統(tǒng)/流/分區(qū)的字節(jié)。這將獲取整個(gè)消息,因此這個(gè)字節(jié)限制是一個(gè)軟的,并且實(shí)際使用可以是給定流的分區(qū)中的最大消息的字節(jié)數(shù)限制+大小。如果此屬性的值為大于 0,那么這將優(yōu)先于系統(tǒng)。system-name.samza.fetch.threshold。 例如, 如果 fetchThresholdBytes 設(shè)置為100000個(gè)字節(jié), 并且注冊了 50 SystemStreamPartitions, 則每個(gè)分區(qū)的閾值為 (100000/2)/50 = 1000 字節(jié)。由于這是一個(gè)軟限制, 實(shí)際使用可以是1000字節(jié) + 最大消息的大小。一旦 SystemStreamPartition 的緩沖消息字節(jié)數(shù)降至 1000 以下, 就會執(zhí)行回遷請求以獲取更多數(shù)據(jù)。增加此參數(shù)將減少當(dāng)隊(duì)列被消息耗盡和新消息排隊(duì)時(shí)之間的延遲, 但也會導(dǎo)致內(nèi)存使用量增加, 因?yàn)閷⒂懈嗟南⒈4嬖趦?nèi)存中。默認(rèn)值為-1, 表示不使用此方法。 |
task.checkpoint.system | 如果您使用Kafka檢查點(diǎn)(task.checkpoint.factory = org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory ),則此屬性是必需的。您必須將其設(shè)置為 Kafka 系統(tǒng)的系統(tǒng)名稱。該系統(tǒng)中的流名稱(主題名稱)將根據(jù)作業(yè)名稱和ID自動確定:( 在作業(yè)名稱中帶下劃線,ID由連字符替換)。 __samza_checkpoint_${job.name}_${job.id} | |
task.checkpoint.replication.factor | 3 | 如果您使用 Kafka 作為檢查點(diǎn),那么您希望將檢查點(diǎn)主題復(fù)制到其持久性的 Kafka 節(jié)點(diǎn)的數(shù)量。 |
task.checkpoint.segment.bytes | 26214400 | 如果您使用 Kafka 作為檢查點(diǎn),則這是用于檢查點(diǎn)主題的日志段的段大小。保持這個(gè)數(shù)字很小是有用的,因?yàn)樗黾恿丝ǚ蚩ɡ占f檢查點(diǎn)的頻率。 |
stores.store-name .changelog.replication.factor | stores.default.changelog.replication.factor | 該屬性定義要用于更改日志流的副本數(shù)。 |
stores.default.changelog.replication.factor | 2 | 此屬性定義要用于更改日志流的副本的默認(rèn)數(shù)量。 |
stores.store-name .changelog.kafka.topic-level-property | 該屬性允許您為要創(chuàng)建的更改日志主題指定主題級別設(shè)置。例如,您可以將清理策略指定為“stores.mystore.changelog.cleanup.policy = delete”。有關(guān)更多主題級配置,請參閱http://kafka.apache.org/documentation.html#configuration。 | |
使用與正則表達(dá)式匹配的所有 Kafka 主題 (本節(jié)適用于已設(shè)置 job.config.rewriter.*.class = org.apache.samza.config.RegExTopicGenerator ) | ||
job.config.rewriter.rewriter-name.system | 將此屬性設(shè)置為要從其中消費(fèi)所有匹配主題的 Kafka 系統(tǒng)的system-name。 | |
job.config.rewriter.rewriter-name.regex | 一個(gè)正則表達(dá)式,指定要在 Kafka 系統(tǒng)中使用哪些主題 job.config.rewriter.*.system。除了使用 task.inputs 指定的任何主題之外,與此正則表達(dá)式匹配的任何主題都將被使用。 | |
job.config.rewriter.rewriter-name.config.* | 在此命名空間中指定的任何屬性將應(yīng)用于與job.config.rewriter.*.regex中的正則表達(dá)式匹配的流的配置 。例如,您可以設(shè)置job.config.rewriter.*.config.samza.msg.serde 為匹配流中的郵件配置解串器,這相當(dāng)于為 正則表達(dá)式匹配的每個(gè)主題設(shè)置 sytems.*.streams.*.samza.msg.serde。 | |
存儲和國家管理 | ||
stores.store-name.factory | 這個(gè)屬性定義了一個(gè)商店,Samza的有效狀態(tài)流處理機(jī)制 。你可以給一家商店的任何商店的名字,除了默認(rèn)(在商店名稱 默認(rèn)保留定義默認(rèn)存儲參數(shù)),并使用該名稱在您的流任務(wù)中獲得的存儲的引用(在你的任務(wù)的 init()方法調(diào)用 TaskContext.getStore())。此屬性的值是實(shí)現(xiàn)的Java類的完全限定名稱 StorageEngineFactory。Samza目前隨附一個(gè)存儲引擎實(shí)現(xiàn):
| |
stores.store-name.key.serde | 如果存儲引擎預(yù)期存儲中的密鑰是簡單的字節(jié)數(shù)組,則該 serde 允許流任務(wù)使用另一對象類型作為密鑰訪問存儲。此屬性的值必須是使用 serializers.registry 注冊的 serde-name.*。如果沒有設(shè)置該屬性,鍵傳遞未修改存儲引擎(和 更新日志流,如果合適的話)。 | |
stores.store-name.msg.serde | 如果存儲引擎預(yù)期商店中的值是簡單的字節(jié)數(shù)組,則該 serde允許流任務(wù)使用另一個(gè)對象類型作為值訪問存儲。此屬性的值必須是使用 serializers.registry 注冊的 serde-name.*。如果未設(shè)置此屬性,則將值修改為存儲引擎(如果適用),則更改日志流。 | |
stores.store-name.changelog | Samza 商店是集裝箱本地的。如果容器發(fā)生故障,則商店的內(nèi)容將丟失。為了防止數(shù)據(jù)丟失,您需要將此屬性設(shè)置為配置更改日志流:Samza 然后確保對存儲的寫入將復(fù)制到此流,并在故障后從該流恢復(fù)存儲。該屬性的值以 system-name.stream-name的形式給出。“系統(tǒng)名稱”部分是可選的。如果省略,則必須在 job.changelog.system 中指定系統(tǒng)配置。任何輸出流都可以用作更改日志,但是您必須確保只有一個(gè)作業(yè)可以寫入給定的更改日志流(作業(yè)的每個(gè)實(shí)例,每個(gè)存儲需要自己的更改日志流)。 | |
使用 RocksDB 進(jìn)行鍵值存儲 (本節(jié)適用于您設(shè)置了 stores.*.factory = org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory ) | ||
stores.store-name.write.batch.size | 500 | 為了獲得更好的寫入性能,存儲引擎將緩沖區(qū)寫入并將其應(yīng)用到底層存儲中。如果快速連續(xù)寫入相同的密鑰多次,則該緩沖區(qū)也會對相同的密鑰進(jìn)行重復(fù)數(shù)據(jù)刪除。此屬性設(shè)置為每個(gè)任務(wù)實(shí)例中應(yīng)保存在此內(nèi)存緩沖區(qū)中的鍵/值對數(shù)。數(shù)量不能大于 stores.*.object.cache.size |
stores.store-name.object.cache.size | 1000 | Samza在經(jīng)常訪問的對象之前為RocksDB提供了額外的緩存。與緩存序列化對象的RocksDB塊緩存(stores.*.container.cache.size.bytes)相反,此緩存包含反序列化對象(避免緩存命中中的反序列化開銷)。此屬性確定每個(gè)任務(wù)實(shí)例在Samza緩存中保留的對象數(shù)。同樣的緩存也用于寫入緩沖(請參閱 stores.*.write.batch.size)。值為 0 將禁用所有緩存和批處理。 |
stores.store-name.container.cache.size.bytes | 104857600 | RocksDB的塊緩存大?。ㄒ宰止?jié)為單位),每個(gè)容器。如果一個(gè)容器內(nèi)有若干個(gè)任務(wù)實(shí)例,則每個(gè)容器中都有一個(gè)比例份額。請注意,這是一個(gè)非堆內(nèi)存分配,因此容器的總內(nèi)存使用量是最大的JVM堆大小加上此緩存的大小。 |
stores.store-name.container.write.buffer.size.bytes | 33554432 | 在每個(gè)容器寫入磁盤之前,RocksDB用于緩沖寫入的內(nèi)存量(以字節(jié)為單位)。如果一個(gè)容器內(nèi)有若干個(gè)任務(wù)實(shí)例,則每個(gè)容器中都有一個(gè)比例份額。此設(shè)置還可確定RocksDB的段文件的大小。 |
stores.store-name.rocksdb.compression | 瞬間 | 此屬性控制 RocksDB 是否應(yīng)壓縮磁盤上的數(shù)據(jù)和塊緩存中的數(shù)據(jù)。以下值有效:
|
stores.store-name.rocksdb.block.size.bytes | 4096 | 如果啟用了壓縮,RocksDB將這個(gè)許多未壓縮字節(jié)分組到一個(gè)壓縮塊中。您可能不需要更改此屬性。 |
stores.store-name.rocksdb.ttl.ms | 商店的生存時(shí)間。請注意,它不是嚴(yán)格的TTL限制(僅在壓實(shí)后才被刪除)。請謹(jǐn)慎打開帶有和不帶TTL的數(shù)據(jù)庫,因?yàn)樗赡軙p壞數(shù)據(jù)庫。使用前請務(wù)必閱讀約束。 | |
stores.store-name.rocksdb.compaction.style | 普遍 | 該屬性控制RocksDB在壓縮其級別時(shí)將采用的壓縮樣式。以下值有效:
|
stores.store-name.rocksdb.num.write.buffers | 3 | 配置 RocksDB 存儲使用的寫緩沖區(qū)數(shù)。這允許 RocksDB 繼續(xù)對其他緩沖區(qū)進(jìn)行寫入,即使給定的寫入緩沖區(qū)被刷新到磁盤。 |
stores.store-name.rocksdb.max.log.file.size.bytes | 67108864 | RocksDB LOG 文件在旋轉(zhuǎn)之前的最大大小(以字節(jié)為單位)。 |
stores.store-name.rocksdb.keep.log.file.num | 2 | RocksDB LOG 文件的數(shù)量(包括旋轉(zhuǎn) LOG.old.*文件)要保留。 |
與集群管理一起運(yùn)行 Samza | ||
cluster-manager.container.memory.mb | 1024 | 每個(gè)容器的工作需要從集群管理器請求多少內(nèi)存(兆字節(jié))。與 cluster-manager.container.cpu.cores 一起,此屬性確定集群管理器在一臺計(jì)算機(jī)上運(yùn)行的容器數(shù)量。如果容器超過這個(gè)限制,它將被殺死,所以重要的是容器的實(shí)際記憶使用量仍然低于極限。使用的內(nèi)存量通常是JVM堆大?。ㄅ渲脼?nbsp;task.opts),加上任何非堆內(nèi)存分配的大?。ɡ?span> store.*.container.cache.size.bytes),另外還有一個(gè)安全余量以允許 JVM 開銷。 |
cluster-manager.container.cpu.cores | 1 | 每個(gè)容器的工作要求的CPU核心數(shù)量。集群中的每個(gè)節(jié)點(diǎn)都有一定數(shù)量的可用CPU核心,所以這個(gè)數(shù)量(以及 cluster-manager.container.memory.mb)決定了一臺機(jī)器上可以運(yùn)行多少個(gè)容器。 |
cluster-manager.container.retry.count | 8 | 如果容器出現(xiàn)故障,Samza 會自動重新啟動。但是,如果容器在啟動后不久就會發(fā)生故障,這表示更深層的問題,所以我們應(yīng)該殺死這個(gè)工作,而不是無限期地重試。此屬性確定快速連續(xù)重新啟動故障容器的最大次數(shù)(時(shí)間段配置為 cluster-manager.container.retry.window.ms)。作業(yè)中的每個(gè)容器都單獨(dú)計(jì)數(shù)。如果此屬性設(shè)置為0,任何失敗的容器立即導(dǎo)致整個(gè)作業(yè)失敗。如果設(shè)置為負(fù)數(shù),則重試次數(shù)沒有限制。 |
cluster-manager.container.retry.window.ms | 300000 | 此屬性確定容器在放棄并失敗之前允許失敗的頻率。如果同一個(gè)容器的故障超過 cluster-manager.container.retry.count 次數(shù),并且故障之間的時(shí)間小于此屬性 cluster-manager.container.retry.window.ms (以毫秒為單位),則我們將失敗。如果故障之間的時(shí)間大于,則我們將重新啟動容器的次數(shù)沒有限制cluster-manager.container.retry.window.ms 。 |
cluster-manager.jobcoordinator.jmx.enabled | 真 | 確定是否應(yīng)該在作業(yè)的JobCoordinator上啟動JMX服務(wù)器。(true 或false )。 |
cluster-manager.allocator.sleep.ms | 3600 | 容器分配器線程負(fù)責(zé)將請求與分配的容器進(jìn)行匹配。此線程的睡眠間隔使用此屬性進(jìn)行配置。 |
cluster-manager.container.request.timeout.ms | 5000 | 分配器線程會定期檢查容器請求和分配的容器的狀態(tài),以確定容器對已分配資源的分配。此屬性確定容器請求被認(rèn)為已過期/超時(shí)之前的毫秒數(shù)。當(dāng)請求過期時(shí),它將被分配給集群管理器返回的任何可用容器。 |
在 YARN 集群上運(yùn)行您的工作 (本節(jié)適用于您設(shè)置了 job.factory.class = org.apache.samza.job.yarn.YarnJobFactory ) | ||
yarn.package.path | YARN作業(yè)必需:可以從中下載作業(yè)包的 URL,例如http:// 或hdfs:// URL。作業(yè)包是一個(gè)具有特定目錄結(jié)構(gòu)的 .tar.gz 文件 。 | |
yarn.container.memory.mb | 1024 | 這不支持 cluster-manager.container.memory.mb |
yarn.container.cpu.cores | 1 | 這不支持 cluster-manager.container.cpu.cores |
yarn.container.retry.count | 8 | 這不支持 cluster-manager.container.retry.count |
yarn.container.retry.window.ms | 300000 | 這不支持 cluster-manager.container.retry.window.ms |
yarn.am.container.memory.mb | 1024 | 在紗線中運(yùn)行的每個(gè)Samza工作都有一個(gè)特殊的容器,即管理執(zhí)行作業(yè)的 ApplicationMaster(AM)。此屬性確定從YARN請求運(yùn)行ApplicationMaster的內(nèi)存(兆字節(jié))。 |
yarn.am.opts | 任何JVM選項(xiàng)在執(zhí)行Samza ApplicationMaster時(shí)都包含在命令行中 。例如,這可以用于設(shè)置 JVM 堆大小,調(diào)整垃圾回收器或啟用遠(yuǎn)程調(diào)試。 | |
yarn.am.java.home | Samza AM 的 JAVA_HOME 路徑。通過設(shè)置此屬性,可以使用與集群的Java版本不同的java版本。記住設(shè)置task.java.home 好。
| |
yarn.am.poll.interval.ms | 1000 | Samza ApplicationMaster會將定期的心跳發(fā)送到Y(jié)ARN ResourceManager以確認(rèn)它是否存活。此屬性確定心跳之間的時(shí)間(以毫秒為單位)。 |
yarn.am.jmx.enabled | 真 | 這不支持 cluster-manager.jobcoordinator.jmx.enabled |
yarn.allocator.sleep.ms | 3600 | 這不支持 cluster-manager.allocator.sleep.ms |
yarn.samza.host-affinity.enabled | 假 | 這不支持 job.host-affinity.enabled |
yarn.container.request.timeout.ms | 5000 | 這不支持 cluster-manager.container.request.timeout.ms |
yarn.queue | 確定哪個(gè) YARN 隊(duì)列將用于 Samza 作業(yè)。 | |
yarn.kerberos.principal | 當(dāng)在啟用 Kerberos 的 YARN 群集上運(yùn)行時(shí),Samza 作業(yè)的主體用于向 KDC 進(jìn)行身份驗(yàn)證。 | |
yarn.kerberos.keytab | 包含由主體指定的keytab的文件的完整路徑,由yarn.kerberos.principal 指定。keytab 文件上傳到 HDFS上每個(gè)應(yīng)用程序唯一的登臺目錄,然后應(yīng)用程序主機(jī)使用密鑰表和主體定期登錄以重新創(chuàng)建委托令。 | |
yarn.token.renewal.interval.seconds | 應(yīng)用程序主人重新認(rèn)證和更新委托令牌的時(shí)間間隔。此值應(yīng)小于授權(quán)令牌在有效期限之前在 hadoop namenode 上有效的時(shí)間長度。 | |
yarn.resources.resource-name.path | 資源名稱資源 本地化的路徑。路徑中的方案(例如http,ftp,hdsf,文件等)應(yīng)在YARN core-site.xml中配置為fs。<scheme> .impl并與FileSystem相關(guān)聯(lián)。如果已定義,則在Samza作業(yè)運(yùn)行之前,該資源將被本地化在Samza應(yīng)用程序目錄中。 | |
yarn.resources.resource-name.local.name | resource-name | 本地化后資源的新本地名稱。此配置僅適用于yarn.resources.resource-name.path 已配置的時(shí)候。 |
yarn.resources.resource-name.local.type | FILE | 本地化后資源的類型。它可以是 ARCHIVE(歸檔目錄),F(xiàn)ILE 或 PATTERN(使用模式從歸檔中提取的條目)。此配置僅適用于 yarn.resources.resource-name.path 已配置的時(shí)候。 |
yarn.resources.resource-name.local.visibility | APPLICATION | 本地化后資源的可見性。它可以是 PUBLIC(對所有人都可見),PRIVATE(對于與此應(yīng)用程序相同的帳戶用戶的所有Samza應(yīng)用程序可見)或APPLICATION(僅對該Samza應(yīng)用程序可見)。此配置僅適用于 yarn.resources.resource-name.path 已配置的時(shí)候。 |
指標(biāo) | ||
metrics.reporter.reporter-name.class | Samza 自動跟蹤各種衡量指標(biāo),這些指標(biāo)對于監(jiān)控作業(yè)的健康狀況非常有用,您還可以跟蹤自己的指標(biāo)。使用此屬性,您可以定義任意數(shù)量的指標(biāo)記錄器,將指標(biāo)發(fā)送到您選擇的系統(tǒng)(用于繪圖,警報(bào)等)。你給每個(gè)記者一個(gè)任意的記者名字。為了使記者能夠參考記者名字,以 指標(biāo)為準(zhǔn)。此屬性的值是實(shí)現(xiàn) MetricsReporterFactory 的Java類的完全限定名稱 。默認(rèn)情況下,Samza會附帶這些實(shí)現(xiàn):
| |
metrics.reporters | 如果您已經(jīng)使用 metrics.reporter.*.class 定義了任何指標(biāo)記錄器 ,那么您需要將它們列在這里才能啟用它們。該屬性的值是以逗號分隔的記者名稱令牌列表。 | |
metrics.reporter.reporter-name.stream | 如果您已經(jīng)注冊了 metric metricsreports.export.*.class = org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory ,那么您需要設(shè)置此屬性來配置要發(fā)送度量數(shù)據(jù)的輸出流。流以系統(tǒng)名稱的形式給出 。流名稱,系統(tǒng)必須在作業(yè)配置中定義。將許多不同的作業(yè)發(fā)布到相同的指標(biāo)流可以很好。Samza定義了一個(gè)簡單的 JSON編碼 用于度量; 為了使用此編碼,您還需要為度量流配置 serde:
| |
metrics.reporter.reporter-name.interval | 如果您已經(jīng)注冊了指標(biāo)報(bào)告器 metricss.reporter.*.class = org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory ,您可以使用此屬性來配置記者報(bào)告注冊的指標(biāo)的頻率。此屬性的值應(yīng)為連續(xù)指標(biāo)報(bào)告間隔的長度。該值以秒為單位,應(yīng)為正整數(shù)值。此屬性是可選的,默認(rèn)設(shè)置為60,這意味著每60秒報(bào)告一次。 | |
寫入HDFS | ||
systems.system-name.producer.hdfs.writer.class | org.apache.samza.system.hdfs.writer.BinarySequenceFileHdfsWriter | 這個(gè) HDFS Producer 系統(tǒng)應(yīng)該使用的 HdfsWriter 的完全合格的類名 |
systems.system-name. producer.hdfs.compression.type | 沒有 | 用于使用壓縮類型的人類可讀標(biāo)簽,例如“gzip”“snappy”等。根據(jù) HdfsWriter 實(shí)現(xiàn)的性質(zhì),此標(biāo)簽將被不同地(或忽略)解釋。 |
systems.system-name. producer.hdfs.base.output.dir | /user/USERNAME/SYSTEMNAME | HDFS 的基本輸出目錄寫入。默認(rèn)為運(yùn)行作業(yè)的用戶的主目錄,后跟 job.properties 文件中定義的此HdfsSystemProducer 的 systemName。 |
systems.system-name. producer.hdfs.bucketer.class | org.apache.samza.system.hdfs.writer.JobNameDateTimeBucketer | 用于管理 HDFS 路徑和文件名的 Bucketer 實(shí)現(xiàn)的完全限定類名稱。用于按時(shí)間批量寫入或其他類似的分區(qū)方法。 |
systems.system-name. producer.hdfs.bucketer.date.path.format | 適用于 HDFS 路徑的日期格式(使用 Java 的SimpleDataFormat 語法),可以配置輸出文件的基于時(shí)間的功能。 | |
systems.system-name. producer.hdfs.write.batch.size.bytes | 268435456 | 在剪切新文件之前,要寫入每個(gè) HDFS 輸出文件的傳出消息的字節(jié)數(shù)。如果沒有設(shè)置,默認(rèn)為 256MB。 |
systems.system-name. producer.hdfs.write.batch.size.records | 262144 | 在剪切新文件之前要寫入每個(gè) HDFS 輸出文件的傳出消息的數(shù)量。如果未設(shè)置,默認(rèn)為 262144。 |
從HDFS閱讀 | ||
systems.system-name.consumer.bufferCapacity | 10 | hdfs 用戶緩沖區(qū)的容量 - 用于存儲消息的阻塞隊(duì)列。較大的緩沖區(qū)容量通常會導(dǎo)致更好的吞吐量,但會消耗更多的內(nèi)存。 |
systems.system-name.consumer.numMaxRetries | 10 | 在容器發(fā)生故障之前從 HDFS 獲取消息失敗時(shí)的重試嘗試次數(shù)。 |
systems.system-name. partitioner.defaultPartitioner.whitelist | * | 目錄分區(qū)器使用的白名單,以 Java Pattern 樣式在 hdfs 目錄中選擇文件。 |
systems.system-name. partitioner.defaultPartitioner.blacklist | 目錄分區(qū)器使用的黑名單,以 Java Pattern 樣式過濾 hdfs 目錄中的不需要的文件。 | |
systems.system-name. partitioner.defaultPartitioner.groupPattern | 目錄分區(qū)用于高級分區(qū)的組模式。高級分區(qū)超出了每個(gè)文件是分區(qū)的基本假設(shè)。使用高級分區(qū),您可以任意地將文件分組到分區(qū)。例如,如果您有一組文件為:[part-01-a.avro,part-01-b.avro,part-02-a.avro,part-02-b.avro,part-03-a。 avro], 你想組織分區(qū):(part-01-a.avro,part-01-b.avro),(part-02-a.avro,part-02-b.avro),(part- 03-a.avro),其中中間的數(shù)字作為“組標(biāo)識符”,您可以將此屬性設(shè)置為 “part- [id] - .*”(注意 “[id]” 是一個(gè)保留的在這里,即你必須把它作為 “[id]”)。分區(qū)器將將此模式應(yīng)用于所有文件名,并提取“組標(biāo)識符”(“[id]” 在模式中),然后使用“組標(biāo)識符”將文件分組到分區(qū)。查看更多詳細(xì)信息 HdfsSystemConsumer設(shè)計(jì)文檔(這是個(gè)下載鏈接) | |
systems.system-name.consumer.reader | avro | 用于不同事件格式的文件讀取器類型(avro,plain,json等)。“avro” 現(xiàn)在只支持類型。 |
systems.system-name.stagingDirectory | 用于存儲分區(qū)描述的分段目錄。默認(rèn)情況下(如果不是由用戶設(shè)置),該值將在內(nèi)部從 “yarn.job.staging.directory” 繼承。默認(rèn)值通常足夠好,除非您明確使用單獨(dú)的位置。 |
更多建議: