Samza的定期檢查

2018-08-22 17:36 更新

Samza 提供流的容錯(cuò)處理:Samza 保證消息不會(huì)丟失,即使您的工作崩潰,如果機(jī)器死機(jī),如果有網(wǎng)絡(luò)故障或其他問(wèn)題。為了提供這一保證,Samza 希望輸入系統(tǒng)滿足以下要求:

  • 流可以劃分成一個(gè)或多個(gè)分區(qū)。每個(gè)分區(qū)獨(dú)立于其他分區(qū),并且跨多臺(tái)計(jì)算機(jī)進(jìn)行復(fù)制(即使機(jī)器發(fā)生故障,該流仍然可用)。
  • 每個(gè)分區(qū)由固定順序的消息序列組成。每個(gè)消息都有一個(gè)偏移量,表示其在該順序中的位置。消息總是在每個(gè)分區(qū)內(nèi)依次消耗。
  • Samza 作業(yè)可以從任何起始偏移開(kāi)始消耗消息序列。

Kafka 符合這些要求,但也可以與其他消息代理系統(tǒng)一起實(shí)施。

SamzaContainer一節(jié)中所述,您的任務(wù)的每個(gè)任務(wù)實(shí)例都會(huì)占用一個(gè)輸入流的一個(gè)分區(qū)。每個(gè)任務(wù)對(duì)于每個(gè)輸入流具有當(dāng)前偏移量:要從該流分區(qū)讀取的下一個(gè)消息的偏移量。每次從流中讀取消息時(shí),當(dāng)前的偏移向前移動(dòng)。

如果 Samza 容器出現(xiàn)故障,則需要重新啟動(dòng)(潛在地在另一臺(tái)機(jī)器上),并恢復(fù)處理失敗的容器。為了實(shí)現(xiàn)這一點(diǎn),容器定期檢查每個(gè)任務(wù)實(shí)例的當(dāng)前偏移量。

1502854014329507

當(dāng) Samza 容器啟動(dòng)時(shí),它會(huì)查找最新的檢查點(diǎn),并從檢查點(diǎn)偏移開(kāi)始消費(fèi)消息。如果上一個(gè)容器意外失敗,則最近的檢查點(diǎn)可能稍微偏離當(dāng)前的偏移量(即,自上次檢查點(diǎn)寫入以來(lái),該作業(yè)可能已經(jīng)消耗了一些更多的消息),但是我們無(wú)法確定。在這種情況下,該作業(yè)可能再次處理幾個(gè)消息。

這種保證被稱為至少一次處理:Samza 確保您的工作不會(huì)錯(cuò)過(guò)任何消息,即使容器需要重新啟動(dòng)。但是,當(dāng)重新啟動(dòng)容器時(shí),您的作業(yè)可能會(huì)多次看到相同的消息。我們計(jì)劃在 Samza 的未來(lái)版本中解決這個(gè)問(wèn)題,但現(xiàn)在只需要注意一些事情:例如,如果您正在計(jì)算頁(yè)面瀏覽量,強(qiáng)制殺死的容器可能導(dǎo)致事件略微過(guò)度計(jì)數(shù)。您可以以更低的性能成本更頻繁地檢查點(diǎn)來(lái)減少重復(fù)。

為了使檢查站有效,他們需要寫在他們能夠在故障中生存的地方。Samza 允許您將檢查點(diǎn)寫入文件系統(tǒng)(使用 FileSystemCheckpointManager ),但是如果機(jī)器發(fā)生故障,并且需要在另一臺(tái)機(jī)器上重新啟動(dòng)該容器,則不起作用。最常見(jiàn)的配置是使用 Kafka 作為檢查點(diǎn)。您可以通過(guò)以下作業(yè)配置啟用此功能:

# The name of your job determines the name under which checkpoints will be stored
job.name=example-job

# Define a system called "kafka" for consuming and producing to a Kafka cluster
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory

# Declare that we want our job's checkpoints to be written to Kafka
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka

# By default, a checkpoint is written every 60 seconds. You can change this if you like.
task.commit.ms=60000

在這種配置中,Samza 將檢查點(diǎn)寫入一個(gè)名為 “__samza_checkpoint_ <job-name> _ <job-id>” 的單獨(dú)的 Kafka 主題(在上面的示例配置中,該主題稱為“__samza_checkpoint_example-job_1”)。每分鐘一次,Samza 會(huì)自動(dòng)發(fā)送一個(gè)消息到這個(gè)主題,其中輸入流的當(dāng)前偏移被編碼。當(dāng) Samza 容器啟動(dòng)時(shí),它會(huì)在此主題中查找最新的偏移消息,并加載該檢查點(diǎn)。

有時(shí)只能對(duì)某些輸入流使用檢查點(diǎn),但不適用于其他輸入流。在這種情況下,您可以告訴 Samza 忽略特定流名稱的任何檢查點(diǎn)偏移量:

# Ignore any checkpoints for the topic "my-special-topic"
systems.kafka.streams.my-special-topic.samza.reset.offset=true

# Always start consuming "my-special-topic" at the oldest available offset
systems.kafka.streams.my-special-topic.samza.offset.default=oldest

下表說(shuō)明了這些配置參數(shù)的含義:

參數(shù)名稱含義
系統(tǒng)。<系統(tǒng)>。
流。<流>。
samza.reset.offset
假(默認(rèn))容器啟動(dòng)時(shí),從上次檢查點(diǎn)恢復(fù)處理
真正忽略檢查點(diǎn)(假裝沒(méi)有檢查點(diǎn)存在)
系統(tǒng)。<系統(tǒng)>。
流。<流>。
samza.offset.default
即將到來(lái)(默認(rèn))當(dāng)容器啟動(dòng)并且沒(méi)有檢查點(diǎn)(或檢查點(diǎn)被忽略)時(shí),僅處理在作業(yè)啟動(dòng)后發(fā)布的消息,但沒(méi)有舊消息
最老的當(dāng)容器啟動(dòng)并且沒(méi)有檢查點(diǎn)(或檢查點(diǎn)被忽略)時(shí),跳回到系統(tǒng)中最舊的可用消息,并從該點(diǎn)開(kāi)始消耗所有消息(很可能這意味著先前已經(jīng)看到的消息的重復(fù)處理)

請(qǐng)注意,上述示例配置使您的任務(wù)在每次啟動(dòng)容器時(shí)從最早的偏移量開(kāi)始消耗。這在您的任務(wù)中需要從輸入流中的源數(shù)據(jù)重建時(shí)具有某些內(nèi)存狀態(tài)時(shí)非常有用。如果您以這種方式使用流,您可能還會(huì)發(fā)現(xiàn)引導(dǎo)流很有用。

手動(dòng)操作檢查點(diǎn)

如果要對(duì)作業(yè)的消費(fèi)者偏移進(jìn)行一次性更改,例如強(qiáng)制使用新版本的代碼再次處理舊消息,則可以使用 CheckpointTool 來(lái)檢查和操作作業(yè)的檢查點(diǎn)。該工具包含在 Samza 的源存儲(chǔ)庫(kù)中。

要檢查工作的最新檢查點(diǎn),您需要指定作業(yè)的配置文件,以便該工具知道要處理的作業(yè):

samza-example/target/bin/checkpoint-tool.sh \
  --config-path=file:///path/to/job/config.properties

此命令以屬性文件格式打印出最新的檢查點(diǎn)。您可以將輸出保存到文件中,并根據(jù)需要進(jìn)行編輯。例如,要跳回到最早可能的時(shí)間點(diǎn),您可以將所有偏移量設(shè)置為0.然后可以將該屬性文件反饋到 checkpoint-tool.sh 并保存修改的檢查點(diǎn):

samza-example/target/bin/checkpoint-tool.sh \
  --config-path=file:///path/to/job/config.properties \
  --new-offsets=file:///path/to/new/offsets.properties

請(qǐng)注意,Samza 僅在容器啟動(dòng)時(shí)讀取檢查點(diǎn)。為了使檢查點(diǎn)更改生效,您需要先停止作業(yè),然后保存修改的偏移量,然后再次啟動(dòng)作業(yè)。如果在作業(yè)正在運(yùn)行時(shí)寫一個(gè)檢查點(diǎn),那么它很有可能沒(méi)有效果。

檢查點(diǎn)回調(diào)

目前 Samza 負(fù)責(zé)所有系統(tǒng)的檢查點(diǎn)。但是有一些用例可能需要通知消費(fèi)者我們制作的每個(gè)檢查點(diǎn)。以下是幾個(gè)例子:

  • Samza 無(wú)法正確或有效地進(jìn)行檢查點(diǎn)。一個(gè)這樣的情況是 Samza 沒(méi)有做分區(qū)。在這種情況下,容器不知道它負(fù)責(zé)哪個(gè) SSP,因此不能檢查它們。一個(gè)實(shí)際的例子可能是一個(gè)依賴自動(dòng)平衡的高級(jí)卡夫卡消費(fèi)者進(jìn)行分區(qū)的系統(tǒng)。
  • 消費(fèi)者自身需要控制檢查點(diǎn)偏移的系統(tǒng)。某些系統(tǒng)不支持 seek()操作(不可重放),但它們依賴于傳遞消息的 ACK。例如可以是 Kinesis 消費(fèi)者。Kinesis 庫(kù)在* process()* call(推系統(tǒng))中提供檢查點(diǎn)回調(diào)。在處理記錄之后需要調(diào)用此回調(diào)。這只能由消費(fèi)者本身來(lái)完成。
  • 使用檢查點(diǎn)/偏移量信息進(jìn)行某些維護(hù)操作的系統(tǒng)。該信息可用于實(shí)施智能保留策略(在消耗完所有數(shù)據(jù)后將所有數(shù)據(jù)刪除)。

為了使用檢查點(diǎn)回調(diào),SystemConsumer 需要實(shí)現(xiàn) CheckpointListener 接口:

public interface CheckpointListener {
  void onCheckpoint(Map<SystemStreamPartition, String> offsets);
}

對(duì)于實(shí)現(xiàn)此接口的 SystemConsumers,Samza 將每次在 OffsetManager 檢查點(diǎn)調(diào)用 onCheckpoint()回調(diào)。檢查點(diǎn)是根據(jù)任務(wù)完成的,“偏移”是任務(wù)的 Samza 檢查點(diǎn)的所有偏移量,這些是在重新啟動(dòng)時(shí)傳遞給消費(fèi)者的偏移量。請(qǐng)注意,回調(diào)將在檢查點(diǎn)之后發(fā)生,并且不是原子的。

狀態(tài)管理  ?



以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)