Samza 從HDFS文件讀取

2018-08-22 18:22 更新

您可以將 Samza 作業(yè)配置為從 HDFS 文件中讀取。HdfsSystemConsumer 可以從HDFS文件中讀取。Avro 編碼記錄支持開箱即用,易于擴(kuò)展以支持其他格式(純文本,csv,json 等)。參見(jiàn) Event format 下面的部分。

環(huán)境

您的工作需要在托管您要消費(fèi)的 HDFS 的同一個(gè) YARN 群集上運(yùn)行。

分區(qū)

分區(qū)工作在單獨(dú)的 HDFS 文件級(jí)別。每個(gè)文件被視為流分區(qū),而包含這些文件的目錄是流。例如,如果要從包含10個(gè)單獨(dú)文件的 HDFS 路徑中讀取,則自然會(huì)創(chuàng)建10個(gè)分區(qū)。您最多可以配置10個(gè) Samza 容器來(lái)處理這些分區(qū)。如果要從單個(gè) HDFS 文件中讀取,目前無(wú)法分解消耗 - 只能有一個(gè)容器來(lái)處理該文件。

事件格式

HdfsSystemConsumer 目前支持從 avro 文件讀取。收到的 IncomingMessageEnvelope 包含三個(gè)重要的字段:

  1. 空的關(guān)鍵
  2. 設(shè)置為 avro GenericRecord 的消息
  3. 流分區(qū)設(shè)置為 HDFS 文件的名稱

為了將支持?jǐn)U展到 avro 文件(例如 json,csv 等)之外,您可以實(shí)現(xiàn)接口 SingleFileHdfsReader(以一個(gè)示例的形式查看 AvroFileHdfsReader 的實(shí)現(xiàn))。

結(jié)束流支持

HDFS 數(shù)據(jù)和 Kafka 數(shù)據(jù)之間的一個(gè)主要區(qū)別是,當(dāng) kafka 主題具有無(wú)限的消息流時(shí),HDFS 文件是有限的,并且具有 EOF 的概念。

當(dāng)所有分區(qū)都在流結(jié)束時(shí),您可以選擇實(shí)現(xiàn) EndOfStreamListenerTask 來(lái)接收回調(diào)。當(dāng)任務(wù)處理的所有分區(qū)都處于流結(jié)束(即所有文件都已達(dá)到 EOF)時(shí),Samza 作業(yè)將自動(dòng)退出。

基本配置

以下是設(shè)置 HdfsSystemConsumer 的幾個(gè)基本配置:

# The HDFS system consumer is implemented under the org.apache.samza.system.hdfs package,
# so use HdfsSystemFactory as the system factory for your system
systems.hdfs-clickstream.samza.factory=org.apache.samza.system.hdfs.HdfsSystemFactory

# You need to specify the path of files you want to consume in task.inputs
task.inputs=hdfs-clickstream.hdfs:/data/clickstream/2016/09/11

# You can specify a white list of files you want your job to process (in Java Pattern style)
systems.hdfs-clickstream.partitioner.defaultPartitioner.whitelist=.*avro

# You can specify a black list of files you don't want your job to process (in Java Pattern style),
# by default it's empty.
# Note that you can have both white list and black list, in which case both will be applied.
systems.hdfs-clickstream.partitioner.defaultPartitioner.blacklist=somefile.avro

安全配置

訪問(wèn)啟用了 kerberos的HDFS 群集時(shí),需要以下附加配置:

# Use the SamzaYarnSecurityManagerFactory, which fetches and renews the Kerberos delegation tokens when the job is running in a secure environment.
job.security.manager.factory=org.apache.samza.job.yarn.SamzaYarnSecurityManagerFactory

# Kerberos principal
yarn.kerberos.principal=your-principal-name

# Path of the keytab file (local path)
yarn.kerberos.keytab=/tmp/keytab

高級(jí)配置

您可能需要設(shè)置的一些高級(jí)配置:

# Specify the group pattern for advanced partitioning.
systems.hdfs-clickstream.partitioner.defaultPartitioner.groupPattern=part-[id]-.*

高級(jí)分區(qū)超出了每個(gè)文件是分區(qū)的基本假設(shè)。使用高級(jí)分區(qū),您可以任意地將文件分組到分區(qū)。例如,如果您有一組文件為 [part-01-a.avro,part-01-b.avro,part-02-a.avro,part-02-b.avro,part-03-a.avo] 你想組織成三個(gè)分區(qū)(part-01-a.avro,part-01-b.avro),(part-02-a.avro,part-02-b.avro),(part- 03-a.avro),其中中間的數(shù)字作為“組標(biāo)識(shí)符”,您可以將此屬性設(shè)置為 “part- [id] - “(請(qǐng)注意,* [id] ** 是這里的保留期限,即您必須將其字面意思放在 [id])。分區(qū)器將該模式應(yīng)用于所有文件名,并提取模式中的“組標(biāo)識(shí)符”(“[id]”),然后使用“組標(biāo)識(shí)符”將文件分組到分區(qū)。

# Specify the type of files your job want to process (support avro only for now)
systems.hdfs-clickstream.consumer.reader=avro

# Max number of retries (per-partition) before the container fails.
system.hdfs-clickstream.consumer.numMaxRetries=10

更多信息

HdfsSystemConsumer設(shè)計(jì)文檔

Samza 安全  ?

以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)