Samza 流

2018-08-22 17:25 更新

所述samza容器讀取和寫入利用消息 SystemConsumer 和 SystemProducer 接口。您可以通過實現(xiàn)這兩個接口與 Samza 集成任何消息代理。

public interface SystemConsumer {
  void start();

  void stop();

  void register(
      SystemStreamPartition systemStreamPartition,
      String lastReadOffset);

  List<IncomingMessageEnvelope> poll(
      Map<SystemStreamPartition, Integer> systemStreamPartitions,
      long timeout)
    throws InterruptedException;
}

public class IncomingMessageEnvelope {
  public Object getMessage() { ... }

  public Object getKey() { ... }

  public SystemStreamPartition getSystemStreamPartition() { ... }
}

public interface SystemProducer {
  void start();

  void stop();

  void register(String source);

  void send(String source, OutgoingMessageEnvelope envelope);

  void flush(String source);
}

public class OutgoingMessageEnvelope {
  ...
  public Object getKey() { ... }

  public Object getMessage() { ... }
}

開箱即用,Samza 支持 Kafka(KafkaSystemConsumer 和 KafkaSystemProducer)。然而,可以插入任何消息總線系統(tǒng),只要它可以提供 Samza 要求的語義,如javadoc中所述。

SystemConsumers 和 SystemProducer 可以讀取和寫入任何數(shù)據(jù)類型的消息。只要它們只支持字節(jié)數(shù)組就可以了 - Samza 有一個獨立的序列化層,它轉(zhuǎn)換為應(yīng)用程序代碼可以使用的對象。Samza 沒有規(guī)定任何特定的數(shù)據(jù)模型或序列化格式。

作業(yè)配置文件可以包括特定于特定消費者和生產(chǎn)者實現(xiàn)的屬性。例如,配置通常會指示要使用的消息代理的主機名和端口,也可能指示連接選項。

流如何處理

如果作業(yè)正在消耗來自多個輸入流的消息,并且所有輸入流都具有可用的消息,則默認情況下以循環(huán)方式處理消息。例如,如果作業(yè)正在消耗 AdImpressionEvent 和 AdClickEvent,則使用來自 AdImpressionEvent 的消息調(diào)用任務(wù)實例的 process()方法,然后使用來自 AdClickEvent 的消息,然后來自 AdImpressionEvent 的另一個消息,并繼續(xù)在兩者之間交替。

如果其中一個輸入流沒有可用的新消息(最近的消息已經(jīng)被消耗),則該流被跳過,并且該作業(yè)從其他輸入繼續(xù)消耗。它繼續(xù)檢查新消息是否可用。

MessageChooser

當 Samza 容器在不同流分區(qū)上有幾個傳入消息時,它如何決定首先處理哪個?行為由 MessageChooser 確定。默認選擇器是 RoundRobinChooser,但您可以通過實現(xiàn)自定義選擇器來覆蓋它。

要插入自己的郵件選擇器,您需要實現(xiàn) MessageChooserFactory 界面,并將 “task.chooser.class” 配置設(shè)置為實施的完全限定類名稱:

task.chooser.class=com.example.samza.YourMessageChooserFactory

優(yōu)先輸入流

某些時間來自一個流的消息應(yīng)該比來自另一個流的消息優(yōu)先處理。例如,一些 Samza 作業(yè)消耗兩個流:一個流由實時系統(tǒng)饋送,另一個流由批處理系統(tǒng)饋送。在這種情況下,通過批處理流優(yōu)先處理實時流是有用的,以便在批量流中突然發(fā)生數(shù)據(jù)突發(fā)時,實時處理速度不會降低。

Samza 提供了一種通過設(shè)置此配置參數(shù)來將一個流優(yōu)先于另一個流的機制:systems.<system> .streams.<stream> .samza.priority = <number>

例如:

systems.kafka.streams.my-real-time-stream.samza.priority=2
systems.kafka.streams.my-batch-stream.samza.priority=1

這意味著,我的實時流的消息應(yīng)該比我的批處理流的消息優(yōu)先級更高。如果我的實時流有任何消息可用,它們將被先處理。只有當目前沒有消息在等待我的實時流時,Samza 的工作才會繼續(xù)處理我的批量流。

每個優(yōu)先級別都有自己的 MessageChooser。定義具有相同優(yōu)先級的兩個流是有效的。如果消息從兩個流以相同的優(yōu)先級可用,則該優(yōu)先級的 MessageChooser 將決定首先處理哪個消息。

僅定義某些流的優(yōu)先級也是有效的。所有非優(yōu)先流都被視為最低優(yōu)先級,并共享一個 MessageChooser。

引導

有時,在處理來自任何其他流的消息之前,Samza 作業(yè)需要完全消耗流(從偏移0到最近的消息)。在流包含作業(yè)需要的一些必備數(shù)據(jù)的情況下,這是非常有用的,在作業(yè)加載了先決條件數(shù)據(jù)之前處理來自其他流的消息是沒有意義的。Samza 支持這種使用引導流的用例。

引導流似乎與具有高優(yōu)先級的流相似,但是略有不同。在允許任何其他流被處理之前,引導流等待消費者明確地確認流已被完全消耗。在此之前,引導流是作業(yè)的獨占輸入:即使網(wǎng)絡(luò)問題或其他因素導致引導流消費者放慢速度,其他輸入也不能潛入其中。

引導流和高優(yōu)先級流之間的另一個區(qū)別是引導流的特殊處理是臨時的:當它被完全消耗(我們說它已經(jīng)被“捕獲”)時,其優(yōu)先級與所有其他流相同輸入流。

要將名為“my-bootstrap-stream”的流配置為完全消耗的引導流,請使用以下設(shè)置:

systems.kafka.streams.my-bootstrap-stream.samza.bootstrap=true
systems.kafka.streams.my-bootstrap-stream.samza.reset.offset=true
systems.kafka.streams.my-bootstrap-stream.samza.offset.default=oldest

bootstrap = true 參數(shù)啟用引導行為(優(yōu)先于其他流)。reset.offset = true和offset.default = oldest 的組合告訴 Samza 始終從最早的偏移量開始讀取流,每次容器啟動(而不是從最近的檢查點開始讀?。?。

定義多個引導流是有效的。在這種情況下,它們被引導的順序由優(yōu)先級確定。

配料

在某些情況下,您可以按順序從相同的流分區(qū)中消耗多個消息來提高性能。Samza 支持這種操作模式,稱為批處理。

例如,如果要從每個流分區(qū)(不管 MessageChooser)讀取一行中的 100 條消息,可以使用此配置參數(shù):

task.consumer.batch.size=100

使用此設(shè)置,Samza 嘗試從最近使用的 SystemStreamPartition中 讀取消息。直到?jīng)]有更多消息可用于該 SystemStreamPartition,或者直到達到批量大小,此行為將繼續(xù)。當這種情況發(fā)生時,Samza 將前往 MessageChooser 來確定要處理的下一個消息。然后再次嘗試從所選消息的 SystemStreamPartition 繼續(xù)消費,直到達到批量大小。

序列化  ?

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號