首页 > 解决方案 > 如何在 Spring Cloud Stream 中使用 2 个 Azure 事件中心

问题描述

我想使用来自以下 2 个连接字符串的消息

Endpoint=sb://region1.servicebus.windows.net/;SharedAccessKeyName=abc;SharedAccessKey=123;EntityPath=my-request Endpoint=sb://region2.servicebus.windows.net/;SharedAccessKeyName=def;SharedAccessKey=456 ;EntityPath=我的请求

使用 Java API 非常简单

EventHubConsumerAsyncClient client = new EventHubClientBuilder()
                        .connectionString("Endpoint=sb://region1.servicebus.windows.net/;SharedAccessKeyName=abc;SharedAccessKey=123;EntityPath=my-request")
                        .buildAsyncConsumerClient();

但是,如何使用 Spring Cloud Stream(相当于上面的 Java 代码)在 yaml 文件中进行这项工作?尝试了网上找到的所有教程,但没有一个有效。

spring:
      cloud:
        stream:
          function:
            definition: consumeRegion1;consumeRegion2
          bindings:
            consumeRegion1-in-0:
              destination: my-request
              binder: eventhub1
            consumeRegion2-in-0:
              destination: my-request
              binder: eventhub2
          binders:
            eventhub1:
              type: eventhub
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      eventhub:
                        connection-string: Endpoint=sb://region1.servicebus.windows.net/;SharedAccessKeyName=abc;SharedAccessKey=123;EntityPath=my-request
            eventhub2:
              type: eventhub
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      eventhub:
                        connection-string: Endpoint=sb://region2.servicebus.windows.net/;SharedAccessKeyName=def;SharedAccessKey=456;EntityPath=my-request
    @Bean
    public Consumer<Message<String>> consumeRegion1() {
        return message -> {
            System.out.printf(message.getPayload());
        };
    }

    @Bean
    public Consumer<Message<String>> consumeRegion2() {
        return message -> {
            System.out.printf(message.getPayload());
        };
    }
        <dependency>
            <groupId>com.azure.spring</groupId>
            <artifactId>azure-spring-cloud-stream-binder-eventhubs</artifactId>
            <version>2.5.0</version>
        </dependency>

错误日志

2021-10-14 21:12:26.760  INFO 1 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-10-14 21:12:26.882  INFO 1 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-10-14 21:12:26.884  INFO 1 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-10-14 21:12:29.587  WARN 1 --- [           main] a.s.c.a.e.AzureEventHubAutoConfiguration : Can't construct the EventHubConnectionStringProvider, namespace: null, connectionString: null
2021-10-14 21:12:29.611  INFO 1 --- [           main] a.s.c.a.e.AzureEventHubAutoConfiguration : No event hub connection string provided.
2021-10-14 21:12:30.290  INFO 1 --- [           main] c.a.s.i.eventhub.impl.EventHubTemplate   : Started EventHubTemplate with properties: {checkpointConfig=CheckpointConfig{checkpointMode=RECORD, checkpointCount=0, checkpointInterval=null}, startPosition=LATEST}
2021-10-14 21:12:32.934  INFO 1 --- [           main] c.f.c.c.BeanFactoryAwareFunctionRegistry : Can't determine default function definition. Please use 'spring.cloud.function.definition' property to explicitly define it.

标签: azurespring-cloud-streamazure-eventhub

解决方案


推荐阅读