Samza 重新處理以前處理的數(shù)據(jù)

2018-08-22 18:07 更新

有時您可能希望部署新版本的 Samza 作業(yè),以不同的方式計算結(jié)果;也許你修復(fù)了一個 bug 或者引入了一個新功能。例如,假設(shè)您有一個 Samza 工作,將郵件分類為垃圾郵件或非垃圾郵件,使用您在離線培訓(xùn)的機器學(xué)習(xí)模型。您希望定期部署 Samza 工作的更新版本,其中包括最新的分類模型。

當(dāng)您啟動新版本的工作時,會出現(xiàn)一個問題:您以前用舊版本的作業(yè)處理的郵件要做什么?答案取決于你想要的行為:

  1. 無重新處理:默認(rèn)情況下,Samza 假定舊版本處理的消息不需要再次處理。當(dāng)新版本啟動時,它將在舊版本停止時恢復(fù)處理(假設(shè)您啟用了檢查點)。如果這是你想要的行為,沒有什么特別需要做的。
  2. 簡單的倒帶:也許你想回去并使用新版本的工作重新處理舊郵件。例如,也許舊版本的分類器將垃圾郵件標(biāo)記為太大,所以您現(xiàn)在想要使用改進(jìn)的分類器重新審視其以前的垃圾郵件/非垃圾郵件決定。您可以通過在流中較舊的時間點重新啟動作業(yè),并從該時間開始運行所有消息來執(zhí)行此操作。因此,您的工作開始重新處理已經(jīng)看到的消息,但是當(dāng)重新處理完成后,它將無縫地繼續(xù)新的消息。

這種方法需要一個輸入系統(tǒng),如 Kafka,它允許您在時間上跳回到流中的上一個點。下面我們來討論這個工作原理。

  1. 平行倒帶:這種方法避免了簡單的倒帶方法的缺點。通過簡單的倒帶,作業(yè)正在重新處理舊數(shù)據(jù)時出現(xiàn)的任何新消息都將排隊,并在重新處理完成后進(jìn)行處理。排隊延遲不需要太長時間,因為 Samza 可以快速流式傳輸歷史數(shù)據(jù),但一些延遲敏感的應(yīng)用程序需要更快地處理消息。

在并行重繞方法中,您并行運行兩個作業(yè):一個作業(yè)繼續(xù)處理低延遲(實時作業(yè))的實時更新,而另一個作業(yè)在流中的較舊點啟動并重新處理歷史數(shù)據(jù)(后處理工作)。這兩個作業(yè)在不同的時間點消耗相同的輸入流,最終后處理工作可以滿足實時工作。

在部署并行倒帶之前,您需要仔細(xì)思考一些細(xì)節(jié),我們在下面討論。

及時跳回

簡單的倒帶和平行倒帶方法的一個常見方面是:您有一個工作可以在輸入流中跳回舊的時間點,并從那時起就消耗所有的消息。通過與 Samza 的檢查點合作來實現(xiàn)這一點。

通常,當(dāng) Samza 作業(yè)啟動時,它會讀取最新的檢查點,以確定需要恢復(fù)處理的輸入流中哪個偏移量。如果你需要回到更早的時間,可以采取以下兩種方式之一:

  1. 您可以停止該作業(yè),操作其最后一個檢查點以指向較舊的偏移量,然后重新啟動該作業(yè)。Samza 包括一個名為 CheckpointTool 的命令行工具,可用于操作檢查點。
  2. 您可以使用不同的 job.name 或 job.id 啟動新作業(yè)(例如,每當(dāng)您需要及時跳回時,都會增加 job.id)。這給工作一個新的檢查點流,沒有舊的檢查點信息。您還需要設(shè)置 samza.offset.default =最舊的,這樣當(dāng)作業(yè)啟動時沒有檢查點,它開始消耗在可用的最舊的偏移量。

使用這些方法之一,您可以讓 Samza 在輸入系統(tǒng)中重新處理消息的整個歷史記錄。輸入系統(tǒng)如 Kafka 可以保留大量的歷史 - 參見下面的討論。為了加快歷史數(shù)據(jù)的再處理,可以(增加容器數(shù) job.container.count 如果你在紗線行走 Samza),以提高你的工作的計算資源。

如果您的工作保持任何持續(xù)狀態(tài),則需要在及時跳回時小心:重置檢查點不會自動更改持久狀態(tài),因此您可以在稍后使用狀態(tài)時最終重新處理舊消息。在大多數(shù)情況下,及時跳回的工作應(yīng)以空狀態(tài)開始。您可以通過刪除更改日志主題或更改作業(yè)配置中更改日志主題的名稱來重置狀態(tài)。

當(dāng)你及時回來時,你使用 Samza 有點像一個批處理框架(例如 MapReduce) - 區(qū)別在于處理所有歷史數(shù)據(jù)后,你的工作不會停止,而是繼續(xù)運行處理新消息流時,它的優(yōu)勢在于您不需要編寫和維護(hù)您的工作的單獨批量和流式版本:您可以使用相同的 Samza API 來處理實時和歷史數(shù)據(jù)。

