Samza API概述

2018-08-23 15:13 更新

在為 Samza 編寫(xiě)流處理器時(shí),必須實(shí)現(xiàn) StreamTask 或 AsyncStreamTask 接口。您應(yīng)該為同步進(jìn)程實(shí)現(xiàn) StreamTask,其中消息處理在過(guò)程方法返回后完成。StreamTask 的一個(gè)例子是不涉及遠(yuǎn)程調(diào)用的計(jì)算:

package com.example.samza;

public class MyTaskClass implements StreamTask {

  public void process(IncomingMessageEnvelope envelope,
                      MessageCollector collector,
                      TaskCoordinator coordinator) {
    // process message
  }
}

另一方面,AsyncSteamTask 接口支持異步處理,其中,在 processAsync 方法返回,所述消息處理可能不是完整的。Java NIO,ParSeq 和 Akka 等各種并行庫(kù)可用于進(jìn)行異步調(diào)用,并通過(guò)調(diào)用 TaskCallback 來(lái)標(biāo)記完成。Samza 將繼續(xù)處理下一個(gè)消息或根據(jù)回調(diào)狀態(tài)關(guān)閉容器。AsyncStreamTask 的一個(gè)示例是進(jìn)行遠(yuǎn)程調(diào)用但不阻塞調(diào)用完成的計(jì)算:

package com.example.samza;

public class MyAsyncTaskClass implements AsyncStreamTask {

  public void processAsync(IncomingMessageEnvelope envelope,
                           MessageCollector collector,
                           TaskCoordinator coordinator,
                           TaskCallback callback) {
    // process message with asynchronous calls
    // fire callback upon completion, e.g. invoking callback from asynchronous call completion thread
  }
}

當(dāng)您運(yùn)行您的工作時(shí),Samza 將創(chuàng)建您的課程的幾個(gè)實(shí)例(可能在多臺(tái)機(jī)器上)。這些任務(wù)實(shí)例處理輸入流中的消息。

在您的工作配置中,您可以告訴 Samza 要消費(fèi)的流。一個(gè)不完整的示例可能如下所示(有關(guān)詳細(xì)信息,請(qǐng)參閱配置文檔):

# This is the class above, which Samza will instantiate when the job is run
task.class=com.example.samza.MyTaskClass

# Define a system called "kafka" (you can give it any name, and you can define
# multiple systems if you want to process messages from different sources)
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory

# The job consumes a topic called "PageViewEvent" from the "kafka" system
task.inputs=kafka.PageViewEvent

# Define a serializer/deserializer called "json" which parses JSON messages
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory

# Use the "json" serializer for messages in the "PageViewEvent" topic
systems.kafka.streams.PageViewEvent.samza.msg.serde=json

對(duì)于 Samza 從任務(wù)輸入流接收到的每個(gè)消息,調(diào)用進(jìn)程方法。該信封包含重要的三件事情:消息、秘鑰以及該消息來(lái)自的流。

/** Every message that is delivered to a StreamTask is wrapped
 * in an IncomingMessageEnvelope, which contains metadata about
 * the origin of the message. */
public class IncomingMessageEnvelope {
  /** A deserialized message. */
  Object getMessage() { ... }

  /** A deserialized key. */
  Object getKey() { ... }

  /** The stream and partition that this message came from. */
  SystemStreamPartition getSystemStreamPartition() { ... }
}

鍵和值被聲明為 Object,并且需要轉(zhuǎn)換為正確的類型。如果不配置序列化器/解串器,它們通常是 Java 字節(jié)數(shù)組。解串器可以將這些字節(jié)轉(zhuǎn)換為任何其他類型,例如上述 JSON 解串器將字節(jié)數(shù)組解析為 java.util.Map,java.util.List 和 String 對(duì)象。

該 getSystemStreamPartition() 方法返回一個(gè) SystemStreamPartition 對(duì)象,該對(duì)象告訴您消息來(lái)自哪里。它由三部分組成:

  1. 系統(tǒng):根據(jù)作業(yè)配置中定義的消息來(lái)自的系統(tǒng)的名稱。您可以有多個(gè)輸入和/或輸出系統(tǒng),每個(gè)系統(tǒng)具有不同的名稱。
  2. 流名稱:源系統(tǒng)中的流(話題,隊(duì)列)的名稱。這也在作業(yè)配置中定義。
  3. 分區(qū):一個(gè)流通常被分成多個(gè)分區(qū),每個(gè)分區(qū)由Samza分配給一個(gè)StreamTask實(shí)例。

API 如下所示:

/** A triple of system name, stream name and partition. */
public class SystemStreamPartition extends SystemStream {

  /** The name of the system which provides this stream. It is
      defined in the Samza job's configuration. */
  public String getSystem() { ... }

  /** The name of the stream/topic/queue within the system. */
  public String getStream() { ... }

  /** The partition within the stream. */
  public Partition getPartition() { ... }
}

在上面的示例作業(yè)配置中,系統(tǒng)名稱為 “kafka”,流名稱為 “PageViewEvent”。(名稱 “kafka” 不是特別的 - 您可以給系統(tǒng)任何您想要的名稱。)如果您有多個(gè)輸入流加入到 StreamTask中,您可以使用 SystemStreamPartition 來(lái)確定您收到的消息類型。

如何發(fā)送消息?如果您在 StreamTask 中查看 process()方法,您將看到您收到一個(gè) MessageCollector。

/** When a task wishes to send a message, it uses this interface. */
public interface MessageCollector {
  void send(OutgoingMessageEnvelope envelope);
}

要發(fā)送消息,您將創(chuàng)建一個(gè) OutgoingMessageEnvelope 對(duì)象并將其傳遞給消息收集器。至少,信封指定要發(fā)送的消息,以及將其發(fā)送到的系統(tǒng)和流名稱?;蛘?,您可以指定分區(qū)鍵和其他參數(shù)。

注意:請(qǐng)僅使用方法中的 MessageCollector 對(duì)象 process()。如果您持有 MessageCollector 實(shí)例并稍后再次使用,您的郵件可能無(wú)法正確發(fā)送。

例如,這是一個(gè)簡(jiǎn)單的任務(wù),將每個(gè)輸入消息分成單詞,并將每個(gè)單詞作為單獨(dú)的消息發(fā)出:

public class SplitStringIntoWords implements StreamTask {

  // Send outgoing messages to a stream called "words"
  // in the "kafka" system.
  private final SystemStream OUTPUT_STREAM =
    new SystemStream("kafka", "words");

  public void process(IncomingMessageEnvelope envelope,
                      MessageCollector collector,
                      TaskCoordinator coordinator) {
    String message = (String) envelope.getMessage();

    for (String word : message.split(" ")) {
      // Use the word as the key, and 1 as the value.
      // A second task can add the 1's to get the word count.
      collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, word, 1));
    }
  }
}

對(duì)于 AsyncStreamTask 示例,請(qǐng)遵循Samza Async API和多線程用戶指南中的教程。有關(guān) API 的更多詳細(xì)信息,請(qǐng)參閱配置

SamzaContainer ?


以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)