SpringCloud 用Kafka Binder進行分區(qū)

2023-11-28 16:33 更新

Apache Kafka本機支持主題分區(qū)。

有時,將數(shù)據(jù)發(fā)送到特定的分區(qū)是有好處的-例如,當您要嚴格訂購消息處理時(特定客戶的所有消息應(yīng)轉(zhuǎn)到同一分區(qū))。

以下示例顯示了如何配置生產(chǎn)方和消費者方:

@SpringBootApplication
@EnableBinding(Source.class)
public class KafkaPartitionProducerApplication {

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final String[] data = new String[] {
            "foo1", "bar1", "qux1",
            "foo2", "bar2", "qux2",
            "foo3", "bar3", "qux3",
            "foo4", "bar4", "qux4",
            };

    public static void main(String[] args) {
        new SpringApplicationBuilder(KafkaPartitionProducerApplication.class)
            .web(false)
            .run(args);
    }

    @InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000"))
    public Message<?> generate() {
        String value = data[RANDOM.nextInt(data.length)];
        System.out.println("Sending: " + value);
        return MessageBuilder.withPayload(value)
                .setHeader("partitionKey", value)
                .build();
    }

}

application.yml。 

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: partitioned.topic
          producer:
            partitioned: true
            partition-key-expression: headers['partitionKey']
            partition-count: 12

 必須為該主題提供足夠的分區(qū),以實現(xiàn)所有消費者組所需的并發(fā)性。上面的配置最多支持12個使用者實例(如果concurrency為2,則為6;如果并發(fā)值為3,則為4,依此類推)。通常最好 過量供應(yīng) 分區(qū),以使將來的使用者或并發(fā)性增加。

 前面的配置使用默認分區(qū)(key.hashCode() % partitionCount)。根據(jù)鍵值,這可能會或可能不會提供適當?shù)钠胶馑惴ā?/span>您可以使用partitionSelectorExpressionpartitionSelectorClass屬性覆蓋此默認設(shè)置。

由于分區(qū)是由Kafka本地處理的,因此在使用者端不需要特殊配置。Kafka在實例之間分配分區(qū)。

以下Spring Boot應(yīng)用程序偵聽Kafka流,并打印(到控制臺)每條消息去往的分區(qū)ID:

@SpringBootApplication
@EnableBinding(Sink.class)
public class KafkaPartitionConsumerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(KafkaPartitionConsumerApplication.class)
            .web(false)
            .run(args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(@Payload String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        System.out.println(in + " received from partition " + partition);
    }

}

application.yml。 

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: partitioned.topic
          group: myGroup

您可以根據(jù)需要添加實例。Kafka重新平衡分區(qū)分配。如果實例計數(shù)(或instance count * concurrency)超過了分區(qū)數(shù),則某些使用者處于空閑狀態(tài)。

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號