保留歷史

Samza 本身不保留歷史 - 輸入系統(tǒng)的責(zé)任,如 Kafka。你可以跳多遠(yuǎn)的時間取決于該系統(tǒng)中保留的歷史記錄數(shù)量。

Kafka 旨在保持相當(dāng)大的歷史:Kafka 經(jīng)紀(jì)人通??梢员A粢粋€或兩個星期的消息歷史記錄,即使是大量的話題。保留期大部分取決于您有多少磁盤空間。即使你有太字節(jié)的歷史,Kafka 的表現(xiàn)仍然很高。

X- 200 200 X- 200 200 X- 200 200 X-

  • 活動事件包括用戶跟蹤事件,Web 服務(wù)器日志事件等。這種流通常配置有基于時間的保留,例如數(shù)周。比保留期更早的事件將被刪除(或存檔在脫機系統(tǒng),如 HDFS)中。
  • 數(shù)據(jù)庫更改是在數(shù)據(jù)庫中顯示插入,更新和刪除的事件。在這種流中,每個事件通常都有一個主鍵,一個新的事件用于覆蓋同一個鍵的任何較舊的事件。如果相同的密鑰更新了很多次,那么您只對最近的值感興趣。(Samza 持續(xù)狀態(tài)使用的更改日志流屬于此類別。)

在數(shù)據(jù)庫更改流中,當(dāng)您重新處理數(shù)據(jù)時,通常要重新處理整個數(shù)據(jù)庫。你不想錯過一個值,因為它上次更新是在幾個星期前。換句話說,您不希望只是因為比某個閾值更早而刪除更改事件。在這種情況下,當(dāng)您及時跳回時,您需要重新開始時間,即先對數(shù)據(jù)庫進(jìn)行的更改(在 Kafka 中稱為 “offset 0”)。

幸運的是,可以使用稱為日志壓縮的 Kafka 功能有效地完成此操作。

例如,假設(shè)您的數(shù)據(jù)庫包含計數(shù)器:每當(dāng)發(fā)生一些事情時,您將增加相應(yīng)的計數(shù)器并使用新的計數(shù)器值更新數(shù)據(jù)庫。每個更新都會發(fā)送到更改日志,并且由于有更多更新,更改日志流將占用大量空間。啟用日志壓縮后,Kafka 會在后臺對數(shù)據(jù)流進(jìn)行重復(fù)數(shù)據(jù)刪除,只保留每個密鑰的最新計數(shù)器值,并刪除相同計數(shù)器的任何舊值。這樣可以減少流的大小,以便您可以保留每個密鑰的最新更新,即使最近更新了很久以前。

啟用日志壓縮后,數(shù)據(jù)庫更改流將成為整個數(shù)據(jù)庫的完整副本。通過跳轉(zhuǎn)到 0,您的 Samza 作業(yè)可以掃描整個數(shù)據(jù)庫并重新處理它。這是構(gòu)建可擴展應(yīng)用程序的非常強大的方法。

平行回卷的細(xì)節(jié)

如果您采用上述并行重繞方法,并行運行兩個作業(yè),則需要仔細(xì)配置它們以避免出現(xiàn)問題。特別要注意的是:

  • 確保兩個作業(yè)不會相互干擾。他們需要不同的 job.name 或 job.id 配置屬性,以便每個作業(yè)都有自己的檢查點流。如果作業(yè)保持持續(xù)狀態(tài),每個作業(yè)都需要自己的更改日志(兩個不同的作業(yè)寫入相同的更新日志會產(chǎn)生未定義的結(jié)果)。
  • 工作輸出會怎樣?如果作業(yè)將其結(jié)果發(fā)送到輸出流或?qū)懭霐?shù)據(jù)庫,則最簡單的解決方案是為每個作業(yè)提供單獨的輸出流或數(shù)據(jù)庫表。如果寫入相同的輸出,則需要注意確保較新的數(shù)據(jù)不被舊數(shù)據(jù)覆蓋(由于兩個作業(yè)之間的競爭條件)。
  • 您是否需要在舊版本和新版本的工作之間支持A / B測試,例如,測試新版本是否改進(jìn)了您的指標(biāo)?平行倒帶是理想的:每個作業(yè)寫入單獨的輸出,輸出的客戶端或消費者可以從舊版本或新版本的輸出讀取,具體取決于用戶是否在測試組A或B中。
  • 回收資源:即使新版本已經(jīng)完成重新處理歷史數(shù)據(jù)(尤其是在A / B測試中使用舊版本的輸出),您可能希望保持舊版本的作業(yè)運行一段時間。但是,最終你需要關(guān)閉它,并刪除屬于舊版本的檢查點和 changelog 流。

Samza 為您提供了重新處理歷史數(shù)據(jù)的很多靈活性,您不需要針對單獨的批處理 API 進(jìn)行編程,以利用它。如果您注意到這些問題,您可以建立一個非常強大的數(shù)據(jù)系統(tǒng),但是您仍然可以隨時更改處理邏輯。

Web UI 和 REST API  ?

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號