Samza的定期檢查

2018-08-22 17:36 更新

Samza 提供流的容錯處理:Samza 保證消息不會丟失,即使您的工作崩潰,如果機(jī)器死機(jī),如果有網(wǎng)絡(luò)故障或其他問題。為了提供這一保證,Samza 希望輸入系統(tǒng)滿足以下要求:

  • 流可以劃分成一個或多個分區(qū)。每個分區(qū)獨立于其他分區(qū),并且跨多臺計算機(jī)進(jìn)行復(fù)制(即使機(jī)器發(fā)生故障,該流仍然可用)。
  • 每個分區(qū)由固定順序的消息序列組成。每個消息都有一個偏移量,表示其在該順序中的位置。消息總是在每個分區(qū)內(nèi)依次消耗。
  • Samza 作業(yè)可以從任何起始偏移開始消耗消息序列。

Kafka 符合這些要求,但也可以與其他消息代理系統(tǒng)一起實施。

SamzaContainer一節(jié)中所述,您的任務(wù)的每個任務(wù)實例都會占用一個輸入流的一個分區(qū)。每個任務(wù)對于每個輸入流具有當(dāng)前偏移量:要從該流分區(qū)讀取的下一個消息的偏移量。每次從流中讀取消息時,當(dāng)前的偏移向前移動。

如果 Samza 容器出現(xiàn)故障,則需要重新啟動(潛在地在另一臺機(jī)器上),并恢復(fù)處理失敗的容器。為了實現(xiàn)這一點,容器定期檢查每個任務(wù)實例的當(dāng)前偏移量。

1502854014329507

當(dāng) Samza 容器啟動時,它會查找最新的檢查點,并從檢查點偏移開始消費消息。如果上一個容器意外失敗,則最近的檢查點可能稍微偏離當(dāng)前的偏移量(即,自上次檢查點寫入以來,該作業(yè)可能已經(jīng)消耗了一些更多的消息),但是我們無法確定。在這種情況下,該作業(yè)可能再次處理幾個消息。

這種保證被稱為至少一次處理:Samza 確保您的工作不會錯過任何消息,即使容器需要重新啟動。但是,當(dāng)重新啟動容器時,您的作業(yè)可能會多次看到相同的消息。我們計劃在 Samza 的未來版本中解決這個問題,但現(xiàn)在只需要注意一些事情:例如,如果您正在計算頁面瀏覽量,強(qiáng)制殺死的容器可能導(dǎo)致事件略微過度計數(shù)。您可以以更低的性能成本更頻繁地檢查點來減少重復(fù)。

為了使檢查站有效,他們需要寫在他們能夠在故障中生存的地方。Samza 允許您將檢查點寫入文件系統(tǒng)(使用 FileSystemCheckpointManager ),但是如果機(jī)器發(fā)生故障,并且需要在另一臺機(jī)器上重新啟動該容器,則不起作用。最常見的配置是使用 Kafka 作為檢查點。您可以通過以下作業(yè)配置啟用此功能:

# The name of your job determines the name under which checkpoints will be stored
job.name=example-job

# Define a system called "kafka" for consuming and producing to a Kafka cluster
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory

# Declare that we want our job's checkpoints to be written to Kafka
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka

# By default, a checkpoint is written every 60 seconds. You can change this if you like.
task.commit.ms=60000

在這種配置中,Samza 將檢查點寫入一個名為 “__samza_checkpoint_ <job-name> _ <job-id>” 的單獨的 Kafka 主題(在上面的示例配置中,該主題稱為“__samza_checkpoint_example-job_1”)。每分鐘一次,Samza 會自動發(fā)送一個消息到這個主題,其中輸入流的當(dāng)前偏移被編碼。當(dāng) Samza 容器啟動時,它會在此主題中查找最新的偏移消息,并加載該檢查點。

有時只能對某些輸入流使用檢查點,但不適用于其他輸入流。在這種情況下,您可以告訴 Samza 忽略特定流名稱的任何檢查點偏移量:

# Ignore any checkpoints for the topic "my-special-topic"
systems.kafka.streams.my-special-topic.samza.reset.offset=true

# Always start consuming "my-special-topic" at the oldest available offset
systems.kafka.streams.my-special-topic.samza.offset.default=oldest

下表說明了這些配置參數(shù)的含義:

參數(shù)名稱含義
系統(tǒng)。<系統(tǒng)>。
流。<流>。
samza.reset.offset
假(默認(rèn))容器啟動時,從上次檢查點恢復(fù)處理
真正忽略檢查點(假裝沒有檢查點存在)
系統(tǒng)。<系統(tǒng)>。
流。<流>。
samza.offset.default
即將到來(默認(rèn))當(dāng)容器啟動并且沒有檢查點(或檢查點被忽略)時,僅處理在作業(yè)啟動后發(fā)布的消息,但沒有舊消息
最老的當(dāng)容器啟動并且沒有檢查點(或檢查點被忽略)時,跳回到系統(tǒng)中最舊的可用消息,并從該點開始消耗所有消息(很可能這意味著先前已經(jīng)看到的消息的重復(fù)處理)

請注意,上述示例配置使您的任務(wù)在每次啟動容器時從最早的偏移量開始消耗。這在您的任務(wù)中需要從輸入流中的源數(shù)據(jù)重建時具有某些內(nèi)存狀態(tài)時非常有用。如果您以這種方式使用流,您可能還會發(fā)現(xiàn)引導(dǎo)流很有用。

手動操作檢查點

如果要對作業(yè)的消費者偏移進(jìn)行一次性更改,例如強(qiáng)制使用新版本的代碼再次處理舊消息,則可以使用 CheckpointTool 來檢查和操作作業(yè)的檢查點。該工具包含在 Samza 的源存儲庫中。

要檢查工作的最新檢查點,您需要指定作業(yè)的配置文件,以便該工具知道要處理的作業(yè):

samza-example/target/bin/checkpoint-tool.sh \
  --config-path=file:///path/to/job/config.properties

此命令以屬性文件格式打印出最新的檢查點。您可以將輸出保存到文件中,并根據(jù)需要進(jìn)行編輯。例如,要跳回到最早可能的時間點,您可以將所有偏移量設(shè)置為0.然后可以將該屬性文件反饋到 checkpoint-tool.sh 并保存修改的檢查點:

samza-example/target/bin/checkpoint-tool.sh \
  --config-path=file:///path/to/job/config.properties \
  --new-offsets=file:///path/to/new/offsets.properties

請注意,Samza 僅在容器啟動時讀取檢查點。為了使檢查點更改生效,您需要先停止作業(yè),然后保存修改的偏移量,然后再次啟動作業(yè)。如果在作業(yè)正在運行時寫一個檢查點,那么它很有可能沒有效果。

檢查點回調(diào)

目前 Samza 負(fù)責(zé)所有系統(tǒng)的檢查點。但是有一些用例可能需要通知消費者我們制作的每個檢查點。以下是幾個例子:

  • Samza 無法正確或有效地進(jìn)行檢查點。一個這樣的情況是 Samza 沒有做分區(qū)。在這種情況下,容器不知道它負(fù)責(zé)哪個 SSP,因此不能檢查它們。一個實際的例子可能是一個依賴自動平衡的高級卡夫卡消費者進(jìn)行分區(qū)的系統(tǒng)。
  • 消費者自身需要控制檢查點偏移的系統(tǒng)。某些系統(tǒng)不支持 seek()操作(不可重放),但它們依賴于傳遞消息的 ACK。例如可以是 Kinesis 消費者。Kinesis 庫在* process()* call(推系統(tǒng))中提供檢查點回調(diào)。在處理記錄之后需要調(diào)用此回調(diào)。這只能由消費者本身來完成。
  • 使用檢查點/偏移量信息進(jìn)行某些維護(hù)操作的系統(tǒng)。該信息可用于實施智能保留策略(在消耗完所有數(shù)據(jù)后將所有數(shù)據(jù)刪除)。

為了使用檢查點回調(diào),SystemConsumer 需要實現(xiàn) CheckpointListener 接口:

public interface CheckpointListener {
  void onCheckpoint(Map<SystemStreamPartition, String> offsets);
}

對于實現(xiàn)此接口的 SystemConsumers,Samza 將每次在 OffsetManager 檢查點調(diào)用 onCheckpoint()回調(diào)。檢查點是根據(jù)任務(wù)完成的,“偏移”是任務(wù)的 Samza 檢查點的所有偏移量,這些是在重新啟動時傳遞給消費者的偏移量。請注意,回調(diào)將在檢查點之后發(fā)生,并且不是原子的。

狀態(tài)管理  ?



以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號