java - Mutiny - 如何分组项目以按块发送请求
问题描述
我正在使用 Mutiny 扩展(用于 Quarkus),但我不知道如何解决这个问题。
我想以异步方式发送许多请求,所以我读过 Mutiny 扩展。但是服务器关闭了连接,因为它收到了数千个。
所以我需要:
- 分块发送请求
- 发送所有请求后,执行操作。
我一直在使用 Uni 对象来组合所有响应,如下所示:
Uni<Map<Integer, String>> uniAll = Uni.combine()
.all()
.unis(list)
.combinedWith(...);
接着:
uniAll.subscribe()
.with(...);
此代码并行发送所有请求,以便服务器关闭连接。
我正在使用一组 Multi 对象,但我不知道如何使用它(在 Mutiny 文档中我找不到任何示例)。
这就是我现在正在做的方式:
//Launch 1000 request
for (int i=0;i<1000;i++) {
multi = client.getAbs("https://api.*********.io/jokes/random")
.as(BodyCodec.jsonObject())
.send()
.onItem().transformToMulti(
array -> Multi.createFrom()
.item(array.body().getString("value")))
.group()
.intoLists()
.of(100)
.subscribe()
.with(a->{
System.out.println("Value: "+a);
});
}
我认为订阅不会执行,直到有“100”组项目,但我想这不是方式,因为它不起作用。
有谁知道如何以 100 个块启动数千个异步请求?
提前致谢。
2012 年 4 月 19 日更新
我试过这种方法:
List<Uni<String>> listOfUnis = new ArrayList<>();
for (int i=0;i<1000;i++) {
listOfUnis.add(client
.getAbs("https://api.*******.io/jokes/random")
.as(BodyCodec.jsonObject())
.send()
.onItem()
.transform(item -> item
.body()
.getString("value")));
}
Multi<Uni<String>> multiFormUnis = Multi.createFrom()
.iterable(listOfUnis);
List<String> listOfResponses = new ArrayList<>();
List<String> listOfValues = multiFormUnis.group()
.intoLists()
.of(100)
.onItem()
.transformToMultiAndConcatenate(listOfOneHundred ->
{
System.out.println("Size: "+listOfOneHundred.size());
for (int index=0;index<listOfOneHundred.size();index++) {
listOfResponses.add(listOfOneHundred.get(index)
.await()
.indefinitely());
}
return Multi.createFrom()
.iterable(listOfResponses);
})
.collectItems()
.asList()
.await()
.indefinitely();
for (String value : listOfValues) {
System.out.println(value);
}
当我把这条线:
listOfResponses.add(listOfOneHundred.get(index)
.await()
.indefinitely());
响应一个接一个地打印,当前 100 组项目结束时,它会打印下一组。问题?有顺序请求,需要很多时间
我想我已经接近解决方案了,但我需要知道,如何仅以 100 组为一组发送并行请求,因为如果我输入:
subscribe().with()
所有请求都是并行发送的(而不是 100 组)
解决方案
我认为你创造了很多错误,使用它会更容易:
Multi<String> multiOfJokes = Multi.createFrom().emitter(multiEmitter -> {
for (int i=0;i<1000;i++) {
multiEmitter.emit(i);
}
multiEmitter.complete();
}).onItem().transformToUniAndMerge(index -> {
return Uni.createFrom().item("String" + index);
})
使用这种方法,它应该使调用并行。现在是如何将其列入列表的问题。
分组工作正常,我使用以下代码运行它:
Random random = new Random();
Multi<Integer> multiOfInteger = Multi.createFrom().emitter(multiEmitter -> {
for (Integer i=0;i<1000;i++) {
multiEmitter.emit(i);
}
multiEmitter.complete();
});
Multi<String> multiOfJokes = multiOfInteger.onItem().transformToUniAndMerge(index -> {
if (index % 10 == 0 ) {
Duration delay = Duration.ofMillis(random.nextInt(100) + 1);
return Uni.createFrom().item("String " + index + " delayed").onItem()
.delayIt().by(delay);
}
return Uni.createFrom().item("String" + index);
}).onCompletion().invoke(() -> System.out.println("Completed"));
Multi<List<String>> multiListJokes = multiOfJokes
.group().intoLists().of(100)
.onCompletion().invoke(() -> System.out.println("Completed"))
.onItem().invoke(strings -> System.out.println(strings));
multiListJokes.collect().asList().await().indefinitely();
您将获得您的字符串列表。
我不知道,您打算如何将列表发送到后端。但是您可以使用以下方法之一:
- 调用(异步执行)
- 编写自己的订阅者(实现订阅者)方法很简单。根据您的批量请求的需要。
我希望你以后能更好地理解它。
PS:链接到我学到所有这些的指南: https ://smallrye.io/smallrye-mutiny/guides
推荐阅读
- python - python中树的左右旋转
- azure-active-directory - 团队任务模块 SSO 问题
- c# - 如何使用 XML/XElement 将冒号 : 放入标签的第一部分...?
- google-apps-script - google.script.run.withSuccessHandler 似乎没有返回值
- python - 使用动态加载的模块进行多处理
- visual-studio - 将 Visual Studio 设置为为具有其他扩展名 (.txaml) 的文件打开 XAML 编辑器
- algorithm - Aberth-Ehrlich 方法和精度误差的 Delphi 实现
- javascript - Vue.js 仅在特定数组更改时更新()
- node.js - 在 NodeJS Promise 中排序
- ruby - 如何在 Prawn PDF 中为子表提供填充?