首页 > 解决方案 > Flink 1.6 Async IO - 如何在丰富流时增加吞吐量,使用 REST 服务调用?

问题描述

我目前使用的是 Flink 1.6 版,并且面临 AsyncIO 的问题,其中性能达不到我的预期。

我确信我在执行过程中做错了什么,所以任何建议/建议都将不胜感激。

问题概要 - 我正在使用一个 id 流。对于每个 id,我需要调用一个 REST 服务。我已经实现了一个 RichAsyncFunction,它执行异步 REST 调用。

下面是相关的代码方法和asyncInvoke方法

// these are initialized in the open method
ExecutorService executorService = 
ExecutorService.newFixedThreadPool(n);
CloseableHttpAsyncClient client = ...
Gson gson = ...

public void asyncInvoke(String key, final ResultFuture<Item> resultFuture) throws Exception {

    executorService.submit(new Runnable() {

        client.execute(new HttpGet(new URI("http://myservice/" + key)), new FutureCallback<HttpResponse>() {
             @Override
                public void completed(final HttpResponse response) {
                    System.out.println("completed successfully");
                    Item item = gson.fromJson(EntityUtils.toString(response.getEntity), Item.class);
                    resultFuture.complete(Collections.singleton(item));
                }
        });
    });

}

通过上述实现,我尝试过:-

我始终获得大约 100 个请求/秒的吞吐量。该服务每秒能够处理超过 5k。我做错了什么,我该如何改进?

标签: apache-flink

解决方案


推荐阅读