SpringCloud 用RabbitMQ Binder分區(qū)

2023-11-29 16:14 更新

RabbitMQ不支持本地分區(qū)。

有時(shí),將數(shù)據(jù)發(fā)送到特定分區(qū)是有利的-例如,當(dāng)您要嚴(yán)格訂購(gòu)消息處理時(shí),特定客戶(hù)的所有消息都應(yīng)轉(zhuǎn)到同一分區(qū)。

RabbitMessageChannelBinder通過(guò)將每個(gè)分區(qū)的隊(duì)列綁定到目標(biāo)交換機(jī)來(lái)提供分區(qū)。

以下Java和YAML示例顯示了如何配置生產(chǎn)者:

制片人。 

@SpringBootApplication
@EnableBinding(Source.class)
public class RabbitPartitionProducerApplication {

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final String[] data = new String[] {
            "abc1", "def1", "qux1",
            "abc2", "def2", "qux2",
            "abc3", "def3", "qux3",
            "abc4", "def4", "qux4",
            };

    public static void main(String[] args) {
        new SpringApplicationBuilder(RabbitPartitionProducerApplication.class)
            .web(false)
            .run(args);
    }

    @InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000"))
    public Message<?> generate() {
        String value = data[RANDOM.nextInt(data.length)];
        System.out.println("Sending: " + value);
        return MessageBuilder.withPayload(value)
                .setHeader("partitionKey", value)
                .build();
    }

}

application.yml。 

    spring:
      cloud:
        stream:
          bindings:
            output:
              destination: partitioned.destination
              producer:
                partitioned: true
                partition-key-expression: headers['partitionKey']
                partition-count: 2
                required-groups:
                - myGroup

前例中的配置使用默認(rèn)分區(qū)(key.hashCode() % partitionCount)。根據(jù)鍵值,這可能會(huì)或可能不會(huì)提供適當(dāng)?shù)钠胶馑惴ā?/font>您可以使用partitionSelectorExpressionpartitionSelectorClass屬性覆蓋此默認(rèn)設(shè)置。

僅當(dāng)在部署生產(chǎn)者時(shí)需要提供消費(fèi)者隊(duì)列時(shí),才需要required-groups屬性。否則,發(fā)送到分區(qū)的所有消息都將丟失,直到部署了相應(yīng)的使用者為止。

以下配置提供了主題交換:

零件交換

該交換綁定了以下隊(duì)列:

部分隊(duì)列

以下綁定將隊(duì)列與交換關(guān)聯(lián):

零件綁定

以下Java和YAML示例繼續(xù)了前面的示例,并顯示了如何配置使用者:

消費(fèi)者。 

@SpringBootApplication
@EnableBinding(Sink.class)
public class RabbitPartitionConsumerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(RabbitPartitionConsumerApplication.class)
            .web(false)
            .run(args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(@Payload String in, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
        System.out.println(in + " received from queue " + queue);
    }

}

application.yml。 

    spring:
      cloud:
        stream:
          bindings:
            input:
              destination: partitioned.destination
              group: myGroup
              consumer:
                partitioned: true
                instance-index: 0

RabbitMessageChannelBinder不支持動(dòng)態(tài)縮放。每個(gè)分區(qū)至少必須有一個(gè)使用方。使用者的instanceIndex用于指示使用了哪個(gè)分區(qū)。諸如Cloud Foundry之類(lèi)的平臺(tái)只能具有一個(gè)帶有instanceIndex的實(shí)例。

以上內(nèi)容是否對(duì)您有幫助:
在線(xiàn)筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)