W3Cschool
恭喜您成為首批注冊(cè)用戶(hù)
獲得88經(jīng)驗(yàn)值獎(jiǎng)勵(lì)
RabbitMQ不支持本地分區(qū)。
有時(shí),將數(shù)據(jù)發(fā)送到特定分區(qū)是有利的-例如,當(dāng)您要嚴(yán)格訂購(gòu)消息處理時(shí),特定客戶(hù)的所有消息都應(yīng)轉(zhuǎn)到同一分區(qū)。
RabbitMessageChannelBinder
通過(guò)將每個(gè)分區(qū)的隊(duì)列綁定到目標(biāo)交換機(jī)來(lái)提供分區(qū)。
以下Java和YAML示例顯示了如何配置生產(chǎn)者:
制片人。
@SpringBootApplication @EnableBinding(Source.class) public class RabbitPartitionProducerApplication { private static final Random RANDOM = new Random(System.currentTimeMillis()); private static final String[] data = new String[] { "abc1", "def1", "qux1", "abc2", "def2", "qux2", "abc3", "def3", "qux3", "abc4", "def4", "qux4", }; public static void main(String[] args) { new SpringApplicationBuilder(RabbitPartitionProducerApplication.class) .web(false) .run(args); } @InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000")) public Message<?> generate() { String value = data[RANDOM.nextInt(data.length)]; System.out.println("Sending: " + value); return MessageBuilder.withPayload(value) .setHeader("partitionKey", value) .build(); } }
application.yml。
spring: cloud: stream: bindings: output: destination: partitioned.destination producer: partitioned: true partition-key-expression: headers['partitionKey'] partition-count: 2 required-groups: - myGroup
前例中的配置使用默認(rèn)分區(qū)(
key.hashCode() % partitionCount
)。根據(jù)鍵值,這可能會(huì)或可能不會(huì)提供適當(dāng)?shù)钠胶馑惴ā?/font>您可以使用partitionSelectorExpression
或partitionSelectorClass
屬性覆蓋此默認(rèn)設(shè)置。僅當(dāng)在部署生產(chǎn)者時(shí)需要提供消費(fèi)者隊(duì)列時(shí),才需要
required-groups
屬性。否則,發(fā)送到分區(qū)的所有消息都將丟失,直到部署了相應(yīng)的使用者為止。
以下配置提供了主題交換:
該交換綁定了以下隊(duì)列:
以下綁定將隊(duì)列與交換關(guān)聯(lián):
以下Java和YAML示例繼續(xù)了前面的示例,并顯示了如何配置使用者:
消費(fèi)者。
@SpringBootApplication @EnableBinding(Sink.class) public class RabbitPartitionConsumerApplication { public static void main(String[] args) { new SpringApplicationBuilder(RabbitPartitionConsumerApplication.class) .web(false) .run(args); } @StreamListener(Sink.INPUT) public void listen(@Payload String in, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) { System.out.println(in + " received from queue " + queue); } }
application.yml。
spring: cloud: stream: bindings: input: destination: partitioned.destination group: myGroup consumer: partitioned: true instance-index: 0
RabbitMessageChannelBinder
不支持動(dòng)態(tài)縮放。每個(gè)分區(qū)至少必須有一個(gè)使用方。使用者的instanceIndex
用于指示使用了哪個(gè)分區(qū)。諸如Cloud Foundry之類(lèi)的平臺(tái)只能具有一個(gè)帶有instanceIndex
的實(shí)例。
Copyright©2021 w3cschool編程獅|閩ICP備15016281號(hào)-3|閩公網(wǎng)安備35020302033924號(hào)
違法和不良信息舉報(bào)電話(huà):173-0602-2364|舉報(bào)郵箱:jubao@eeedong.com
掃描二維碼
下載編程獅App
編程獅公眾號(hào)
聯(lián)系方式:
更多建議: