Samza 提供流的容錯處理:Samza 保證消息不會丟失,即使您的工作崩潰,如果機(jī)器死機(jī),如果有網(wǎng)絡(luò)故障或其他問題。為了提供這一保證,Samza 希望輸入系統(tǒng)滿足以下要求:
Kafka 符合這些要求,但也可以與其他消息代理系統(tǒng)一起實施。
如SamzaContainer一節(jié)中所述,您的任務(wù)的每個任務(wù)實例都會占用一個輸入流的一個分區(qū)。每個任務(wù)對于每個輸入流具有當(dāng)前偏移量:要從該流分區(qū)讀取的下一個消息的偏移量。每次從流中讀取消息時,當(dāng)前的偏移向前移動。
如果 Samza 容器出現(xiàn)故障,則需要重新啟動(潛在地在另一臺機(jī)器上),并恢復(fù)處理失敗的容器。為了實現(xiàn)這一點,容器定期檢查每個任務(wù)實例的當(dāng)前偏移量。
當(dāng) Samza 容器啟動時,它會查找最新的檢查點,并從檢查點偏移開始消費消息。如果上一個容器意外失敗,則最近的檢查點可能稍微偏離當(dāng)前的偏移量(即,自上次檢查點寫入以來,該作業(yè)可能已經(jīng)消耗了一些更多的消息),但是我們無法確定。在這種情況下,該作業(yè)可能再次處理幾個消息。
這種保證被稱為至少一次處理:Samza 確保您的工作不會錯過任何消息,即使容器需要重新啟動。但是,當(dāng)重新啟動容器時,您的作業(yè)可能會多次看到相同的消息。我們計劃在 Samza 的未來版本中解決這個問題,但現(xiàn)在只需要注意一些事情:例如,如果您正在計算頁面瀏覽量,強(qiáng)制殺死的容器可能導(dǎo)致事件略微過度計數(shù)。您可以以更低的性能成本更頻繁地檢查點來減少重復(fù)。
為了使檢查站有效,他們需要寫在他們能夠在故障中生存的地方。Samza 允許您將檢查點寫入文件系統(tǒng)(使用 FileSystemCheckpointManager ),但是如果機(jī)器發(fā)生故障,并且需要在另一臺機(jī)器上重新啟動該容器,則不起作用。最常見的配置是使用 Kafka 作為檢查點。您可以通過以下作業(yè)配置啟用此功能:
# The name of your job determines the name under which checkpoints will be stored
job.name=example-job
# Define a system called "kafka" for consuming and producing to a Kafka cluster
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
# Declare that we want our job's checkpoints to be written to Kafka
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
# By default, a checkpoint is written every 60 seconds. You can change this if you like.
task.commit.ms=60000
在這種配置中,Samza 將檢查點寫入一個名為 “__samza_checkpoint_ <job-name> _ <job-id>” 的單獨的 Kafka 主題(在上面的示例配置中,該主題稱為“__samza_checkpoint_example-job_1”)。每分鐘一次,Samza 會自動發(fā)送一個消息到這個主題,其中輸入流的當(dāng)前偏移被編碼。當(dāng) Samza 容器啟動時,它會在此主題中查找最新的偏移消息,并加載該檢查點。
有時只能對某些輸入流使用檢查點,但不適用于其他輸入流。在這種情況下,您可以告訴 Samza 忽略特定流名稱的任何檢查點偏移量:
# Ignore any checkpoints for the topic "my-special-topic"
systems.kafka.streams.my-special-topic.samza.reset.offset=true
# Always start consuming "my-special-topic" at the oldest available offset
systems.kafka.streams.my-special-topic.samza.offset.default=oldest
下表說明了這些配置參數(shù)的含義:
參數(shù)名稱 | 值 | 含義 |
---|---|---|
系統(tǒng)。<系統(tǒng)>。 流。<流>。 samza.reset.offset | 假(默認(rèn)) | 容器啟動時,從上次檢查點恢復(fù)處理 |
真正 | 忽略檢查點(假裝沒有檢查點存在) | |
系統(tǒng)。<系統(tǒng)>。 流。<流>。 samza.offset.default | 即將到來(默認(rèn)) | 當(dāng)容器啟動并且沒有檢查點(或檢查點被忽略)時,僅處理在作業(yè)啟動后發(fā)布的消息,但沒有舊消息 |
最老的 | 當(dāng)容器啟動并且沒有檢查點(或檢查點被忽略)時,跳回到系統(tǒng)中最舊的可用消息,并從該點開始消耗所有消息(很可能這意味著先前已經(jīng)看到的消息的重復(fù)處理) |
請注意,上述示例配置使您的任務(wù)在每次啟動容器時從最早的偏移量開始消耗。這在您的任務(wù)中需要從輸入流中的源數(shù)據(jù)重建時具有某些內(nèi)存狀態(tài)時非常有用。如果您以這種方式使用流,您可能還會發(fā)現(xiàn)引導(dǎo)流很有用。
如果要對作業(yè)的消費者偏移進(jìn)行一次性更改,例如強(qiáng)制使用新版本的代碼再次處理舊消息,則可以使用 CheckpointTool 來檢查和操作作業(yè)的檢查點。該工具包含在 Samza 的源存儲庫中。
要檢查工作的最新檢查點,您需要指定作業(yè)的配置文件,以便該工具知道要處理的作業(yè):
samza-example/target/bin/checkpoint-tool.sh \
--config-path=file:///path/to/job/config.properties
此命令以屬性文件格式打印出最新的檢查點。您可以將輸出保存到文件中,并根據(jù)需要進(jìn)行編輯。例如,要跳回到最早可能的時間點,您可以將所有偏移量設(shè)置為0.然后可以將該屬性文件反饋到 checkpoint-tool.sh 并保存修改的檢查點:
samza-example/target/bin/checkpoint-tool.sh \
--config-path=file:///path/to/job/config.properties \
--new-offsets=file:///path/to/new/offsets.properties
請注意,Samza 僅在容器啟動時讀取檢查點。為了使檢查點更改生效,您需要先停止作業(yè),然后保存修改的偏移量,然后再次啟動作業(yè)。如果在作業(yè)正在運行時寫一個檢查點,那么它很有可能沒有效果。
目前 Samza 負(fù)責(zé)所有系統(tǒng)的檢查點。但是有一些用例可能需要通知消費者我們制作的每個檢查點。以下是幾個例子:
為了使用檢查點回調(diào),SystemConsumer 需要實現(xiàn) CheckpointListener 接口:
public interface CheckpointListener {
void onCheckpoint(Map<SystemStreamPartition, String> offsets);
}
對于實現(xiàn)此接口的 SystemConsumers,Samza 將每次在 OffsetManager 檢查點調(diào)用 onCheckpoint()回調(diào)。檢查點是根據(jù)任務(wù)完成的,“偏移”是任務(wù)的 Samza 檢查點的所有偏移量,這些是在重新啟動時傳遞給消費者的偏移量。請注意,回調(diào)將在檢查點之后發(fā)生,并且不是原子的。
更多建議: