SpringCloud Kafka消費者Properties

2023-11-28 16:24 更新

以下屬性僅適用于Kafka使用者,并且必須以spring.cloud.stream.kafka.bindings.<channelName>.consumer.為前綴。

管理員配置

供應(yīng)主題時使用的Kafka主題屬性中的Map(例如,spring.cloud.stream.kafka.bindings.input.consumer.admin.configuration.message.format.version=0.9.0.0

默認值:無。

管理員副本分配

副本分配的Map <Integer,List <Integer >>,鍵為分區(qū),值為分配。在配置新主題時使用。請參閱kafka-clients jar中的NewTopic Javadocs。

默認值:無。

管理員復制因子

設(shè)置主題時要使用的復制因子。覆蓋活頁夾范圍的設(shè)置。忽略是否??存在replicas-assignments

默認值:無(使用資料夾范圍的默認值1)。

autoRebalanceEnabled

當為true時,主題分區(qū)將在使用者組的成員之間自動重新平衡。false時,將為每個使用者分配基于spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex的固定分區(qū)集合。這要求在每個啟動的實例上同時設(shè)置spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex屬性。在這種情況下,spring.cloud.stream.instanceCount屬性的值通常必須大于1。

默認值:true。

ackEachRecord

autoCommitOffsettrue時,此設(shè)置指示在處理每條記錄后是否提交偏移量。默認情況下,在處理consumer.poll()返回的記錄批次中的所有記錄之后,將提交偏移量。可以使用max.poll.records Kafka屬性控制輪詢返回的記錄數(shù),該屬性是通過使用者configuration屬性設(shè)置的。將此設(shè)置為true可能會導致性能下降,但是這樣做會減少發(fā)生故障時重新傳送記錄的可能性。另外,請參見活頁夾requiredAcks屬性,該屬性還影響落實偏移量的性能。

默認值:false

autoCommitOffset

處理消息后是否自動提交偏移量。如果設(shè)置為false,則入站消息中將出現(xiàn)帶有類型為org.springframework.kafka.support.Acknowledgment頭的鍵kafka_acknowledgment的頭。應(yīng)用程序可以使用此標頭來確認消息。有關(guān)詳細信息,請參見示例部分。當此屬性設(shè)置為false時,Kafka活頁夾將ack模式設(shè)置為org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL,應(yīng)用程序負責確認記錄。另請參閱ackEachRecord。

默認值:true。

autoCommitOnError

僅在autoCommitOffset設(shè)置為true時有效。如果設(shè)置為false,它將抑制導致錯誤的消息的自動提交,僅對成功的消息進行提交。如果持續(xù)出現(xiàn)故障,它允許流從上次成功處理的消息自動重播。如果設(shè)置為true,它將始終自動提交(如果啟用了自動提交)。如果未設(shè)置(默認值),則其有效值與enableDlq相同,如果將錯誤消息發(fā)送到DLQ,則自動提交錯誤消息,否則不提交。

默認值:未設(shè)置。

resetOffsets

是否將使用者的偏移量重置為startOffset提供的值。

默認值:false。

startOffset

新組的起始偏移量。允許的值:earliestlatest。如果為消費者“綁定”顯式設(shè)置了消費者組(通過spring.cloud.stream.bindings.<channelName>.group),則“ startOffset”設(shè)置為earliest。否則,將anonymous消費者組的值設(shè)置為latest。另請參見resetOffsets(在此列表的前面)。

默認值:null(等于earliest)。

enableDlq

設(shè)置為true時,它將為使用者啟用DLQ行為。默認情況下,導致錯誤的消息將轉(zhuǎn)發(fā)到名為error.<destination>.<group>的主題。可以通過設(shè)置dlqName屬性來配置DLQ主題名稱。對于錯誤數(shù)量相對較小并且重放整個原始主題可能太麻煩的情況,這為更常見的Kafka重播方案提供了一個替代選項。有關(guān)更多信息,請參見“ Dead-Letter主題處理”處理。從2.0版開始,發(fā)送到DLQ主題的消息將通過以下標頭得到增強:x-original-topicx-exception-messagex-exception-stacktracebyte[]。 destinationIsPatterntrue時不允許使用。

默認值:false

組態(tài)

使用包含通用Kafka使用者屬性的鍵/值對進行映射。

默認值:空地圖。

dlqName

接收錯誤消息的DLQ主題的名稱。

默認值:null(如果未指定,則導致錯誤的消息將轉(zhuǎn)發(fā)到名為error.<destination>.<group>的主題)。

dlqProducerProperties

使用此功能,可以設(shè)置特定于DLQ的生產(chǎn)者屬性。通過kafka生產(chǎn)者屬性可用的所有屬性都可以通過此屬性設(shè)置。

默認:默認Kafka生產(chǎn)者屬性。

標頭

指示入站通道適配器填充哪些標準頭。允許的值:none,id,timestampboth如果使用本機反序列化并且第一個組件接收消息需要id(例如配置為使用JDBC消息存儲的聚合器),則很有用。

默認值:none

converterBeanName

實現(xiàn)RecordMessageConverter的bean的名稱。在入站通道適配器中用于替換默認的MessagingMessageConverter。

默認值:null

idleEventInterval

事件之間的間隔(以毫秒為單位),指示最近未接收到任何消息。使用ApplicationListener<ListenerContainerIdleEvent>接收這些事件。有關(guān)用法示例,請參見“示例:暫停和恢復使用者”一節(jié)。

默認值:30000

destinationIsPattern

如果為true,則將目的地視為正則表達式Pattern,用于由代理匹配主題名稱。設(shè)置為true時,不設(shè)置主題,并且不允許enableDlq,因為綁定者在設(shè)置階段不知道主題名稱。請注意,檢測與模式匹配的新主題所花費的時間由消費者屬性metadata.max.age.ms控制,該屬性(在撰寫本文時)默認為300,000ms(5分鐘)。可以使用上面的configuration屬性進行配置。

默認值:false

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號