spring-boot - 将通量减少到通量
问题描述
我在申请减少操作时遇到了一些问题Stream<Flux<T>>
,我想将其减少到Flux<T>
。每个 AdProvider 都以 Flux 的形式提供报价,我想使用流从每个 AdProvider 获取所有报价并将它们连接到一个管道。我怎么可能用reduce做到这一点?
Set<AdProvider> adProviders;
@Override
@LogBefore
public void gather()
{
adProviders
.parallelStream()
.map(this::gatherOffers)
.reduce(?)
.subscribe();
}
private Flux<Ad> gatherOffers(AdProvider adProvider)
{
try
{
return adProvider.offers();
}
catch(Exception e)
{
log.warn(EXCEPTION_WHILE_PROCESSING_OFFERS, adProvider.getClass().getSimpleName(), e);
return Flux.empty();
}
}
解决方案
Stream<Flux>
使用Flux#fromStream()
+展平Flux#flatMap()
为了解决这个问题,你可以结合Flux#fromStream()
(转换Stream<Flux>
为Flux<Flux>
)和Flux#flatMap()
(将内部通量变平为 flat Flux
),如下例所示:
Set<AdProvider> adProviders;
@Override
public void gather()
{
Flux.fromStream(adProviders.stream())
.parallel() // replace .parallelStream with separate parallel + runOn
.runOn(Schedulers.parallel())
.flatMap(this::gatherOffers)
.subscribe();
}
private Flux<Ad> gatherOffers(AdProvider adProvider)
{
try
{
return adProvider.offers();
}
catch(Exception e)
{
log.warn(EXCEPTION_WHILE_PROCESSING_OFFERS, adProvider.getClass().getSimpleName(), e);
return Flux.empty();
}
}
可能会注意到,我parallelStream
用plain.stream
和parallel
+替换,runOn
它们的作用几乎相同。
或者,您可以完全避免使用流并简单地依赖Flux.fromIterble
+ same Flux#flatMap
:
Set<AdProvider> adProviders;
@Override
public void gather()
{
Flux.fromIterable(adProviders)
.parallel() // replace .parallelStream with separate parallel + runOn
.runOn(Schedulers.parallel())
.flatMap(this::gatherOffers)
.subscribe();
}
private Flux<Ad> gatherOffers(AdProvider adProvider)
{
try
{
return adProvider.offers();
}
catch(Exception e)
{
log.warn(EXCEPTION_WHILE_PROCESSING_OFFERS, adProvider.getClass().getSimpleName(), e);
return Flux.empty();
}
}
推荐阅读
- c++ - cout 中断错误
- string - SPSS以字符串中的特定单词为条件创建新的数字输出变量
- python - Python:如何在 dict 中获取可能的键组合
- c - 读取文件并在读取到数组的每一行的开头添加 10 个数组元素我正在存储文件的行
- google-apps-script - 登录并获取 url Google Apps 脚本
- c# - JSON在c#中获取一些数据
- android - 添加资源后构建不成功,如何在res文件夹中添加资源?
- python - python文本框换行对我不起作用
- c++ - 将指向数组的指针作为函数参数传递,它本身是另一个函数的返回值?
- mysql - 用于从多个表中选择数据的 SQL 查询