SpringCloud 使用@StreamListener進(jìn)行基于內(nèi)容的路由

2023-11-25 09:15 更新

Spring Cloud Stream支持根據(jù)條件將消息調(diào)度到用@StreamListener注釋的多個(gè)處理程序方法。

為了有資格支持條件分派,一種方法必須滿足以下條件:

  • 它不能返回值。
  • 它必須是單獨(dú)的消息處理方法(不支持反應(yīng)性API方法)。

該條件由注釋的condition參數(shù)中的SpEL表達(dá)式指定,并針對(duì)每條消息進(jìn)行評(píng)估。所有與條件匹配的處理程序都在同一線程中調(diào)用,并且不必假設(shè)調(diào)用的順序。

在具有分配條件的@StreamListener的以下示例中,所有帶有標(biāo)頭type且具有值bogey的消息都被分配到receiveBogey方法,所有帶有標(biāo)頭{11的消息值bacall的/}發(fā)送到receiveBacall方法。

@EnableBinding(Sink.class)
@EnableAutoConfiguration
public static class TestPojoWithAnnotatedArguments {

    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='bogey'")
    public void receiveBogey(@Payload BogeyPojo bogeyPojo) {
       // handle the message
    }

    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='bacall'")
    public void receiveBacall(@Payload BacallPojo bacallPojo) {
       // handle the message
    }
}

condition上下文中的內(nèi)容類型協(xié)商

了解使用@StreamListener參數(shù)condition的基于內(nèi)容的路由背后的一些機(jī)制很重要,尤其是在整個(gè)消息類型的上下文中。如果在繼續(xù)之前熟悉,內(nèi)容類型協(xié)商,這也可能會(huì)有所幫助

請(qǐng)考慮以下情形:

@EnableBinding(Sink.class)
@EnableAutoConfiguration
public static class CatsAndDogs {

    @StreamListener(target = Sink.INPUT, condition = "payload.class.simpleName=='Dog'")
    public void bark(Dog dog) {
       // handle the message
    }

    @StreamListener(target = Sink.INPUT, condition = "payload.class.simpleName=='Cat'")
    public void purr(Cat cat) {
       // handle the message
    }
}

前面的代碼是完全有效的。它可以毫無問題地進(jìn)行編譯和部署,但是永遠(yuǎn)不會(huì)產(chǎn)生您期望的結(jié)果。

這是因?yàn)槟跍y(cè)試的東西在您期望的狀態(tài)下尚不存在。這是因?yàn)橄⒌挠行ж?fù)載尚未從有線格式(byte[])轉(zhuǎn)換為所需的類型。換句話說,它尚未經(jīng)過,內(nèi)容類型協(xié)商中描述的類型轉(zhuǎn)換過程。

因此,除非使用SPeL表達(dá)式評(píng)估原始數(shù)據(jù)(例如,字節(jié)數(shù)組中第一個(gè)字節(jié)的值),否則請(qǐng)使用基于消息標(biāo)頭的表達(dá)式(例如condition = "headers['type']=='dog'")。

 目前,僅基于通道的綁定程序(不支持響應(yīng)編程)支持通過@StreamListener條件進(jìn)行分派。


以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)