apache-flink - Flink 1.6 Async IO - 如何在丰富流时增加吞吐量,使用 REST 服务调用?
问题描述
我目前使用的是 Flink 1.6 版,并且面临 AsyncIO 的问题,其中性能达不到我的预期。
我确信我在执行过程中做错了什么,所以任何建议/建议都将不胜感激。
问题概要 - 我正在使用一个 id 流。对于每个 id,我需要调用一个 REST 服务。我已经实现了一个 RichAsyncFunction,它执行异步 REST 调用。
下面是相关的代码方法和asyncInvoke方法
// these are initialized in the open method
ExecutorService executorService =
ExecutorService.newFixedThreadPool(n);
CloseableHttpAsyncClient client = ...
Gson gson = ...
public void asyncInvoke(String key, final ResultFuture<Item> resultFuture) throws Exception {
executorService.submit(new Runnable() {
client.execute(new HttpGet(new URI("http://myservice/" + key)), new FutureCallback<HttpResponse>() {
@Override
public void completed(final HttpResponse response) {
System.out.println("completed successfully");
Item item = gson.fromJson(EntityUtils.toString(response.getEntity), Item.class);
resultFuture.complete(Collections.singleton(item));
}
});
});
}
通过上述实现,我尝试过:-
- 增加浓缩操作的并行度
- 增加执行器服务中的线程数
- 使用 apache http 异步客户端,我尝试调整连接管理器设置 - setDefaultMaxPerRoute 和 setMaxTotal。
我始终获得大约 100 个请求/秒的吞吐量。该服务每秒能够处理超过 5k。我做错了什么,我该如何改进?
解决方案
推荐阅读
- laravel - Eloquent Query:所有具有相同类别的新闻。数据透视表
- node.js - 通过 npm 安装程序使用 nodejs 安装 React.js 代码后未运行
- javascript - 计算 Kintone 中的单选按钮值
- swift - 在 SwiftUI 中动态隐藏视图
- laravel-5 - strpos() 期望参数 1 是字符串,给定数组 - 当我进行雄辩的查询时
- vhdl - 我对 vhdl 中的当前时间有一些疑问
- c++ - 如何计算 C++ std::map 中不同值的数量
- google-cloud-functions - 使用 Node.js 从 Cloud Functions 读取 Cloud Bigtable 需要 > 1500 毫秒
- android - 通过通知操作安排 WorkManager 工作人员
- azure - 天蓝色负载均衡器可以启动/停止虚拟机吗?