首页 > 解决方案 > 尝试总结分组值时减少挂起

问题描述

我正在尝试使用一个 Project Reactor 链来收集和分组值,最终按组总结它们。该集合分为两部分和阻塞。

在一个简化的示例中,我能够重现该问题。首先,我在 createWrappers() 中收集一些通用数据,它从网络中读取数据(阻塞调用)。检索数据时,会发出对象。在第二步中,从不同的阻塞网络位置收集详细信息,并将该信息添加到包装器部分。然后将数据转换为详细信息列表,按详细信息键分组,最后按详细信息键汇总。最后应该生成一个如下所示的地图(值特定于测试用例):

key         value
------------------
detail-0    1000
detail-1    2000
detail-2    3000
...

一旦我将 block() 添加到 reduce() 部分,所有内容都挂在下面的示例代码中:

import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Schedulers;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TestBlockingIssue
{
    @Test
    public void testBlockingMap()
    {
        final Flux<Wrapper> source = Flux.create( sink -> createWrappers( 1000, sink ) );

        final Map<String, BigDecimal> block = source.parallel( 10 ).runOn( Schedulers.boundedElastic() )
                .map( wrapper -> enhanceWrapper( wrapper, 100 ) )
                .flatMap( wrapper -> Flux.fromIterable( wrapper.detailsList ) )
                .sequential()
                .groupBy( details -> details.detailKey )
                .cache()
                .collectMap( group -> group.key(), group -> group.reduce( new BigDecimal( 0 ), ( x, y ) -> x.add( y.value ) ).block() ).block();

        System.out.println( block );
    }

    private Wrapper enhanceWrapper( final Wrapper wrapper, final int count )
    {
        for ( int i = 0; i < count; i++ )
        {
            wrapper.detailsList.add( new Details( "detail-" + i, new BigDecimal( i +1 ) ) );
        }
        return wrapper;
    }

    private void createWrappers( final int count, final FluxSink<Wrapper> sink )
    {
        for ( int i = 0; i < count; i++ )
        {
            sink.next( new Wrapper( "Wrapper-" + i ) );
        }
        sink.complete();
    }

    private class Details
    {
        final String detailKey;

        final BigDecimal value;

        private Details( final String detailKey, final BigDecimal value )
        {
            this.detailKey = detailKey;
            this.value = value;
        }
    }

    private class Wrapper
    {
        final String lookupKey;

        final List<Details> detailsList = new ArrayList<>();

        private Wrapper( final String lookupKey )
        {
            this.lookupKey = lookupKey;
        }
    }
}

我该如何解决吊链的问题,或者我必须使用哪些替代方案来生成地图?

标签: project-reactorreactive-streams

解决方案


当使用 groupBy 与太多组并且下游速度不够快以消耗该组时,会发生这种情况。在您的示例中,您不应该在收集地图中阻止,但您应该在收集之前使用该组,例如:

final Map<String, BigDecimal> block = source.parallel( 10 ).runOn( Schedulers.boundedElastic() )
    .map( wrapper -> enhanceWrapper( wrapper, 100 ) )
    .flatMap( wrapper -> Flux.fromIterable( wrapper.detailsList ) )
    .sequential()
    .groupBy( details -> details.detailKey )
    .cache()
    .flatMap(g -> g.reduce( new BigDecimal( 0 ), ( x, y ) -> x.add( y.value ) ).map(v -> Tuples.of(g.key(), v)))
    .collectMap(Tuple2::getT1, Tuple2::getT2)
    .block();

所以现在下游已经足够快了,但是你可能需要根据组的数量来调整并发。并确保您的组数较少。


推荐阅读