在為 Samza 編寫流處理器時,必須實現(xiàn) StreamTask 或 AsyncStreamTask 接口。您應(yīng)該為同步進(jìn)程實現(xiàn) StreamTask,其中消息處理在過程方法返回后完成。StreamTask 的一個例子是不涉及遠(yuǎn)程調(diào)用的計算:
package com.example.samza;
public class MyTaskClass implements StreamTask {
public void process(IncomingMessageEnvelope envelope,
MessageCollector collector,
TaskCoordinator coordinator) {
// process message
}
}
另一方面,AsyncSteamTask 接口支持異步處理,其中,在 processAsync 方法返回,所述消息處理可能不是完整的。Java NIO,ParSeq 和 Akka 等各種并行庫可用于進(jìn)行異步調(diào)用,并通過調(diào)用 TaskCallback 來標(biāo)記完成。Samza 將繼續(xù)處理下一個消息或根據(jù)回調(diào)狀態(tài)關(guān)閉容器。AsyncStreamTask 的一個示例是進(jìn)行遠(yuǎn)程調(diào)用但不阻塞調(diào)用完成的計算:
package com.example.samza;
public class MyAsyncTaskClass implements AsyncStreamTask {
public void processAsync(IncomingMessageEnvelope envelope,
MessageCollector collector,
TaskCoordinator coordinator,
TaskCallback callback) {
// process message with asynchronous calls
// fire callback upon completion, e.g. invoking callback from asynchronous call completion thread
}
}
當(dāng)您運行您的工作時,Samza 將創(chuàng)建您的課程的幾個實例(可能在多臺機器上)。這些任務(wù)實例處理輸入流中的消息。
在您的工作配置中,您可以告訴 Samza 要消費的流。一個不完整的示例可能如下所示(有關(guān)詳細(xì)信息,請參閱配置文檔):
# This is the class above, which Samza will instantiate when the job is run
task.class=com.example.samza.MyTaskClass
# Define a system called "kafka" (you can give it any name, and you can define
# multiple systems if you want to process messages from different sources)
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
# The job consumes a topic called "PageViewEvent" from the "kafka" system
task.inputs=kafka.PageViewEvent
# Define a serializer/deserializer called "json" which parses JSON messages
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
# Use the "json" serializer for messages in the "PageViewEvent" topic
systems.kafka.streams.PageViewEvent.samza.msg.serde=json
對于 Samza 從任務(wù)輸入流接收到的每個消息,調(diào)用進(jìn)程方法。該信封包含重要的三件事情:消息、秘鑰以及該消息來自的流。
/** Every message that is delivered to a StreamTask is wrapped
* in an IncomingMessageEnvelope, which contains metadata about
* the origin of the message. */
public class IncomingMessageEnvelope {
/** A deserialized message. */
Object getMessage() { ... }
/** A deserialized key. */
Object getKey() { ... }
/** The stream and partition that this message came from. */
SystemStreamPartition getSystemStreamPartition() { ... }
}
鍵和值被聲明為 Object,并且需要轉(zhuǎn)換為正確的類型。如果不配置序列化器/解串器,它們通常是 Java 字節(jié)數(shù)組。解串器可以將這些字節(jié)轉(zhuǎn)換為任何其他類型,例如上述 JSON 解串器將字節(jié)數(shù)組解析為 java.util.Map,java.util.List 和 String 對象。
該 getSystemStreamPartition() 方法返回一個 SystemStreamPartition 對象,該對象告訴您消息來自哪里。它由三部分組成:
API 如下所示:
/** A triple of system name, stream name and partition. */
public class SystemStreamPartition extends SystemStream {
/** The name of the system which provides this stream. It is
defined in the Samza job's configuration. */
public String getSystem() { ... }
/** The name of the stream/topic/queue within the system. */
public String getStream() { ... }
/** The partition within the stream. */
public Partition getPartition() { ... }
}
在上面的示例作業(yè)配置中,系統(tǒng)名稱為 “kafka”,流名稱為 “PageViewEvent”。(名稱 “kafka” 不是特別的 - 您可以給系統(tǒng)任何您想要的名稱。)如果您有多個輸入流加入到 StreamTask中,您可以使用 SystemStreamPartition 來確定您收到的消息類型。
如何發(fā)送消息?如果您在 StreamTask 中查看 process()方法,您將看到您收到一個 MessageCollector。
/** When a task wishes to send a message, it uses this interface. */
public interface MessageCollector {
void send(OutgoingMessageEnvelope envelope);
}
要發(fā)送消息,您將創(chuàng)建一個 OutgoingMessageEnvelope 對象并將其傳遞給消息收集器。至少,信封指定要發(fā)送的消息,以及將其發(fā)送到的系統(tǒng)和流名稱?;蛘撸梢灾付ǚ謪^(qū)鍵和其他參數(shù)。
注意:請僅使用方法中的 MessageCollector 對象 process()。如果您持有 MessageCollector 實例并稍后再次使用,您的郵件可能無法正確發(fā)送。
例如,這是一個簡單的任務(wù),將每個輸入消息分成單詞,并將每個單詞作為單獨的消息發(fā)出:
public class SplitStringIntoWords implements StreamTask {
// Send outgoing messages to a stream called "words"
// in the "kafka" system.
private final SystemStream OUTPUT_STREAM =
new SystemStream("kafka", "words");
public void process(IncomingMessageEnvelope envelope,
MessageCollector collector,
TaskCoordinator coordinator) {
String message = (String) envelope.getMessage();
for (String word : message.split(" ")) {
// Use the word as the key, and 1 as the value.
// A second task can add the 1's to get the word count.
collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, word, 1));
}
}
}
對于 AsyncStreamTask 示例,請遵循Samza Async API和多線程用戶指南中的教程。有關(guān) API 的更多詳細(xì)信息,請參閱配置。
更多建議: