所述samza容器讀取和寫入利用消息 SystemConsumer 和 SystemProducer 接口。您可以通過(guò)實(shí)現(xiàn)這兩個(gè)接口與 Samza 集成任何消息代理。
public interface SystemConsumer {
void start();
void stop();
void register(
SystemStreamPartition systemStreamPartition,
String lastReadOffset);
List<IncomingMessageEnvelope> poll(
Map<SystemStreamPartition, Integer> systemStreamPartitions,
long timeout)
throws InterruptedException;
}
public class IncomingMessageEnvelope {
public Object getMessage() { ... }
public Object getKey() { ... }
public SystemStreamPartition getSystemStreamPartition() { ... }
}
public interface SystemProducer {
void start();
void stop();
void register(String source);
void send(String source, OutgoingMessageEnvelope envelope);
void flush(String source);
}
public class OutgoingMessageEnvelope {
...
public Object getKey() { ... }
public Object getMessage() { ... }
}
開(kāi)箱即用,Samza 支持 Kafka(KafkaSystemConsumer 和 KafkaSystemProducer)。然而,可以插入任何消息總線系統(tǒng),只要它可以提供 Samza 要求的語(yǔ)義,如javadoc中所述。
SystemConsumers 和 SystemProducer 可以讀取和寫入任何數(shù)據(jù)類型的消息。只要它們只支持字節(jié)數(shù)組就可以了 - Samza 有一個(gè)獨(dú)立的序列化層,它轉(zhuǎn)換為應(yīng)用程序代碼可以使用的對(duì)象。Samza 沒(méi)有規(guī)定任何特定的數(shù)據(jù)模型或序列化格式。
作業(yè)配置文件可以包括特定于特定消費(fèi)者和生產(chǎn)者實(shí)現(xiàn)的屬性。例如,配置通常會(huì)指示要使用的消息代理的主機(jī)名和端口,也可能指示連接選項(xiàng)。
如果作業(yè)正在消耗來(lái)自多個(gè)輸入流的消息,并且所有輸入流都具有可用的消息,則默認(rèn)情況下以循環(huán)方式處理消息。例如,如果作業(yè)正在消耗 AdImpressionEvent 和 AdClickEvent,則使用來(lái)自 AdImpressionEvent 的消息調(diào)用任務(wù)實(shí)例的 process()方法,然后使用來(lái)自 AdClickEvent 的消息,然后來(lái)自 AdImpressionEvent 的另一個(gè)消息,并繼續(xù)在兩者之間交替。
如果其中一個(gè)輸入流沒(méi)有可用的新消息(最近的消息已經(jīng)被消耗),則該流被跳過(guò),并且該作業(yè)從其他輸入繼續(xù)消耗。它繼續(xù)檢查新消息是否可用。
當(dāng) Samza 容器在不同流分區(qū)上有幾個(gè)傳入消息時(shí),它如何決定首先處理哪個(gè)?行為由 MessageChooser 確定。默認(rèn)選擇器是 RoundRobinChooser,但您可以通過(guò)實(shí)現(xiàn)自定義選擇器來(lái)覆蓋它。
要插入自己的郵件選擇器,您需要實(shí)現(xiàn) MessageChooserFactory 界面,并將 “task.chooser.class” 配置設(shè)置為實(shí)施的完全限定類名稱:
task.chooser.class=com.example.samza.YourMessageChooserFactory
某些時(shí)間來(lái)自一個(gè)流的消息應(yīng)該比來(lái)自另一個(gè)流的消息優(yōu)先處理。例如,一些 Samza 作業(yè)消耗兩個(gè)流:一個(gè)流由實(shí)時(shí)系統(tǒng)饋送,另一個(gè)流由批處理系統(tǒng)饋送。在這種情況下,通過(guò)批處理流優(yōu)先處理實(shí)時(shí)流是有用的,以便在批量流中突然發(fā)生數(shù)據(jù)突發(fā)時(shí),實(shí)時(shí)處理速度不會(huì)降低。
Samza 提供了一種通過(guò)設(shè)置此配置參數(shù)來(lái)將一個(gè)流優(yōu)先于另一個(gè)流的機(jī)制:systems.<system> .streams.<stream> .samza.priority = <number>
例如:
systems.kafka.streams.my-real-time-stream.samza.priority=2
systems.kafka.streams.my-batch-stream.samza.priority=1
這意味著,我的實(shí)時(shí)流的消息應(yīng)該比我的批處理流的消息優(yōu)先級(jí)更高。如果我的實(shí)時(shí)流有任何消息可用,它們將被先處理。只有當(dāng)目前沒(méi)有消息在等待我的實(shí)時(shí)流時(shí),Samza 的工作才會(huì)繼續(xù)處理我的批量流。
每個(gè)優(yōu)先級(jí)別都有自己的 MessageChooser。定義具有相同優(yōu)先級(jí)的兩個(gè)流是有效的。如果消息從兩個(gè)流以相同的優(yōu)先級(jí)可用,則該優(yōu)先級(jí)的 MessageChooser 將決定首先處理哪個(gè)消息。
僅定義某些流的優(yōu)先級(jí)也是有效的。所有非優(yōu)先流都被視為最低優(yōu)先級(jí),并共享一個(gè) MessageChooser。
有時(shí),在處理來(lái)自任何其他流的消息之前,Samza 作業(yè)需要完全消耗流(從偏移0到最近的消息)。在流包含作業(yè)需要的一些必備數(shù)據(jù)的情況下,這是非常有用的,在作業(yè)加載了先決條件數(shù)據(jù)之前處理來(lái)自其他流的消息是沒(méi)有意義的。Samza 支持這種使用引導(dǎo)流的用例。
引導(dǎo)流似乎與具有高優(yōu)先級(jí)的流相似,但是略有不同。在允許任何其他流被處理之前,引導(dǎo)流等待消費(fèi)者明確地確認(rèn)流已被完全消耗。在此之前,引導(dǎo)流是作業(yè)的獨(dú)占輸入:即使網(wǎng)絡(luò)問(wèn)題或其他因素導(dǎo)致引導(dǎo)流消費(fèi)者放慢速度,其他輸入也不能潛入其中。
引導(dǎo)流和高優(yōu)先級(jí)流之間的另一個(gè)區(qū)別是引導(dǎo)流的特殊處理是臨時(shí)的:當(dāng)它被完全消耗(我們說(shuō)它已經(jīng)被“捕獲”)時(shí),其優(yōu)先級(jí)與所有其他流相同輸入流。
要將名為“my-bootstrap-stream”的流配置為完全消耗的引導(dǎo)流,請(qǐng)使用以下設(shè)置:
systems.kafka.streams.my-bootstrap-stream.samza.bootstrap=true
systems.kafka.streams.my-bootstrap-stream.samza.reset.offset=true
systems.kafka.streams.my-bootstrap-stream.samza.offset.default=oldest
bootstrap = true 參數(shù)啟用引導(dǎo)行為(優(yōu)先于其他流)。reset.offset = true和offset.default = oldest 的組合告訴 Samza 始終從最早的偏移量開(kāi)始讀取流,每次容器啟動(dòng)(而不是從最近的檢查點(diǎn)開(kāi)始讀?。?。
定義多個(gè)引導(dǎo)流是有效的。在這種情況下,它們被引導(dǎo)的順序由優(yōu)先級(jí)確定。
在某些情況下,您可以按順序從相同的流分區(qū)中消耗多個(gè)消息來(lái)提高性能。Samza 支持這種操作模式,稱為批處理。
例如,如果要從每個(gè)流分區(qū)(不管 MessageChooser)讀取一行中的 100 條消息,可以使用此配置參數(shù):
task.consumer.batch.size=100
使用此設(shè)置,Samza 嘗試從最近使用的 SystemStreamPartition中 讀取消息。直到?jīng)]有更多消息可用于該 SystemStreamPartition,或者直到達(dá)到批量大小,此行為將繼續(xù)。當(dāng)這種情況發(fā)生時(shí),Samza 將前往 MessageChooser 來(lái)確定要處理的下一個(gè)消息。然后再次嘗試從所選消息的 SystemStreamPartition 繼續(xù)消費(fèi),直到達(dá)到批量大小。
更多建議: