Samza 序列化

2018-08-22 17:27 更新

從數(shù)據(jù)持久狀態(tài)存儲器讀取或?qū)懭氲拿總€消息都需要最終序列化為字節(jié)(通過網(wǎng)絡(luò)發(fā)送或?qū)懭氪疟P)。有不同的地方可以發(fā)生序列化和反序列化:

  1. 在客戶端庫中:例如,發(fā)布到 Kafka 并從 Kafka 消費(fèi)的庫支持可插拔序列化。
  2. 在任務(wù)實現(xiàn)中:您的進(jìn)程方法可以使用原始字節(jié)數(shù)組作為輸入和輸出,并進(jìn)行任何解析和序列化本身。
  3. 兩者之間:Samza 提供串行解串器,或構(gòu)成的層 SERDES 的簡稱。

你可以使用任何有意義的工作;Samza 不會對您施加任何特定的數(shù)據(jù)模型或序列化方案。然而,最干凈的解決方案通常是使用 Samza 的 serde 層。以下配置示例顯示如何使用它。

# Define a system called "kafka"
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory

# The job is going to consume a topic called "PageViewEvent" from the "kafka" system
task.inputs=kafka.PageViewEvent

# Define a serde called "json" which parses/serializes JSON objects
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory

# Define a serde called "integer" which encodes an integer as 4 binary bytes (big-endian)
serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory

# For messages in the "PageViewEvent" topic, the key (the ID of the user viewing the page)
# is encoded as a binary integer, and the message is encoded as JSON.
systems.kafka.streams.PageViewEvent.samza.key.serde=integer
systems.kafka.streams.PageViewEvent.samza.msg.serde=json

# Define a key-value store which stores the most recent page view for each user ID.
# Again, the key is an integer user ID, and the value is JSON.
stores.LastPageViewPerUser.factory=org.apache.samza.storage.kv.KeyValueStorageEngineFactory
stores.LastPageViewPerUser.changelog=kafka.last-page-view-per-user
stores.LastPageViewPerUser.key.serde=integer
stores.LastPageViewPerUser.msg.serde=json

每個 serde 都用工廠類定義。Samza 帶有幾個內(nèi)置的 serdes,用于UTF-8字符串,二進(jìn)制編碼整數(shù),JSON 等。以下是 Samza 支持的 serdes 的完整列表。

Serde 名字數(shù)據(jù)處理
UTF-8字符串
整數(shù)二進(jìn)制編碼整數(shù)
序列化可序列化對象類型
長數(shù)據(jù)類型
JSONJSON格式的數(shù)據(jù)
字節(jié)純字節(jié)(無效) - 適用于二進(jìn)制消息
字節(jié)緩沖區(qū)字節(jié)緩沖區(qū)

您還可以通過實現(xiàn)SerdeFactory界面來創(chuàng)建自己的串行器。

你給一個 serde 的名字(例如上面的例子中的 “json” 和 “integer”)只是為了方便你的作業(yè)配置; 你可以選擇任何你喜歡的名字。對于每個流和每個狀態(tài)存儲,您可以使用 serde 名稱聲明消息應(yīng)如何序列化和反序列化。

如果您不聲明 serde,Samza 只需將對象傳遞到任務(wù)實例和系統(tǒng)流之間。在這種情況下,您的任務(wù)需要發(fā)送和接收底層客戶端庫使用的任何類型的對象。

用于發(fā)送和接收消息的所有 Samza API 都鍵入 Object。這意味著您必須將消息轉(zhuǎn)換為正確的類型才能使用它們。這是一個更多的代碼,但它的優(yōu)勢是 Samza 不限于任何特定的數(shù)據(jù)模型。

檢查點(diǎn) ?

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號