Spring Cloud 入站通道適配器

2024-01-09 17:59 更新

PubSubInboundChannelAdapter是GCP發(fā)布/訂閱的入站通道適配器,它偵聽GCP發(fā)布/訂閱的新消息。它將新消息轉(zhuǎn)換為內(nèi)部Spring Message,然后將其發(fā)送到綁定的輸出通道。

Google Pub / Sub將消息有效負(fù)載視為字節(jié)數(shù)組。因此,默認(rèn)情況下,入站通道適配器將使用byte[]作為有效載荷來構(gòu)造Spring Message。但是,可以通過設(shè)置PubSubInboundChannelAdapterpayloadType屬性來更改所需的有效負(fù)載類型。 PubSubInboundChannelAdapter將對所需有效負(fù)載類型的轉(zhuǎn)換委派給在PubSubTemplate中配置的PubSubMessageConverter

要使用入站通道適配器,必須在用戶應(yīng)用程序端提供PubSubInboundChannelAdapter并對其進(jìn)行配置。

@Bean
public MessageChannel pubsubInputChannel() {
    return new PublishSubscribeChannel();
}

@Bean
public PubSubInboundChannelAdapter messageChannelAdapter(
    @Qualifier("pubsubInputChannel") MessageChannel inputChannel,
    SubscriberFactory subscriberFactory) {
    PubSubInboundChannelAdapter adapter =
        new PubSubInboundChannelAdapter(subscriberFactory, "subscriptionName");
    adapter.setOutputChannel(inputChannel);
    adapter.setAckMode(AckMode.MANUAL);

    return adapter;
}

在示例中,我們首先指定適配器將向其寫入傳入消息的MessageChannel。MessageChannel的實現(xiàn)在這里并不重要。根據(jù)您的用例,您可能需要使用MessageChannel而非PublishSubscribeChannel。

然后,我們聲明PubSubInboundChannelAdapter bean。它需要我們剛創(chuàng)建的通道和一個SubscriberFactory,該SubscriberFactory從Google Cloud Java Client for Pub / Sub創(chuàng)建Subscriber對象。 GCP Pub / Sub的Spring Boot入門程序提供了已配置的SubscriberFactory。

PubSubInboundChannelAdapter支持三種確認(rèn)模式,其中AckMode.AUTO是默認(rèn)值。

自動確認(rèn)(AckMode.AUTO

如果適配器將消息發(fā)送到通道,并且未引發(fā)任何異常,則消息將被GCP發(fā)布/訂閱確認(rèn)。如果在處理郵件時拋出RuntimeException,則該郵件將被否定。

自動確認(rèn)確認(rèn)(AckMode.AUTO_ACK

如果適配器將消息發(fā)送到通道,并且未引發(fā)任何異常,則消息將被GCP發(fā)布/訂閱確認(rèn)。如果在處理消息時拋出RuntimeException,則消息既不會被確認(rèn)也不會被拒絕。

當(dāng)使用訂閱的確認(rèn)截止時間超時作為重試傳遞回退機制時,此功能很有用。

手動確認(rèn)(AckMode.MANUAL

適配器將BasicAcknowledgeablePubsubMessage對象附加到Message標(biāo)頭。用戶可以使用GcpPubSubHeaders.ORIGINAL_MESSAGE鍵提取BasicAcknowledgeablePubsubMessage,并將其用于(n)確認(rèn)消息。

@Bean
@ServiceActivator(inputChannel = "pubsubInputChannel")
public MessageHandler messageReceiver() {
    return message -> {
        LOGGER.info("Message arrived! Payload: " + new String((byte[]) message.getPayload()));
        BasicAcknowledgeablePubsubMessage originalMessage =
              message.getHeaders().get(GcpPubSubHeaders.ORIGINAL_MESSAGE, BasicAcknowledgeablePubsubMessage.class);
        originalMessage.ack();
    };
}
以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號