Samza Async API和多線程用戶指南

2018-08-21 18:46 更新

本教程提供了使用 Samza 異步 API 和多線程的示例和指南。

多線程同步過程

如果您的工作過程涉及同步 IO 或阻塞 IO,則可以簡單地配置 Samza 內(nèi)置線程池來并行運(yùn)行任務(wù)。在以下示例中,SyncRestTask 使用 Jersey 客戶端在每個(gè)進(jìn)程()中進(jìn)行休息調(diào)用。

public class SyncRestTask implements StreamTask, InitableTask, ClosableTask {
  private Client client;
  private WebTarget target;

  @Override
  public void init(Config config, TaskContext taskContext) throws Exception {
    client = ClientBuilder.newClient();
    target = client.target("http://example.com/resource/").path("hello");
  }

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
    Response response = target.request().get();
    System.out.println("Response status code " + response.getStatus() + " received.");
  }

  @Override
  public void close() throws Exception {
    client.close();
  }
}

默認(rèn)情況下,Samza 將在單個(gè)線程中順序運(yùn)行此任務(wù)。在下面我們配置大小為16的線程池并行運(yùn)行任務(wù):

# Thread pool to run synchronous tasks in parallel.
job.container.thread.pool.size=16

注意:線程池將用于運(yùn)行任務(wù)的所有同步操作,包括 StreamTask.process(),WindowableTask.window()和內(nèi)部的 Task.commit()。這是為了最大化任務(wù)之間的并行性以及減少阻塞時(shí)間。在多線程中運(yùn)行任務(wù)時(shí),默認(rèn)情況下,Samza 仍保證任務(wù)內(nèi)的消息的按順序處理。

AsyncStreamTask API 的異步過程

如果您的工作過程是異步的,例如,進(jìn)行非阻塞的遠(yuǎn)程 IO 調(diào)用,AsyncStreamTask 接口將為其提供支持。在下面的例子中,AsyncRestTask 會(huì)使異步休息調(diào)用,并在完成后觸發(fā)回調(diào)。

public class AsyncRestTask implements AsyncStreamTask, InitableTask, ClosableTask {
  private Client client;
  private WebTarget target;

  @Override
  public void init(Config config, TaskContext taskContext) throws Exception {
    client = ClientBuilder.newClient();
    target = client.target("http://example.com/resource/").path("hello");
  }

  @Override
  public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator, final TaskCallback callback) {
    target.request().async().get(new InvocationCallback<Response>() {
      @Override
      public void completed(Response response) {
        System.out.println("Response status code " + response.getStatus() + " received.");
        callback.complete();
      }

      @Override
      public void failed(Throwable throwable) {
        System.out.println("Invocation failed.");
        callback.failure(throwable);
      }
    });
  }

  @Override
  public void close() throws Exception {
    client.close();
  }
}

在上面的例子中,processAsync()返回時(shí),進(jìn)程不完整。在來自澤西客戶端的回調(diào)線程中,我們觸發(fā) TaskCallback 以指示進(jìn)程完成。為了確保在一定時(shí)間間隔(例如5秒)內(nèi)觸發(fā)回調(diào),您可以配置以下屬性:

# Timeout for processAsync() callback. When the timeout happens, it will throw a TaskCallbackTimeoutException and shut down the container.
task.callback.timeout.ms=5000

注意:默認(rèn)情況下,Samza 還保證 AsyncStreamTask 中消息的按順序進(jìn)程,這意味著在觸發(fā)前一個(gè) processAsync()回調(diào)之前,任務(wù)的下一個(gè) processAsync()才會(huì)被調(diào)用。

無序過程

在上述兩種情況下,Samza 默認(rèn)支持按順序進(jìn)行。通過允許任務(wù)并行處理多個(gè)未完成的消息也支持進(jìn)一步的并行性。以下配置允許一個(gè)任務(wù)一次處理最多4個(gè)未完成的消息:

# Max number of outstanding messages being processed per task at a time, applicable to both StreamTask and AsyncStreamTask.
task.max.concurrency=4

注意:在 AsyncStreamTask 的情況下,processAsync()仍然按消息到達(dá)的順序調(diào)用,但完成可能會(huì)出錯(cuò)。在具有多線程的 StreamTask 的情況下,process()可以無序運(yùn)行,因?yàn)樗鼈儽环峙傻骄€程池。應(yīng)此選項(xiàng)不是在需要輸出的嚴(yán)格的順序使用。

保證語義

在任何情況下,Samza 保證以下語義:

  • Samza 是安全的。您可以安全地訪問任務(wù)線程中的鍵值存儲(chǔ),寫入消息和檢查點(diǎn)偏移量的作業(yè)狀態(tài)。如果您在任務(wù)之間共享其他數(shù)據(jù),例如全局變量或靜態(tài)數(shù)據(jù),則如果可以通過多個(gè)線程并發(fā)訪問數(shù)據(jù),例如 StreamTask 在配置的線程池中運(yùn)行多個(gè)線程,則不會(huì)線程安全。對于任務(wù)中的狀態(tài)(如成員變量),Samza 保證進(jìn)程,窗口和提交的相互排他性,因此這些操作之間不會(huì)有并發(fā)的修改,任何來自一個(gè)操作的狀態(tài)變化都將完全可見。
  • 當(dāng)沒有未完成的進(jìn)程 / processAsync 并且沒有新的進(jìn)程/進(jìn)程同步調(diào)用時(shí),WindowableTask.window 被調(diào)用,直到完成為止。Samza 引擎負(fù)責(zé)確保及時(shí)調(diào)用該窗口。
  • 檢查點(diǎn)保證僅覆蓋完全處理的事件。它保存在 commit()方法中。
以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)