SpringCloud 用法示例

2023-11-28 16:27 更新

在本節(jié)中,我們將說明針對特定方案使用前面的屬性。

示例:將autoCommitOffset設(shè)置為false并依靠手動進(jìn)行

此示例說明了如何在用戶應(yīng)用程序中手動確認(rèn)偏移。

本示例要求將spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset設(shè)置為false在您的示例中使用相應(yīng)的輸入通道名稱。

@SpringBootApplication
@EnableBinding(Sink.class)
public class ManuallyAcknowdledgingConsumer {

 public static void main(String[] args) {
     SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
 }

 @StreamListener(Sink.INPUT)
 public void process(Message<?> message) {
     Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
     if (acknowledgment != null) {
         System.out.println("Acknowledgment provided");
         acknowledgment.acknowledge();
     }
 }
}

示例:安全配置

Apache Kafka 0.9支持客戶端和代理之間的安全連接。要利用此功能,請遵循Apache Kafka文檔中的準(zhǔn)則以及Confluent文檔中的Kafka 0.9 安全準(zhǔn)則使用spring.cloud.stream.kafka.binder.configuration選項為活頁夾創(chuàng)建的所有客戶端設(shè)置安全性屬性。

例如,要將security.protocol設(shè)置為SASL_SSL,請設(shè)置以下屬性:

spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL

可以以類似方式設(shè)置所有其他安全屬性。

使用Kerberos時,請遵循參考文檔中的說明來創(chuàng)建和引用JAAS配置。

Spring Cloud Stream支持通過使用JAAS配置文件并使用Spring Boot屬性將JAAS配置信息傳遞到應(yīng)用程序。

使用JAAS配置文件

可以使用系統(tǒng)屬性為Spring Cloud Stream應(yīng)用程序設(shè)置JAAS和(可選)krb5文件位置。以下示例顯示如何通過使用JAAS配置文件使用SASL和Kerberos啟動Spring Cloud Stream應(yīng)用程序:

 java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \
   --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
   --spring.cloud.stream.bindings.input.destination=stream.ticktock \
   --spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
使用Spring Boot Properties

作為使用JAAS配置文件的替代方法,Spring Cloud Stream提供了一種通過使用Spring Boot屬性為Spring Cloud Stream應(yīng)用程序設(shè)置JAAS配置的機制。

以下屬性可用于配置Kafka客戶端的登錄上下文:

spring.cloud.stream.kafka.binder.jaas.loginModule

登錄模塊名稱。正常情況下無需設(shè)置。

默認(rèn)值:com.sun.security.auth.module.Krb5LoginModule

spring.cloud.stream.kafka.binder.jaas.controlFlag

登錄模塊的控制標(biāo)志。

默認(rèn)值:required

spring.cloud.stream.kafka.binder.jaas.options

使用包含登錄模塊選項的鍵/值對進(jìn)行映射。

默認(rèn)值:空地圖。

以下示例顯示如何使用Spring Boot配置屬性使用SASL和Kerberos啟動Spring Cloud Stream應(yīng)用程序:

 java --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
   --spring.cloud.stream.bindings.input.destination=stream.ticktock \
   --spring.cloud.stream.kafka.binder.autoCreateTopics=false \
   --spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT \
   --spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true \
   --spring.cloud.stream.kafka.binder.jaas.options.storeKey=true \
   --spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab \
   --spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM

前面的示例表示以下JAAS文件的等效項:

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_client.keytab"
    principal="kafka-client-1@EXAMPLE.COM";
};

如果所需的主題已經(jīng)存在于代理上或?qū)⒂晒芾韱T創(chuàng)建,則可以關(guān)閉自動創(chuàng)建,僅需要發(fā)送客戶端JAAS屬性。

 請勿在同一應(yīng)用程序中混合使用JAAS配置文件和Spring Boot屬性。如果-Djava.security.auth.login.config系統(tǒng)屬性已經(jīng)存在,則Spring Cloud Stream將忽略Spring Boot屬性。

 autoCreateTopicsautoAddPartitions與Kerberos一起使用時要小心。通常,應(yīng)用程序可能使用在Kafka和Zookeeper中沒有管理權(quán)限的主體。因此,依靠Spring Cloud Stream創(chuàng)建/修改主題可能會失敗。在安全的環(huán)境中,強烈建議您使用Kafka工具創(chuàng)建主題并以管理方式管理ACL。

示例:暫停和恢復(fù)使用方

如果希望暫停使用但不引起分區(qū)重新平衡,則可以暫停并恢復(fù)使用方。通過將Consumer作為參數(shù)添加到@StreamListener中,可以簡化此操作。要恢復(fù),需要為ListenerContainerIdleEvent實例使用ApplicationListener。事件的發(fā)布頻率由idleEventInterval屬性控制。由于使用者不是線程安全的,因此必須在調(diào)用線程上調(diào)用這些方法。

以下簡單的應(yīng)用程序顯示了如何暫停和恢復(fù):

@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@StreamListener(Sink.INPUT)
	public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
		System.out.println(in);
		consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0)));
	}

	@Bean
	public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
		return event -> {
			System.out.println(event);
			if (event.getConsumer().paused().size() > 0) {
				event.getConsumer().resume(event.getConsumer().paused());
			}
		};
	}

}
以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號