SpringCloud Kafka消費(fèi)者Properties

2023-11-28 16:24 更新

以下屬性僅適用于Kafka使用者,并且必須以spring.cloud.stream.kafka.bindings.<channelName>.consumer.為前綴。

管理員配置

供應(yīng)主題時(shí)使用的Kafka主題屬性中的Map(例如,spring.cloud.stream.kafka.bindings.input.consumer.admin.configuration.message.format.version=0.9.0.0

默認(rèn)值:無(wú)。

管理員副本分配

副本分配的Map <Integer,List <Integer >>,鍵為分區(qū),值為分配。在配置新主題時(shí)使用。請(qǐng)參閱kafka-clients jar中的NewTopic Javadocs。

默認(rèn)值:無(wú)。

管理員復(fù)制因子

設(shè)置主題時(shí)要使用的復(fù)制因子。覆蓋活頁(yè)夾范圍的設(shè)置。忽略是否??存在replicas-assignments。

默認(rèn)值:無(wú)(使用資料夾范圍的默認(rèn)值1)。

autoRebalanceEnabled

當(dāng)為true時(shí),主題分區(qū)將在使用者組的成員之間自動(dòng)重新平衡。當(dāng)false時(shí),將為每個(gè)使用者分配基于spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex的固定分區(qū)集合。這要求在每個(gè)啟動(dòng)的實(shí)例上同時(shí)設(shè)置spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex屬性。在這種情況下,spring.cloud.stream.instanceCount屬性的值通常必須大于1。

默認(rèn)值:true。

ackEachRecord

當(dāng)autoCommitOffsettrue時(shí),此設(shè)置指示在處理每條記錄后是否提交偏移量。默認(rèn)情況下,在處理consumer.poll()返回的記錄批次中的所有記錄之后,將提交偏移量。可以使用max.poll.records Kafka屬性控制輪詢返回的記錄數(shù),該屬性是通過(guò)使用者configuration屬性設(shè)置的。將此設(shè)置為true可能會(huì)導(dǎo)致性能下降,但是這樣做會(huì)減少發(fā)生故障時(shí)重新傳送記錄的可能性。另外,請(qǐng)參見(jiàn)活頁(yè)夾requiredAcks屬性,該屬性還影響落實(shí)偏移量的性能。

默認(rèn)值:false。

autoCommitOffset

處理消息后是否自動(dòng)提交偏移量。如果設(shè)置為false,則入站消息中將出現(xiàn)帶有類型為org.springframework.kafka.support.Acknowledgment頭的鍵kafka_acknowledgment的頭。應(yīng)用程序可以使用此標(biāo)頭來(lái)確認(rèn)消息。有關(guān)詳細(xì)信息,請(qǐng)參見(jiàn)示例部分。當(dāng)此屬性設(shè)置為false時(shí),Kafka活頁(yè)夾將ack模式設(shè)置為org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL,應(yīng)用程序負(fù)責(zé)確認(rèn)記錄。另請(qǐng)參閱ackEachRecord。

默認(rèn)值:true。

autoCommitOnError

僅在autoCommitOffset設(shè)置為true時(shí)有效。如果設(shè)置為false,它將抑制導(dǎo)致錯(cuò)誤的消息的自動(dòng)提交,僅對(duì)成功的消息進(jìn)行提交。如果持續(xù)出現(xiàn)故障,它允許流從上次成功處理的消息自動(dòng)重播。如果設(shè)置為true,它將始終自動(dòng)提交(如果啟用了自動(dòng)提交)。如果未設(shè)置(默認(rèn)值),則其有效值與enableDlq相同,如果將錯(cuò)誤消息發(fā)送到DLQ,則自動(dòng)提交錯(cuò)誤消息,否則不提交。

默認(rèn)值:未設(shè)置。

resetOffsets

是否將使用者的偏移量重置為startOffset提供的值。

默認(rèn)值:false。

startOffset

新組的起始偏移量。允許的值:earliestlatest。如果為消費(fèi)者“綁定”顯式設(shè)置了消費(fèi)者組(通過(guò)spring.cloud.stream.bindings.<channelName>.group),則“ startOffset”設(shè)置為earliest否則,將anonymous消費(fèi)者組的值設(shè)置為latest另請(qǐng)參見(jiàn)resetOffsets(在此列表的前面)。

默認(rèn)值:null(等于earliest)。

enableDlq

設(shè)置為true時(shí),它將為使用者啟用DLQ行為。默認(rèn)情況下,導(dǎo)致錯(cuò)誤的消息將轉(zhuǎn)發(fā)到名為error.<destination>.<group>的主題。可以通過(guò)設(shè)置dlqName屬性來(lái)配置DLQ主題名稱。對(duì)于錯(cuò)誤數(shù)量相對(duì)較小并且重放整個(gè)原始主題可能太麻煩的情況,這為更常見(jiàn)的Kafka重播方案提供了一個(gè)替代選項(xiàng)。有關(guān)更多信息請(qǐng)參見(jiàn)“ Dead-Letter主題處理”處理。從2.0版開(kāi)始,發(fā)送到DLQ主題的消息將通過(guò)以下標(biāo)頭得到增強(qiáng):x-original-topic,x-exception-messagex-exception-stacktracebyte[]。 當(dāng)destinationIsPatterntrue時(shí)不允許使用。

默認(rèn)值:false。

組態(tài)

使用包含通用Kafka使用者屬性的鍵/值對(duì)進(jìn)行映射。

默認(rèn)值:空地圖。

dlqName

接收錯(cuò)誤消息的DLQ主題的名稱。

默認(rèn)值:null(如果未指定,則導(dǎo)致錯(cuò)誤的消息將轉(zhuǎn)發(fā)到名為error.<destination>.<group>的主題)。

dlqProducerProperties

使用此功能,可以設(shè)置特定于DLQ的生產(chǎn)者屬性。通過(guò)kafka生產(chǎn)者屬性可用的所有屬性都可以通過(guò)此屬性設(shè)置。

默認(rèn):默認(rèn)Kafka生產(chǎn)者屬性。

標(biāo)頭

指示入站通道適配器填充哪些標(biāo)準(zhǔn)頭。允許的值:none,id,timestampboth如果使用本機(jī)反序列化并且第一個(gè)組件接收消息需要id(例如配置為使用JDBC消息存儲(chǔ)的聚合器),則很有用。

默認(rèn)值:none

converterBeanName

實(shí)現(xiàn)RecordMessageConverter的bean的名稱。在入站通道適配器中用于替換默認(rèn)的MessagingMessageConverter。

默認(rèn)值:null

idleEventInterval

事件之間的間隔(以毫秒為單位),指示最近未接收到任何消息。使用ApplicationListener<ListenerContainerIdleEvent>接收這些事件。有關(guān)用法示例,請(qǐng)參見(jiàn)“示例:暫停和恢復(fù)使用者”一節(jié)。

默認(rèn)值:30000

destinationIsPattern

如果為true,則將目的地視為正則表達(dá)式Pattern,用于由代理匹配主題名稱。設(shè)置為true時(shí),不設(shè)置主題,并且不允許enableDlq,因?yàn)榻壎ㄕ咴谠O(shè)置階段不知道主題名稱。請(qǐng)注意,檢測(cè)與模式匹配的新主題所花費(fèi)的時(shí)間由消費(fèi)者屬性metadata.max.age.ms控制,該屬性(在撰寫本文時(shí))默認(rèn)為300,000ms(5分鐘)。可以使用上面的configuration屬性進(jìn)行配置。

默認(rèn)值:false

以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)