SpringCloud 使用輪詢的使用者

2023-11-26 16:09 更新

總覽

使用輪詢的使用者時(shí),您可以按需輪詢PollableMessageSource考慮以下受調(diào)查消費(fèi)者的示例:

public interface PolledConsumer {

    @Input
    PollableMessageSource destIn();

    @Output
    MessageChannel destOut();

}

給定上一個(gè)示例中的受調(diào)查消費(fèi)者,您可以按以下方式使用它:

@Bean
public ApplicationRunner poller(PollableMessageSource destIn, MessageChannel destOut) {
    return args -> {
        while (someCondition()) {
            try {
                if (!destIn.poll(m -> {
                    String newPayload = ((String) m.getPayload()).toUpperCase();
                    destOut.send(new GenericMessage<>(newPayload));
                })) {
                    Thread.sleep(1000);
                }
            }
            catch (Exception e) {
                // handle failure
            }
        }
    };
}

PollableMessageSource.poll()方法采用一個(gè)MessageHandler參數(shù)(通常為lambda表達(dá)式,如此處所示)。如果收到并成功處理了消息,它將返回true。

與消息驅(qū)動(dòng)的使用者一樣,如果MessageHandler引發(fā)異常,消息將發(fā)布到錯(cuò)誤通道,如???”中所述。。

通常,poll()方法會(huì)在MessageHandler退出時(shí)確認(rèn)該消息。如果該方法異常退出,則該消息將被拒絕(不重新排隊(duì)),但請(qǐng)參閱“處理錯(cuò)誤”一節(jié)。您可以通過對(duì)確認(rèn)負(fù)責(zé)來(lái)覆蓋該行為,如以下示例所示:

@Bean
public ApplicationRunner poller(PollableMessageSource dest1In, MessageChannel dest2Out) {
    return args -> {
        while (someCondition()) {
            if (!dest1In.poll(m -> {
                StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).noAutoAck();
                // e.g. hand off to another thread which can perform the ack
                // or acknowledge(Status.REQUEUE)

            })) {
                Thread.sleep(1000);
            }
        }
    };
}
您必須在某一時(shí)刻ack(或nack)消息,以避免資源泄漏。
某些消息傳遞系統(tǒng)(例如Apache Kafka)在日志中維護(hù)簡(jiǎn)單的偏移量。如果傳遞失敗,并用StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).acknowledge(Status.REQUEUE);重新排隊(duì),則重新傳遞任何以后成功確認(rèn)的消息。

還有一個(gè)重載的poll方法,其定義如下:

poll(MessageHandler handler, ParameterizedTypeReference<?> type)

type是一個(gè)轉(zhuǎn)換提示,它允許轉(zhuǎn)換傳入的消息有效負(fù)載,如以下示例所示:

boolean result = pollableSource.poll(received -> {
			Map<String, Foo> payload = (Map<String, Foo>) received.getPayload();
            ...

		}, new ParameterizedTypeReference<Map<String, Foo>>() {});

處理錯(cuò)誤

默認(rèn)情況下,為可輪詢?cè)磁渲昧艘粋€(gè)錯(cuò)誤通道。如果回調(diào)引發(fā)異常,則將ErrorMessage發(fā)送到錯(cuò)誤通道(<destination>.<group>.errors);此錯(cuò)誤通道也橋接到全局Spring Integration errorChannel。

您可以使用@ServiceActivator訂閱任何一個(gè)錯(cuò)誤通道來(lái)處理錯(cuò)誤。如果沒有訂閱,則將僅記錄錯(cuò)誤并確認(rèn)消息成功。如果錯(cuò)誤通道服務(wù)激活器引發(fā)異常,則該消息將被拒絕(默認(rèn)情況下),并且不會(huì)重新發(fā)送。如果服務(wù)激活器拋出RequeueCurrentMessageException,則該消息將在代理處重新排隊(duì),并在隨后的輪詢中再次檢索。

如果偵聽器直接拋出RequeueCurrentMessageException,則如上所述,該消息將重新排隊(duì),并且不會(huì)發(fā)送到錯(cuò)誤通道。

以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)