W3Cschool
恭喜您成為首批注冊(cè)用戶
獲得88經(jīng)驗(yàn)值獎(jiǎng)勵(lì)
本教程提供了使用 Samza 異步 API 和多線程的示例和指南。
如果您的工作過程涉及同步 IO 或阻塞 IO,則可以簡單地配置 Samza 內(nèi)置線程池來并行運(yùn)行任務(wù)。在以下示例中,SyncRestTask 使用 Jersey 客戶端在每個(gè)進(jìn)程()中進(jìn)行休息調(diào)用。
public class SyncRestTask implements StreamTask, InitableTask, ClosableTask {
private Client client;
private WebTarget target;
@Override
public void init(Config config, TaskContext taskContext) throws Exception {
client = ClientBuilder.newClient();
target = client.target("http://example.com/resource/").path("hello");
}
@Override
public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
Response response = target.request().get();
System.out.println("Response status code " + response.getStatus() + " received.");
}
@Override
public void close() throws Exception {
client.close();
}
}
默認(rèn)情況下,Samza 將在單個(gè)線程中順序運(yùn)行此任務(wù)。在下面我們配置大小為16的線程池并行運(yùn)行任務(wù):
# Thread pool to run synchronous tasks in parallel.
job.container.thread.pool.size=16
注意:線程池將用于運(yùn)行任務(wù)的所有同步操作,包括 StreamTask.process(),WindowableTask.window()和內(nèi)部的 Task.commit()。這是為了最大化任務(wù)之間的并行性以及減少阻塞時(shí)間。在多線程中運(yùn)行任務(wù)時(shí),默認(rèn)情況下,Samza 仍保證任務(wù)內(nèi)的消息的按順序處理。
如果您的工作過程是異步的,例如,進(jìn)行非阻塞的遠(yuǎn)程 IO 調(diào)用,AsyncStreamTask 接口將為其提供支持。在下面的例子中,AsyncRestTask 會(huì)使異步休息調(diào)用,并在完成后觸發(fā)回調(diào)。
public class AsyncRestTask implements AsyncStreamTask, InitableTask, ClosableTask {
private Client client;
private WebTarget target;
@Override
public void init(Config config, TaskContext taskContext) throws Exception {
client = ClientBuilder.newClient();
target = client.target("http://example.com/resource/").path("hello");
}
@Override
public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector,
TaskCoordinator coordinator, final TaskCallback callback) {
target.request().async().get(new InvocationCallback<Response>() {
@Override
public void completed(Response response) {
System.out.println("Response status code " + response.getStatus() + " received.");
callback.complete();
}
@Override
public void failed(Throwable throwable) {
System.out.println("Invocation failed.");
callback.failure(throwable);
}
});
}
@Override
public void close() throws Exception {
client.close();
}
}
在上面的例子中,processAsync()返回時(shí),進(jìn)程不完整。在來自澤西客戶端的回調(diào)線程中,我們觸發(fā) TaskCallback 以指示進(jìn)程完成。為了確保在一定時(shí)間間隔(例如5秒)內(nèi)觸發(fā)回調(diào),您可以配置以下屬性:
# Timeout for processAsync() callback. When the timeout happens, it will throw a TaskCallbackTimeoutException and shut down the container.
task.callback.timeout.ms=5000
注意:默認(rèn)情況下,Samza 還保證 AsyncStreamTask 中消息的按順序進(jìn)程,這意味著在觸發(fā)前一個(gè) processAsync()回調(diào)之前,任務(wù)的下一個(gè) processAsync()才會(huì)被調(diào)用。
在上述兩種情況下,Samza 默認(rèn)支持按順序進(jìn)行。通過允許任務(wù)并行處理多個(gè)未完成的消息也支持進(jìn)一步的并行性。以下配置允許一個(gè)任務(wù)一次處理最多4個(gè)未完成的消息:
# Max number of outstanding messages being processed per task at a time, applicable to both StreamTask and AsyncStreamTask.
task.max.concurrency=4
注意:在 AsyncStreamTask 的情況下,processAsync()仍然按消息到達(dá)的順序調(diào)用,但完成可能會(huì)出錯(cuò)。在具有多線程的 StreamTask 的情況下,process()可以無序運(yùn)行,因?yàn)樗鼈儽环峙傻骄€程池。應(yīng)此選項(xiàng)不是在需要輸出的嚴(yán)格的順序使用。
在任何情況下,Samza 保證以下語義:
Copyright©2021 w3cschool編程獅|閩ICP備15016281號(hào)-3|閩公網(wǎng)安備35020302033924號(hào)
違法和不良信息舉報(bào)電話:173-0602-2364|舉報(bào)郵箱:jubao@eeedong.com
掃描二維碼
下載編程獅App
編程獅公眾號(hào)
聯(lián)系方式:
更多建議: