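java - Azure Event Hubs: egress throughput drops across all partitions
Problem description
I am building an application that consumes events from all partitions of an Azure Event Hub, and I am running into a low ingestion rate. When the application starts, the rate climbs, but it then drops steadily over time.
The code, from a Spring Boot application, is shown below.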
private Disposable subscription;
private EventHubConsumerAsyncClient client;

client = new EventHubClientBuilder()
        .connectionString(ehProps.getConnectionString(), ehProps.getEventHubName())
        .consumerGroup(ehProps.getStorage().getConsumerGroupName())
        .buildAsyncConsumerClient();
// receive(true) reads every partition, starting from the earliest available event
subscription = client.receive(true).subscribe(new EventProcessor());
Event processor service
@Service
public class EventProcessor implements Consumer<PartitionEvent> {
    // Start of the current one-second measurement window
    long start = System.currentTimeMillis();
    // Events seen in the current window
    int count = 0;
    int total = 0;

    @Override
    public void accept(PartitionEvent event) {
        if (System.currentTimeMillis() - start < 1000) {
            count++;
        } else {
            // Window elapsed: report and reset (the event that triggers this branch is not counted)
            System.out.println("Events per second ::" + count);
            count = 0;
            start = System.currentTimeMillis();
        }
    }
}
Console output
2020-04-30 19:50:11.024 INFO 6906 --- [ single-1] c.a.m.e.EventHubConsumerAsyncClient : connectionId[MF_dcd848_1588256404924] linkName[all_fd160b_1588256404998-2]: Creating receive consumer for partition '2'
2020-04-30 19:50:11.024 INFO 6906 --- [ single-1] c.a.c.a.implementation.ReactorSession : linkName[all_fd160b_1588256404998-2] entityPath[spring-event-hub/ConsumerGroups/$Default/Partitions/2]: Returning existing receive link.
2020-04-30 19:50:11.024 INFO 6906 --- [ single-1] c.a.c.a.i.handler.ReceiveLinkHandler : onLinkLocalOpen connectionId[MF_dcd848_1588256404924], linkName[all_fd160b_1588256404998-0], localSource[Source{address='spring-event-hub/ConsumerGroups/$Default/Partitions/0', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter={apache.org:selector-filter:string=UnknownDescribedType{descriptor=apache.org:selector-filter:string, described=amqp.annotation.x-opt-offset > '-1'}}, defaultOutcome=null, outcomes=null, capabilities=null}]
2020-04-30 19:50:11.026 INFO 6906 --- [ single-1] c.a.c.a.i.handler.ReceiveLinkHandler : onLinkLocalOpen connectionId[MF_dcd848_1588256404924], linkName[all_fd160b_1588256404998-1], localSource[Source{address='spring-event-hub/ConsumerGroups/$Default/Partitions/1', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter={apache.org:selector-filter:string=UnknownDescribedType{descriptor=apache.org:selector-filter:string, described=amqp.annotation.x-opt-offset > '-1'}}, defaultOutcome=null, outcomes=null, capabilities=null}]
2020-04-30 19:50:11.026 INFO 6906 --- [ single-1] c.a.c.a.i.handler.ReceiveLinkHandler : onLinkLocalOpen connectionId[MF_dcd848_1588256404924], linkName[all_fd160b_1588256404998-2], localSource[Source{address='spring-event-hub/ConsumerGroups/$Default/Partitions/2', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter={apache.org:selector-filter:string=UnknownDescribedType{descriptor=apache.org:selector-filter:string, described=amqp.annotation.x-opt-offset > '-1'}}, defaultOutcome=null, outcomes=null, capabilities=null}]
2020-04-30 19:50:11.466 INFO 6906 --- [ single-1] c.a.c.a.i.handler.ReceiveLinkHandler : onLinkRemoteOpen connectionId[MF_dcd848_1588256404924], linkName[all_fd160b_1588256404998-1], remoteSource[Source{address='spring-event-hub/ConsumerGroups/$Default/Partitions/1', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter={apache.org:selector-filter:string=org.apache.qpid.proton.codec.DecoderImpl$UnknownDescribedType@2615c490}, defaultOutcome=null, outcomes=null, capabilities=null}]
2020-04-30 19:50:11.467 INFO 6906 --- [ single-1] c.a.c.a.i.handler.ReceiveLinkHandler : onLinkRemoteOpen connectionId[MF_dcd848_1588256404924], linkName[all_fd160b_1588256404998-0], remoteSource[Source{address='spring-event-hub/ConsumerGroups/$Default/Partitions/0', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter={apache.org:selector-filter:string=org.apache.qpid.proton.codec.DecoderImpl$UnknownDescribedType@76700147}, defaultOutcome=null, outcomes=null, capabilities=null}]
2020-04-30 19:50:11.470 INFO 6906 --- [ single-1] c.a.c.a.i.handler.ReceiveLinkHandler : onLinkRemoteOpen connectionId[MF_dcd848_1588256404924], linkName[all_fd160b_1588256404998-2], remoteSource[Source{address='spring-event-hub/ConsumerGroups/$Default/Partitions/2', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter={apache.org:selector-filter:string=org.apache.qpid.proton.codec.DecoderImpl$UnknownDescribedType@52d562e3}, defaultOutcome=null, outcomes=null, capabilities=null}]
Events per second ::0
Events per second ::316
Events per second ::818
Events per second ::409
Events per second ::87
Events per second ::21
Events per second ::12
Events per second ::5
Events per second ::3
Events per second ::4
Events per second ::3
Events per second ::4
Events per second ::4
Events per second ::5
Events per second ::5
Side note: I have already tried increasing the throughput units (TUs), but it made no difference.
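Before reading too much into the numbers, it may be worth tightening the measurement itself. The counter in EventProcessor skips the event that arrives when the one-second window rolls over, and its fields are not synchronized. The sketch below is a small, self-contained rate meter that counts every event and guards its state with a lock. `RateMeter`, `mark()` and the injectable clock are hypothetical names introduced here for illustration; they are not part of the Azure SDK, and this only changes how the rate is measured, not how events are received.

```java
import java.util.function.LongSupplier;

/** Hypothetical helper (not part of the Azure SDK): counts events per fixed time window. */
final class RateMeter {
    private final long windowMillis;
    private final LongSupplier clock; // injectable so the logic can be exercised without sleeping
    private long windowStart;
    private long count;

    RateMeter(long windowMillis, LongSupplier clock) {
        this.windowMillis = windowMillis;
        this.clock = clock;
        this.windowStart = clock.getAsLong();
    }

    /**
     * Records one event.
     * Returns the number of events in the window that just closed,
     * or -1 while the current window is still open.
     */
    synchronized long mark() {
        long now = clock.getAsLong();
        if (now - windowStart >= windowMillis) {
            long finished = count;
            count = 1;          // the current event belongs to the new window
            windowStart = now;
            return finished;
        }
        count++;
        return -1;
    }
}
```

Inside `accept`, one could call `mark()` on a meter created with `new RateMeter(1000, System::currentTimeMillis)` and print the result whenever it is not -1. Like the original counter, this only reports when an event arrives, so a completely idle second produces no line.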
Solution