Azure Event Hubs: egress throughput drops across all partitions

Problem description

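I am building an application that consumes events from all partitions of an Azure Event Hub, and I am seeing a low ingestion rate. Shortly after the application starts the rate climbs to several hundred events per second, then it falls steadily and settles at about 3 to 5 events per second.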

The code, from a Spring Boot application, is shown in the snippet below:

private Disposable subscription;
private EventHubConsumerAsyncClient client;

client = new EventHubClientBuilder()
        .connectionString(ehProps.getConnectionString(), ehProps.getEventHubName())
        .consumerGroup(ehProps.getStorage().getConsumerGroupName())
        .buildAsyncConsumerClient();
// receive(true) reads from every partition, starting at the earliest available event.
subscription = client.receive(true).subscribe(new EventProcessor());

The event processor service:

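import java.util.function.Consumer;

import com.azure.messaging.eventhubs.models.PartitionEvent;
import org.springframework.stereotype.Service;

@Service
public class EventProcessor implements Consumer<PartitionEvent> {
    // Start of the current measurement window, in epoch milliseconds.
    long start = System.currentTimeMillis();
    // Number of events seen in the current window.
    int count = 0;
    // Not used in this snippet.
    int total = 0;

    @Override
    public void accept(PartitionEvent event) {
        if (System.currentTimeMillis() - start < 1000) {
            count++;
        } else {
            // The window closes only when an event arrives, and that event is not counted.
            System.out.println("Events per second ::" + count);
            count = 0;
            start = System.currentTimeMillis();
        }
    }
}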
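
The processor above prints only when an event arrives after the one-second window has closed, so slow periods produce windows longer than a second and no output at all while nothing arrives. A minimal sketch of a meter that reports on a fixed schedule instead, assuming a hypothetical `ThroughputMeter` helper (not part of the Azure SDK) whose `record()` would be called once per event from `accept`:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

/** Hypothetical helper: counts events and reports the count on a fixed schedule. */
final class ThroughputMeter implements AutoCloseable {
    private final AtomicLong counter = new AtomicLong();
    private final LongConsumer reporter;
    private ScheduledExecutorService scheduler;

    ThroughputMeter(LongConsumer reporter) {
        this.reporter = reporter;
    }

    /** Call once per received event; safe to call from any thread. */
    void record() {
        counter.incrementAndGet();
    }

    /** Atomically reads and resets the counter, then passes the value to the reporter. */
    long flush() {
        long n = counter.getAndSet(0);
        reporter.accept(n);
        return n;
    }

    /** Starts flushing at a fixed period, whether or not events arrive. */
    void start(long period, TimeUnit unit) {
        if (scheduler != null) {
            throw new IllegalStateException("meter already started");
        }
        scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "throughput-meter");
            t.setDaemon(true); // do not keep the JVM alive just for reporting
            return t;
        });
        scheduler.scheduleAtFixedRate(this::flush, period, period, unit);
    }

    @Override
    public void close() {
        if (scheduler != null) {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        try (ThroughputMeter meter = new ThroughputMeter(
                n -> System.out.println("Events per second ::" + n))) {
            meter.start(1, TimeUnit.SECONDS);
            for (int i = 0; i < 5; i++) {
                meter.record(); // stands in for one received event
            }
            Thread.sleep(2500); // let the scheduler report a couple of windows
        }
    }
}
```

With a meter like this, a window with no events is reported as 0 rather than being skipped, which makes it easier to tell a genuinely slow stream from a stalled one.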

Console output:

2020-04-30 19:50:11.024  INFO 6906 --- [       single-1] c.a.m.e.EventHubConsumerAsyncClient      : connectionId[MF_dcd848_1588256404924] linkName[all_fd160b_1588256404998-2]: Creating receive consumer for partition '2'
2020-04-30 19:50:11.024  INFO 6906 --- [       single-1] c.a.c.a.implementation.ReactorSession    : linkName[all_fd160b_1588256404998-2] entityPath[spring-event-hub/ConsumerGroups/$Default/Partitions/2]: Returning existing receive link.
2020-04-30 19:50:11.024  INFO 6906 --- [       single-1] c.a.c.a.i.handler.ReceiveLinkHandler     : onLinkLocalOpen connectionId[MF_dcd848_1588256404924], linkName[all_fd160b_1588256404998-0], localSource[Source{address='spring-event-hub/ConsumerGroups/$Default/Partitions/0', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter={apache.org:selector-filter:string=UnknownDescribedType{descriptor=apache.org:selector-filter:string, described=amqp.annotation.x-opt-offset > '-1'}}, defaultOutcome=null, outcomes=null, capabilities=null}]
2020-04-30 19:50:11.026  INFO 6906 --- [       single-1] c.a.c.a.i.handler.ReceiveLinkHandler     : onLinkLocalOpen connectionId[MF_dcd848_1588256404924], linkName[all_fd160b_1588256404998-1], localSource[Source{address='spring-event-hub/ConsumerGroups/$Default/Partitions/1', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter={apache.org:selector-filter:string=UnknownDescribedType{descriptor=apache.org:selector-filter:string, described=amqp.annotation.x-opt-offset > '-1'}}, defaultOutcome=null, outcomes=null, capabilities=null}]
2020-04-30 19:50:11.026  INFO 6906 --- [       single-1] c.a.c.a.i.handler.ReceiveLinkHandler     : onLinkLocalOpen connectionId[MF_dcd848_1588256404924], linkName[all_fd160b_1588256404998-2], localSource[Source{address='spring-event-hub/ConsumerGroups/$Default/Partitions/2', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter={apache.org:selector-filter:string=UnknownDescribedType{descriptor=apache.org:selector-filter:string, described=amqp.annotation.x-opt-offset > '-1'}}, defaultOutcome=null, outcomes=null, capabilities=null}]
2020-04-30 19:50:11.466  INFO 6906 --- [       single-1] c.a.c.a.i.handler.ReceiveLinkHandler     : onLinkRemoteOpen connectionId[MF_dcd848_1588256404924], linkName[all_fd160b_1588256404998-1], remoteSource[Source{address='spring-event-hub/ConsumerGroups/$Default/Partitions/1', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter={apache.org:selector-filter:string=org.apache.qpid.proton.codec.DecoderImpl$UnknownDescribedType@2615c490}, defaultOutcome=null, outcomes=null, capabilities=null}]
2020-04-30 19:50:11.467  INFO 6906 --- [       single-1] c.a.c.a.i.handler.ReceiveLinkHandler     : onLinkRemoteOpen connectionId[MF_dcd848_1588256404924], linkName[all_fd160b_1588256404998-0], remoteSource[Source{address='spring-event-hub/ConsumerGroups/$Default/Partitions/0', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter={apache.org:selector-filter:string=org.apache.qpid.proton.codec.DecoderImpl$UnknownDescribedType@76700147}, defaultOutcome=null, outcomes=null, capabilities=null}]
2020-04-30 19:50:11.470  INFO 6906 --- [       single-1] c.a.c.a.i.handler.ReceiveLinkHandler     : onLinkRemoteOpen connectionId[MF_dcd848_1588256404924], linkName[all_fd160b_1588256404998-2], remoteSource[Source{address='spring-event-hub/ConsumerGroups/$Default/Partitions/2', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter={apache.org:selector-filter:string=org.apache.qpid.proton.codec.DecoderImpl$UnknownDescribedType@52d562e3}, defaultOutcome=null, outcomes=null, capabilities=null}]
Events per second ::0
Events per second ::316
Events per second ::818
Events per second ::409
Events per second ::87
Events per second ::21
Events per second ::12
Events per second ::5
Events per second ::3
Events per second ::4
Events per second ::3
Events per second ::4
Events per second ::4
Events per second ::5
Events per second ::5

Side note: I have also tested with an increased number of throughput units (TUs), but it made no difference.
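
For context on the side note: the published Standard-tier quota is, as far as I know, up to 2 MB per second or 4096 events per second of egress per throughput unit. A rough sketch comparing the observed steady-state rate against that ceiling follows. `ThroughputUnitQuota` is a hypothetical helper, and the quota constant is an assumption that should be checked against the current Azure documentation:

```java
/** Hypothetical helper: compares an observed rate with the assumed per-TU egress event quota. */
final class ThroughputUnitQuota {
    // Assumption: Standard-tier egress quota of 4096 events per second per throughput unit.
    static final long EGRESS_EVENTS_PER_SECOND_PER_TU = 4096L;

    /** Upper bound on egress events per second for the given number of throughput units. */
    static long maxEgressEventsPerSecond(int throughputUnits) {
        if (throughputUnits < 1) {
            throw new IllegalArgumentException("throughputUnits must be at least 1");
        }
        return throughputUnits * EGRESS_EVENTS_PER_SECOND_PER_TU;
    }

    /** Fraction of the event quota that the observed rate consumes (0.0 to 1.0 and above). */
    static double utilization(long observedEventsPerSecond, int throughputUnits) {
        return (double) observedEventsPerSecond / maxEgressEventsPerSecond(throughputUnits);
    }

    public static void main(String[] args) {
        long observed = 5; // steady-state rate taken from the console output above
        System.out.printf("Ceiling: %d events/s, utilization: %.4f%n",
                maxEgressEventsPerSecond(1), utilization(observed, 1));
    }
}
```

At about 5 events per second the consumer would be using a tiny fraction of a single TU's event quota, which is consistent with extra TUs making no difference. Event size would need to be checked separately against the byte limit.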

