首页 > 解决方案 > 将通量减少到通量

问题描述

我在申请减少操作时遇到了一些问题Stream<Flux<T>>,我想将其减少到Flux<T>。每个 AdProvider 都以 Flux 的形式提供报价,我想使用流从每个 AdProvider 获取所有报价并将它们连接到一个管道。我怎么可能用reduce做到这一点?

Set<AdProvider> adProviders;

@Override
@LogBefore
public void gather()
{
    adProviders
        .parallelStream()
        .map(this::gatherOffers)
        .reduce(?)
        .subscribe();
}

private Flux<Ad> gatherOffers(AdProvider adProvider)
{
    try
    {
        return adProvider.offers();
    }
    catch(Exception e)
    {
        log.warn(EXCEPTION_WHILE_PROCESSING_OFFERS, adProvider.getClass().getSimpleName(), e);

        return Flux.empty();
    }
}

标签: spring-bootstreamreactive-programmingproject-reactor

解决方案


Stream<Flux>使用Flux#fromStream()+展平Flux#flatMap()

为了解决这个问题,你可以结合Flux#fromStream()(转换Stream<Flux>Flux<Flux>)和Flux#flatMap()(将内部通量变平为 flat Flux),如下例所示:

Set<AdProvider> adProviders;

@Override
public void gather()
{
    Flux.fromStream(adProviders.stream())
        .parallel() // replace .parallelStream with separate parallel + runOn
        .runOn(Schedulers.parallel())
        .flatMap(this::gatherOffers)
        .subscribe();
}

private Flux<Ad> gatherOffers(AdProvider adProvider)
{
    try
    {
        return adProvider.offers();
    }
    catch(Exception e)
    {
        log.warn(EXCEPTION_WHILE_PROCESSING_OFFERS, adProvider.getClass().getSimpleName(), e);

        return Flux.empty();
    }
}

可能会注意到,我parallelStream用plain.streamparallel+替换,runOn它们的作用几乎相同。

或者,您可以完全避免使用流并简单地依赖Flux.fromIterble+ same Flux#flatMap

Set<AdProvider> adProviders;

@Override
public void gather()
{
    Flux.fromIterable(adProviders)
        .parallel() // replace .parallelStream with separate parallel + runOn
        .runOn(Schedulers.parallel())
        .flatMap(this::gatherOffers)
        .subscribe();
}

private Flux<Ad> gatherOffers(AdProvider adProvider)
{
    try
    {
        return adProvider.offers();
    }
    catch(Exception e)
    {
        log.warn(EXCEPTION_WHILE_PROCESSING_OFFERS, adProvider.getClass().getSimpleName(), e);

        return Flux.empty();
    }
}

推荐阅读