W3Cschool
恭喜您成為首批注冊用戶
獲得88經(jīng)驗值獎勵
在本節(jié)中,我們將說明針對特定方案使用前面的屬性。
此示例說明了如何在用戶應(yīng)用程序中手動確認(rèn)偏移。
本示例要求將spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset
設(shè)置為false
。在您的示例中使用相應(yīng)的輸入通道名稱。
@SpringBootApplication @EnableBinding(Sink.class) public class ManuallyAcknowdledgingConsumer { public static void main(String[] args) { SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args); } @StreamListener(Sink.INPUT) public void process(Message<?> message) { Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class); if (acknowledgment != null) { System.out.println("Acknowledgment provided"); acknowledgment.acknowledge(); } } }
Apache Kafka 0.9支持客戶端和代理之間的安全連接。要利用此功能,請遵循Apache Kafka文檔中的準(zhǔn)則以及Confluent文檔中的Kafka 0.9 安全準(zhǔn)則。使用spring.cloud.stream.kafka.binder.configuration
選項為活頁夾創(chuàng)建的所有客戶端設(shè)置安全性屬性。
例如,要將security.protocol
設(shè)置為SASL_SSL
,請設(shè)置以下屬性:
spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL
可以以類似方式設(shè)置所有其他安全屬性。
使用Kerberos時,請遵循參考文檔中的說明來創(chuàng)建和引用JAAS配置。
Spring Cloud Stream支持通過使用JAAS配置文件并使用Spring Boot屬性將JAAS配置信息傳遞到應(yīng)用程序。
可以使用系統(tǒng)屬性為Spring Cloud Stream應(yīng)用程序設(shè)置JAAS和(可選)krb5文件位置。以下示例顯示如何通過使用JAAS配置文件使用SASL和Kerberos啟動Spring Cloud Stream應(yīng)用程序:
java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \
--spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
作為使用JAAS配置文件的替代方法,Spring Cloud Stream提供了一種通過使用Spring Boot屬性為Spring Cloud Stream應(yīng)用程序設(shè)置JAAS配置的機制。
以下屬性可用于配置Kafka客戶端的登錄上下文:
登錄模塊名稱。正常情況下無需設(shè)置。
默認(rèn)值:com.sun.security.auth.module.Krb5LoginModule
。
登錄模塊的控制標(biāo)志。
默認(rèn)值:required
。
使用包含登錄模塊選項的鍵/值對進(jìn)行映射。
默認(rèn)值:空地圖。
以下示例顯示如何使用Spring Boot配置屬性使用SASL和Kerberos啟動Spring Cloud Stream應(yīng)用程序:
java --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \ --spring.cloud.stream.bindings.input.destination=stream.ticktock \ --spring.cloud.stream.kafka.binder.autoCreateTopics=false \ --spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT \ --spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true \ --spring.cloud.stream.kafka.binder.jaas.options.storeKey=true \ --spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab \ --spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM
前面的示例表示以下JAAS文件的等效項:
KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/security/keytabs/kafka_client.keytab" principal="kafka-client-1@EXAMPLE.COM"; };
如果所需的主題已經(jīng)存在于代理上或?qū)⒂晒芾韱T創(chuàng)建,則可以關(guān)閉自動創(chuàng)建,僅需要發(fā)送客戶端JAAS屬性。
請勿在同一應(yīng)用程序中混合使用JAAS配置文件和Spring Boot屬性。如果
-Djava.security.auth.login.config
系統(tǒng)屬性已經(jīng)存在,則Spring Cloud Stream將忽略Spring Boot屬性。
將
autoCreateTopics
和autoAddPartitions
與Kerberos一起使用時要小心。通常,應(yīng)用程序可能使用在Kafka和Zookeeper中沒有管理權(quán)限的主體。因此,依靠Spring Cloud Stream創(chuàng)建/修改主題可能會失敗。在安全的環(huán)境中,強烈建議您使用Kafka工具創(chuàng)建主題并以管理方式管理ACL。
如果希望暫停使用但不引起分區(qū)重新平衡,則可以暫停并恢復(fù)使用方。通過將Consumer
作為參數(shù)添加到@StreamListener
中,可以簡化此操作。要恢復(fù),需要為ListenerContainerIdleEvent
實例使用ApplicationListener
。事件的發(fā)布頻率由idleEventInterval
屬性控制。由于使用者不是線程安全的,因此必須在調(diào)用線程上調(diào)用這些方法。
以下簡單的應(yīng)用程序顯示了如何暫停和恢復(fù):
@SpringBootApplication @EnableBinding(Sink.class) public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } @StreamListener(Sink.INPUT) public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) { System.out.println(in); consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0))); } @Bean public ApplicationListener<ListenerContainerIdleEvent> idleListener() { return event -> { System.out.println(event); if (event.getConsumer().paused().size() > 0) { event.getConsumer().resume(event.getConsumer().paused()); } }; } }
Copyright©2021 w3cschool編程獅|閩ICP備15016281號-3|閩公網(wǎng)安備35020302033924號
違法和不良信息舉報電話:173-0602-2364|舉報郵箱:jubao@eeedong.com
掃描二維碼
下載編程獅App
編程獅公眾號
聯(lián)系方式:
更多建議: