Samza 0.13.0 引入了新的編程模型和新的部署模型,它們作為預(yù)覽被發(fā)布,因?yàn)樗鼈兇砹碎_(kāi)發(fā)人員如何與Samza合作的重大改進(jìn),因此對(duì)早期采用者和Samza開(kāi)發(fā)社區(qū)來(lái)說(shuō),有助于實(shí)驗(yàn)該版本并提供反饋。以下內(nèi)容介紹新功能和鏈接到教程,演示如何使用它們。請(qǐng)嘗試并發(fā)送反饋給開(kāi)發(fā)者郵件列表。
想跳過(guò)所有細(xì)節(jié)并獲得一些實(shí)際經(jīng)驗(yàn)嗎?有三個(gè)教程可以幫助您了解在 YARN 和嵌入式模式下運(yùn)行 Samza 應(yīng)用程序以及使用高級(jí) API 進(jìn)行編程:
Samza 高級(jí) API 提供統(tǒng)一的方式來(lái)處理流和批量數(shù)據(jù)。您可以在單個(gè)程序中使用地圖,過(guò)濾器,窗口和連接等操作員來(lái)描述端到端應(yīng)用程序邏輯,以完成以前所需的多個(gè)作業(yè)。API 旨在便攜式。相同的應(yīng)用程序代碼可以批處理或流式傳輸模式,嵌入式或集群管理器環(huán)境部署,并可以通過(guò)簡(jiǎn)單的配置更改在Kafka,Kinesis,HDFS或其他系統(tǒng)之間切換。這種可移植性是由以下部分所述的新架構(gòu)啟用的。
Samza 的架構(gòu)已經(jīng)被大修,具有不同的層次,可以處理應(yīng)用程序開(kāi)發(fā)的每一個(gè)階段。下圖顯示了 Apache Samza 架構(gòu)與高級(jí) API 的概述。
建筑中有四層,以下部分描述每個(gè)圖層。
高級(jí) API 提供庫(kù)來(lái)定義應(yīng)用程序邏輯。該StreamApplication是您的應(yīng)用程序必須貫徹執(zhí)行中央的抽象。您首先將輸入聲明為MessageStream的實(shí)例。然后,您可以在每個(gè) MessageStream 上應(yīng)用運(yùn)算符,如地圖,過(guò)濾器,窗口和連接,以在單個(gè)程序中定義整個(gè)端到端數(shù)據(jù)處理。
要深入了解高級(jí) API,請(qǐng)參閱下面的高級(jí)API部分。
Samza 使用ApplicationRunner來(lái)運(yùn)行流應(yīng)用程序。ApplicationRunner 生成配置(如輸入/輸出流),創(chuàng)建中間流,并開(kāi)始執(zhí)行。ApplicationRunner 有兩種類型:
第一種:RemoteApplicationRunner - 將應(yīng)用程序提交到遠(yuǎn)程集群。該運(yùn)行程序通過(guò) run-app.sh 腳本調(diào)用。
要使用 RemoteApplicationRunner,請(qǐng)?jiān)O(shè)置以下配置:
# The StreamApplication class to run
app.class=com.company.job.YourStreamApplication
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
然后使用 run-app.sh 在遠(yuǎn)程集群中運(yùn)行應(yīng)用程序。該腳本將調(diào)用 RemoteApplicationRunner,它將使用 job.factory.class 指定的工廠啟動(dòng)一個(gè)或多個(gè)作業(yè)。
第二種:LocalApplicationRunner - 在JVM進(jìn)程中運(yùn)行該應(yīng)用程序。例如,要使用 ZooKeeper 在多臺(tái)機(jī)器上啟動(dòng)應(yīng)用程序進(jìn)行協(xié)調(diào),可以在各種機(jī)器上運(yùn)行多個(gè) LocalApplicationRunner實(shí)例。應(yīng)用程序加載后,他們將通過(guò) ZooKeeper 啟動(dòng)它們的操作。以下是使用 LocalApplicationRunner 在程序中運(yùn)行 StreamApplication 的示例:
public static void main(String[] args) throws Exception {
CommandLine cmdLine = new CommandLine();
Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
LocalApplicationRunner localRunner = new LocalApplicationRunner(config);
StreamApplication app = new YourStreamApplication();
localRunner.run(app);
// Wait for the application to finish
localRunner.waitForFinish();
System.out.println("Application completed with status " + localRunner.status(app));
}
按照ZooKeeper部署教程進(jìn)行嘗試。
ApplicationRunner 在開(kāi)始執(zhí)行處理邏輯之前生成一個(gè)物理執(zhí)行計(jì)劃。該計(jì)劃表示應(yīng)用程序的運(yùn)行時(shí)結(jié)構(gòu)。特別地,它提供對(duì)生成的中間流的可見(jiàn)性。一旦部署了工作,該計(jì)劃可以被看作如下:
要查看計(jì)劃,請(qǐng)?jiān)跒g覽器中打開(kāi) bin / plan.html 文件。這是一個(gè)示例計(jì)劃可視化:
Samza 支持兩種類型的執(zhí)行模型:基于群集的執(zhí)行和嵌入式執(zhí)行。
在基于群集的執(zhí)行中,Samza 將在多租戶群集上運(yùn)行和管理您的應(yīng)用程序。薩姆支持 YARN。您可以實(shí)現(xiàn)自己的 StreamJob 和相應(yīng)的 ResourceManagerFactory 來(lái)添加對(duì)另一個(gè)集群管理器的支持。
在嵌入式執(zhí)行模型中,您可以在應(yīng)用程序中使用 Sa mza 作為輕量級(jí)庫(kù)。您可以旋轉(zhuǎn)應(yīng)用程序的多個(gè)實(shí)例,它們之間將分配和協(xié)調(diào)處理。此模式提供了在任意托管環(huán)境中運(yùn)行應(yīng)用程序的靈活性:它還支持可插拔協(xié)調(diào)邏輯,支持兩種類型的協(xié)調(diào)開(kāi)箱即用:
有關(guān)在嵌入式模式下運(yùn)行 Samza 的更多詳細(xì)信息,請(qǐng)參閱下面的靈活部署模型部分。
Samza 應(yīng)用程序的最低執(zhí)行單元是處理器。它讀取從ApplicationRunner生成的配置并處理 JobCoordinator 分配的輸入流分區(qū)。它可以使用KeyValueStore實(shí)現(xiàn)(例如 RocksDB 或內(nèi)存)和使用多線程的遠(yuǎn)程狀態(tài)(例如 REST 服務(wù))訪問(wèn)本地狀態(tài)。
自從0.13.0版本以來(lái),Samza 提供了一個(gè)新的高級(jí) API,可以簡(jiǎn)化應(yīng)用程序。此 API 支持重新分區(qū),加窗和加入流等操作。您現(xiàn)在可以在幾行代碼中簡(jiǎn)潔地表達(dá)您的應(yīng)用程序邏輯,并完成以前需要的多個(gè)作業(yè)。
查看一些示例來(lái)查看高級(jí) API 的操作。
在使用 Samza 高級(jí) API 編寫(xiě)流處理應(yīng)用程序時(shí),應(yīng)實(shí)現(xiàn)StreamApplication并在 init 方法中定義處理邏輯。
public void init(StreamGraph graph, Config config) { … }
例如,這里是一個(gè) StreamApplication,它使用查看者的個(gè)人資料信息來(lái)驗(yàn)證和裝飾頁(yè)面瀏覽。
public class BadPageViewFilter implements StreamApplication {
@Override
public void init(StreamGraph graph, Config config) {
MessageStream<PageView> pageViews = graph.getInputStream(“page-views”..);
pageViews.filter(this::isValidPageView)
.map(this::addProfileInformation)
.sendTo(graph.getOutputStream(“decorated-page-views”..))
}
}
顧名思義,MessageStream 表示消息流。StreamApplication 被描述為 MessageStreams 上的一系列轉(zhuǎn)換。
您可以通過(guò)兩種方式獲取 MessageStream:
使用 Samza 高級(jí) API 編寫(xiě)流處理應(yīng)用程序有3個(gè)簡(jiǎn)單的步驟。
您可以使用 StreamGraph.getInputStream 獲取輸入流 ID(“page-views”)的 MessageStream。
MessageStream<PageView> pageViewInput = graph.getInputStream(“page-views”, (k,v) -> v);
第一個(gè)參數(shù) page-views 是邏輯流 ID。每個(gè)流 ID 與物理名稱和系統(tǒng)相關(guān)聯(lián)。默認(rèn)情況下,Samza 使用流 ID 作為物理流名稱,并訪問(wèn)使用屬性 “job.default.system” 指定的默認(rèn)系統(tǒng)上的流。但是,物理名稱和系統(tǒng)屬性可以在配置中被覆蓋。例如,以下配置將流 ID“page-views” 定義為本地 Kafka 集群中 PageViewEvent 主題的別名。
streams.page-views.samza.system=kafka
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.consumer.zookeeper.connect=localhost:2181
systems.kafka.producer.bootstrap.servers=localhost:9092
streams.page-views.samza.physical.name=PageViewEvent
第二個(gè)參數(shù) (k,v) -> v 是 MessageBuilder 函數(shù),用于從傳入的鍵和值構(gòu)造消息。
您現(xiàn)在可以將 StreamApplication 邏輯定義為 MessageStream 上的一系列轉(zhuǎn)換。
MessageStream<DecoratedPageViews> decoratedPageViews
= pageViewInput.filter(this::isValidPageView)
.map(this::addProfileInformation);
最后,您可以使用 StreamGraph.getOutputStream 創(chuàng)建一個(gè) OutputStream,并通過(guò)它發(fā)送已轉(zhuǎn)換的消息。
// Send messages with userId as the key to “decorated-page-views”.
decoratedPageViews.sendTo(
graph.getOutputStream(“decorated-page-views”,
dpv -> dpv.getUserId(),
dpv -> dpv));
第一個(gè)參數(shù) decorated-page-views 是邏輯流 ID。該流 ID 的屬性可以像輸入流的流 ID 一樣被覆蓋。例如:
streams.decorated-page-views.samza.system=kafka
streams.decorated-page-views.samza.physical.name=DecoratedPageViewEvent
第二和第三個(gè)參數(shù)定義提取器,將上游數(shù)據(jù)類型分別分成一個(gè)單獨(dú)的鍵和值。
高級(jí) API 支持常用的運(yùn)算符,如地圖,平面圖,過(guò)濾器,合并,連接和窗口。大多數(shù)這些操作符接受相應(yīng)的函數(shù),這些函數(shù)是 Initable。
將提供的1:1 MapFunction 應(yīng)用于 MessageStream 中的每個(gè)元素,并返回已轉(zhuǎn)換的 MessageStream。MapFunction 接收單個(gè)消息并返回單個(gè)消息(可能具有不同類型的消息)。
MessageStream<Integer> numbers = ...
MessageStream<Integer> tripled= numbers.map(m -> m * 3)
MessageStream<String> stringified = numbers.map(m -> String.valueOf(m))
將提供的 1:n FlatMapFunction應(yīng)用于 MessageStream 中的每個(gè)元素,并返回已轉(zhuǎn)換的 MessageStream。FlatMapFunction 接收單個(gè)消息并返回零個(gè)或多個(gè)消息。
MessageStream<String> sentence = ...
// Parse the sentence into its individual words splitting by space
MessageStream<String> words = sentence.flatMap(sentence ->
Arrays.asList(sentence.split(“ ”))
將提供的FilterFunction應(yīng)用于 MessageStream 并返回已過(guò)濾的 MessageStream。FilterFunction 是一個(gè)謂詞,用于指定是否應(yīng)將消息保留在過(guò)濾的流中。FilterFunction 返回 false 的消息將被過(guò)濾掉。
MessageStream<String> words = ...
// Extract only the long words
MessageStream<String> longWords = words.filter(word -> word.size() > 15);
// Extract only the short words
MessageStream<String> shortWords = words.filter(word -> word.size() < 3);
使用提供的 keyExtractor 返回的鍵重新分區(qū)該 MessageStream,并返回已轉(zhuǎn)換的 MessageStream。在重新分區(qū)期間通過(guò)中間流發(fā)送消息。
// Repartition pageView by userId
MessageStream<PageView> pageViews = ...
MessageStream<PageView> partitionedPageViews =
pageViews.partitionBy(pageView -> pageView.getUserId())
將 MessageStream 與所有提供的 MessageStream 合并,并返回合并的流。
MessageStream<ServiceCall> serviceCall1 = ...
MessageStream<ServiceCall> serviceCall2 = ...
// Merge individual “ServiceCall” streams and create a new merged MessageStream
MessageStream<ServiceCall> serviceCallMerged = serviceCall1.merge(serviceCall2)
合并變換保留每個(gè) MessageStream 的順序,因此如果消息 m1 出現(xiàn) m2 在任何提供的流之前,則 m1 也會(huì)出現(xiàn) m2 在合并流之前。
作為 merge 實(shí)例方法的替代方法,您還可以使用MessageStream#mergeAll靜態(tài)方法來(lái)合并 MessageStream 而不在初始流上操作。
將此消息流中的所有消息發(fā)送到提供的 OutputStream。您可以指定要用于傳出消息的密鑰和值。
// Output a new message with userId as the key and region as the value to the “user-region” stream.
MessageStream<PageView> pageViews = ...
OutputStream<String, String, PageView> userRegions
= graph.getOutputStream(“user-region”,
pageView -> pageView.getUserId(),
pageView -> pageView.getRegion())
pageView.sendTo(userRegions);
允許使用提供的SinkFunction從此 MessageStream 發(fā)送消息到輸出系統(tǒng)。
這提供了比 sendTo 更多的控制,因?yàn)?SinkFunction 可以訪問(wèn) MessageCollector 和 TaskCoordinator。例如,您可以選擇手動(dòng)提交偏移量,或使用 TaskCoordinator API 關(guān)閉作業(yè)。該運(yùn)營(yíng)商也可用于向非薩姆薩系統(tǒng)發(fā)送消息(如遠(yuǎn)程數(shù)據(jù)庫(kù),REST 服務(wù)等)
// Repartition pageView by userId.
MessageStream<PageView> pageViews = ...
pageViews.sink( (msg, collector, coordinator) -> {
// Construct a new outgoing message, and send it to a kafka topic named TransformedPageViewEvent.
collector.send(new OutgoingMessageEnvelope(new SystemStream(“kafka”,
“TransformedPageViewEvent”), msg));
} )
Join 運(yùn)算符使用提供的成對(duì)JoinFunction從兩個(gè) MessageStream 中加入消息。當(dāng)從第一流的消息提取的密鑰匹配從第二個(gè)流中的消息提取的密鑰時(shí),消息被連接。每個(gè)流中的消息將保留提供的 ttl 持續(xù)時(shí)間,并且連接結(jié)果將在匹配發(fā)現(xiàn)時(shí)發(fā)出。
// Joins a stream of OrderRecord with a stream of ShipmentRecord by orderId with a TTL of 20 minutes.
// Results are produced to a new stream of FulfilledOrderRecord.
MessageStream<OrderRecord> orders = …
MessageStream<ShipmentRecord> shipments = …
MessageStream<FulfilledOrderRecord> shippedOrders = orders.join(shipments, new OrderShipmentJoiner(), Duration.ofMinutes(20) )
// Constructs a new FulfilledOrderRecord by extracting the order timestamp from the OrderRecord and the shipment timestamp from the ShipmentRecord.
class OrderShipmentJoiner implements JoinFunction<String, OrderRecord, ShipmentRecord, FulfilledOrderRecord> {
@Override
public FulfilledOrderRecord apply(OrderRecord message, ShipmentRecord otherMessage) {
return new FulfilledOrderRecord(message.orderId, message.orderTimestamp, otherMessage.shipTimestamp);
}
@Override
public String getFirstKey(OrderRecord message) {
return message.orderId;
}
@Override
public String getSecondKey(ShipmentRecord message) {
return message.orderId;
}
}
Windows,觸發(fā)器和 WindowPanes:窗口操作符將 MessageStream 中的傳入消息分組為有限的窗口。每個(gè)發(fā)出的結(jié)果在窗口中包含一個(gè)或多個(gè)消息,稱為 WindowPane。
窗口可以具有一個(gè)或多個(gè)關(guān)聯(lián)的觸發(fā)器,以確定何時(shí)發(fā)出窗口的結(jié)果。觸發(fā)器可以是早期觸發(fā)器,允許在窗口的所有數(shù)據(jù)到達(dá)之前推測(cè)出結(jié)果,或者延遲觸發(fā)器允許處理窗口的延遲消息。
聚合器功能:默認(rèn)情況下,發(fā)出的 WindowPane 將包含窗口的所有消息。您通常不會(huì)保留所有消息,而是為 WindowPane 定義一個(gè)更緊湊的數(shù)據(jù)結(jié)構(gòu),并在新消息到達(dá)時(shí)逐漸更新,例如在窗口中保留消息計(jì)數(shù)。為此,您可以提供一個(gè)聚合的FoldLeftFunction,它為每個(gè)添加到窗口的傳入消息調(diào)用,并定義如何更新該消息的 WindowPane。
累積模式:窗口的累加模式確定從窗口發(fā)出的結(jié)果與同一窗口的先前發(fā)射結(jié)果相關(guān)。當(dāng)窗口配置有早期或晚期觸發(fā)器時(shí),這特別有用。累積模式可以是丟棄或累積。
一個(gè)丟棄窗口清除在每一個(gè)發(fā)射窗口的所有狀態(tài)。每次發(fā)射只會(huì)對(duì)應(yīng)于從前一次發(fā)射窗口到達(dá)的新消息。
一個(gè)累加窗口保留從以前的排放窗口的結(jié)果。每個(gè)排放將包含從窗口開(kāi)始到達(dá)的所有郵件。
Samza 高級(jí) API 目前支持翻滾和會(huì)話窗口。
翻滾窗口:一個(gè)翻滾窗口定義了一系列的流中連續(xù)的,固定大小的時(shí)間間隔。
例子:
// Group the pageView stream into 3 second tumbling windows keyed by the userId.
MessageStream<PageView> pageViews = ...
MessageStream<WindowPane<String, Collection<PageView>>> =
pageViews.window(
Windows.keyedTumblingWindow(pageView -> pageView.getUserId(),
Duration.ofSeconds(30)))
// Compute the maximum value over tumbling windows of 3 seconds.
MessageStream<Integer> integers = …
Supplier<Integer> initialValue = () -> Integer.MIN_VALUE
FoldLeftFunction<Integer, Integer> aggregateFunction = (msg, oldValue) -> Math.max(msg, oldValue)
MessageStream<WindowPane<Void, Integer>> windowedStream =
integers.window(Windows.tumblingWindow(Duration.ofSeconds(30), initialValue, aggregateFunction))
會(huì)話窗口:會(huì)話窗口將 MessageStream 組合成會(huì)話。會(huì)話通過(guò) MessageStream 捕獲一段活動(dòng),并由間隙定義。關(guān)閉會(huì)話,如果沒(méi)有新消息到達(dá)窗口以獲得間隙時(shí)間,則會(huì)發(fā)出結(jié)果。
例子:
// Sessionize a stream of page views, and count the number of page-views in a session for every user.
MessageStream<PageView> pageViews = …
Supplier<Integer> initialValue = () -> 0
FoldLeftFunction<PageView, Integer> countAggregator = (pageView, oldCount) -> oldCount + 1;
Duration sessionGap = Duration.ofMinutes(3);
MessageStream<WindowPane<String, Integer> sessionCounts = pageViews.window(Windows.keyedSessionWindow(
pageView -> pageView.getUserId(), sessionGap, initialValue, countAggregator));
// Compute the maximum value over tumbling windows of 3 seconds.
MessageStream<Integer> integers = …
Supplier<Integer> initialValue = () -> Integer.MAX_INT
FoldLeftFunction<Integer, Integer> aggregateFunction = (msg, oldValue) -> Math.max(msg, oldValue)
MessageStream<WindowPane<Void, Integer>> windowedStream =
integers.window(Windows.tumblingWindow(Duration.ofSeconds(3), initialValue, aggregateFunction))
目前,窗口和連接操作符緩沖內(nèi)存中的消息。因此,消息可能在故障和重新啟動(dòng)時(shí)丟失。
在 Samza 0.13.0 之前,Samza僅支持使用 YARN進(jìn)行 集群管理的部署。
使用 Samza 0.13.0,部署模式已被簡(jiǎn)化并與 YARN 分離。如果您喜歡集群管理,您仍然可以使用 YARN,或者您可以實(shí)現(xiàn)自己的擴(kuò)展,以在其他集群管理系統(tǒng)上部署 Samza。但是如果你想避免集群管理系統(tǒng)呢?
Samza 現(xiàn)在可以將應(yīng)用程序部署為具有可插拔協(xié)調(diào)的簡(jiǎn)單嵌入式庫(kù)。使用嵌入式模式,您可以直接在應(yīng)用程序中使用 Samza 處理器,并以任何您喜歡的方式進(jìn)行部署。Samza 有一個(gè)可插拔的工作協(xié)調(diào)器層,用于執(zhí)行領(lǐng)導(dǎo)選舉,并為處理器分配工作。
本節(jié)將重點(diǎn)介紹新的嵌入式部署功能。
我們來(lái)仔細(xì)看看嵌入式部署的工作原理。
上面的概念部分提供了能夠靈活部署模型的層的概述。新的嵌入式模式進(jìn)入了部署層。部署層包括向可用處理器分配輸入分區(qū)。
有兩種類型的分區(qū)分配模型可以通過(guò)配置中的 job.coordinator.factory 進(jìn)行控制:
通過(guò)外部分區(qū)管理,Samza 本身不管理分區(qū)。相反,它使用一個(gè) PassthroughJobCoordinator 供奉由SystemStreamPartitionGrouper提供的任何分區(qū)映射。外部分區(qū)管理有兩種常見(jiàn)的模式:
Samza 配備了 PassthroughJobCoordinatorFactory 一種便于這種類型的分區(qū)管理。
使用動(dòng)態(tài)分區(qū),分區(qū)在運(yùn)行時(shí)分布在可用處理器之間。如果可用處理器的數(shù)量發(fā)生變化(例如,如果有些處理器被關(guān)閉或添加),映射將被重新生成并重新分配給所有處理器。有關(guān)當(dāng)前映射的信息包含在稱為 JobModel 的特殊結(jié)構(gòu)中。有一個(gè)領(lǐng)先的處理器生成 JobModel 并將其分發(fā)到其他處理器。領(lǐng)導(dǎo)人由“領(lǐng)導(dǎo)人選舉”進(jìn)程決定。
我們來(lái)仔細(xì)看看動(dòng)態(tài)協(xié)調(diào)的工作原理。
處理器的動(dòng)態(tài)協(xié)調(diào)假定存在協(xié)調(diào)服務(wù)。該服務(wù)的主要職責(zé)是:
協(xié)調(diào)服務(wù)目前來(lái)自工作協(xié)調(diào)員工廠。Samza ZkJobCoordinatorFactory 有一個(gè)相應(yīng)的實(shí)現(xiàn) ZkCoordinationServiceFactory。
我們來(lái)看看基于 ZooKeeper 的嵌入式應(yīng)用程序的協(xié)調(diào)順序:
下圖顯示了 ZooKeeper 協(xié)調(diào)服務(wù)實(shí)現(xiàn)中協(xié)調(diào)器的關(guān)系。
以下是協(xié)調(diào)服務(wù)的幾個(gè)重要細(xì)節(jié):
嵌入式部署旨在幫助希望更好地控制其應(yīng)用程序部署的用戶。因此,用戶有責(zé)任配置和部署處理器。在 ZooKeeper 協(xié)調(diào)的情況下,您還需要配置 ZooKeeper 實(shí)例的 URL。
此外,每個(gè)處理器需要與協(xié)調(diào)服務(wù)一起使用的唯一 ID。如果位置關(guān)聯(lián)性很重要,則該 ID 對(duì)于特定主機(jī)名上的每個(gè)處理器(假設(shè)本地存儲(chǔ)服務(wù))應(yīng)該是唯一的。為了滿足這一要求,Samza 使用ProcessorIdGenerator為每個(gè)處理器提供 ID。如果沒(méi)有顯式配置生成器,默認(rèn)情況下將為每個(gè)處理器創(chuàng)建一個(gè) UUID。
要運(yùn)行嵌入式 Samza 處理器,您需要使用 job.coordinator.factory 屬性配置協(xié)調(diào)器服務(wù)。此外,目前有一個(gè)支持嵌入式模式的 taskname 分組,因此您必須明確配置。
我們來(lái)看看如何配置 Samza 附帶的兩個(gè)協(xié)調(diào)服務(wù)實(shí)現(xiàn)。
要使用基于 ZooKeeper 的協(xié)調(diào),需要以下配置:
job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
job.coordinator.zk.connect=yourzkconnection
task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
要使用外部協(xié)調(diào),需要以下配置:
job.coordinator.factory=org.apache.samza.standalone.PassthroughJobCoordinatorFactory
task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
如上面的概述部分所述,您可以使用 LocalApplicationRunner 從應(yīng)用程序代碼啟動(dòng)處理器,如下所示:
public class WikipediaZkLocalApplication {
public static void main(String[] args) {
CommandLine cmdLine = new CommandLine();
OptionSet options = cmdLine.parser().parse(args);
Config config = cmdLine.loadConfig(options);
LocalApplicationRunner runner = new LocalApplicationRunner(config);
WikipediaApplication app = new WikipediaApplication();
runner.run(app);
runner.waitForFinish();
}
}
在上面的代碼中,WikipediaApplication 是用高級(jí)API編寫(xiě)的應(yīng)用程序。
請(qǐng)查看本教程,以便現(xiàn)在在機(jī)器上使用 ZooKeeper 協(xié)調(diào)運(yùn)行此應(yīng)用程序。
您可以以任何您喜歡的方式部署應(yīng)用程序?qū)嵗?。如果使用協(xié)調(diào)服務(wù),您可以隨時(shí)添加或刪除實(shí)例,并且領(lǐng)導(dǎo)者的工作協(xié)調(diào)員(通過(guò)協(xié)調(diào)服務(wù)選舉)將在去抖動(dòng)時(shí)間后自動(dòng)重新計(jì)算 JobModel,并將其應(yīng)用于可用的處理器。所以,為了擴(kuò)展你的應(yīng)用程序,你只需要啟動(dòng)更多的處理器。
請(qǐng)注意0??.13.0版本的嵌入式部署功能的以下問(wèn)題。他們將在隨后的版本中修復(fù)。
更多建議: