SpringCloud 使用@StreamListener進行基于內容的路由

2023-11-25 09:15 更新

Spring Cloud Stream支持根據(jù)條件將消息調度到用@StreamListener注釋的多個處理程序方法。

為了有資格支持條件分派,一種方法必須滿足以下條件:

  • 它不能返回值。
  • 它必須是單獨的消息處理方法(不支持反應性API方法)。

該條件由注釋的condition參數(shù)中的SpEL表達式指定,并針對每條消息進行評估。所有與條件匹配的處理程序都在同一線程中調用,并且不必假設調用的順序。

在具有分配條件的@StreamListener的以下示例中,所有帶有標頭type且具有值bogey的消息都被分配到receiveBogey方法,所有帶有標頭{11的消息值bacall的/}發(fā)送到receiveBacall方法。

@EnableBinding(Sink.class)
@EnableAutoConfiguration
public static class TestPojoWithAnnotatedArguments {

    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='bogey'")
    public void receiveBogey(@Payload BogeyPojo bogeyPojo) {
       // handle the message
    }

    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='bacall'")
    public void receiveBacall(@Payload BacallPojo bacallPojo) {
       // handle the message
    }
}

condition上下文中的內容類型協(xié)商

了解使用@StreamListener參數(shù)condition的基于內容的路由背后的一些機制很重要,尤其是在整個消息類型的上下文中。如果在繼續(xù)之前熟悉,內容類型協(xié)商,這也可能會有所幫助

請考慮以下情形:

@EnableBinding(Sink.class)
@EnableAutoConfiguration
public static class CatsAndDogs {

    @StreamListener(target = Sink.INPUT, condition = "payload.class.simpleName=='Dog'")
    public void bark(Dog dog) {
       // handle the message
    }

    @StreamListener(target = Sink.INPUT, condition = "payload.class.simpleName=='Cat'")
    public void purr(Cat cat) {
       // handle the message
    }
}

前面的代碼是完全有效的。它可以毫無問題地進行編譯和部署,但是永遠不會產生您期望的結果。

這是因為您正在測試的東西在您期望的狀態(tài)下尚不存在。這是因為消息的有效負載尚未從有線格式(byte[])轉換為所需的類型。換句話說,它尚未經過,內容類型協(xié)商中描述的類型轉換過程。

因此,除非使用SPeL表達式評估原始數(shù)據(jù)(例如,字節(jié)數(shù)組中第一個字節(jié)的值),否則請使用基于消息標頭的表達式(例如condition = "headers['type']=='dog'")。

 目前,僅基于通道的綁定程序(不支持響應編程)支持通過@StreamListener條件進行分派。


以上內容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號