project-reactor - 尝试总结分组值时减少挂起
问题描述
我正在尝试使用一个 Project Reactor 链来收集和分组值,最终按组总结它们。该集合分为两部分和阻塞。
在一个简化的示例中,我能够重现该问题。首先,我在 createWrappers() 中收集一些通用数据,它从网络中读取数据(阻塞调用)。检索数据时,会发出对象。在第二步中,从不同的阻塞网络位置收集详细信息,并将该信息添加到包装器部分。然后将数据转换为详细信息列表,按详细信息键分组,最后按详细信息键汇总。最后应该生成一个如下所示的地图(值特定于测试用例):
key value
------------------
detail-0 1000
detail-1 2000
detail-2 3000
...
一旦我将 block() 添加到 reduce() 部分,所有内容都挂在下面的示例代码中:
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Schedulers;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class TestBlockingIssue
{
@Test
public void testBlockingMap()
{
final Flux<Wrapper> source = Flux.create( sink -> createWrappers( 1000, sink ) );
final Map<String, BigDecimal> block = source.parallel( 10 ).runOn( Schedulers.boundedElastic() )
.map( wrapper -> enhanceWrapper( wrapper, 100 ) )
.flatMap( wrapper -> Flux.fromIterable( wrapper.detailsList ) )
.sequential()
.groupBy( details -> details.detailKey )
.cache()
.collectMap( group -> group.key(), group -> group.reduce( new BigDecimal( 0 ), ( x, y ) -> x.add( y.value ) ).block() ).block();
System.out.println( block );
}
private Wrapper enhanceWrapper( final Wrapper wrapper, final int count )
{
for ( int i = 0; i < count; i++ )
{
wrapper.detailsList.add( new Details( "detail-" + i, new BigDecimal( i +1 ) ) );
}
return wrapper;
}
private void createWrappers( final int count, final FluxSink<Wrapper> sink )
{
for ( int i = 0; i < count; i++ )
{
sink.next( new Wrapper( "Wrapper-" + i ) );
}
sink.complete();
}
private class Details
{
final String detailKey;
final BigDecimal value;
private Details( final String detailKey, final BigDecimal value )
{
this.detailKey = detailKey;
this.value = value;
}
}
private class Wrapper
{
final String lookupKey;
final List<Details> detailsList = new ArrayList<>();
private Wrapper( final String lookupKey )
{
this.lookupKey = lookupKey;
}
}
}
我该如何解决吊链的问题,或者我必须使用哪些替代方案来生成地图?
解决方案
当使用 groupBy 与太多组并且下游速度不够快以消耗该组时,会发生这种情况。在您的示例中,您不应该在收集地图中阻止,但您应该在收集之前使用该组,例如:
final Map<String, BigDecimal> block = source.parallel( 10 ).runOn( Schedulers.boundedElastic() )
.map( wrapper -> enhanceWrapper( wrapper, 100 ) )
.flatMap( wrapper -> Flux.fromIterable( wrapper.detailsList ) )
.sequential()
.groupBy( details -> details.detailKey )
.cache()
.flatMap(g -> g.reduce( new BigDecimal( 0 ), ( x, y ) -> x.add( y.value ) ).map(v -> Tuples.of(g.key(), v)))
.collectMap(Tuple2::getT1, Tuple2::getT2)
.block();
所以现在下游已经足够快了,但是你可能需要根据组的数量来调整并发。并确保您的组数较少。
推荐阅读
- iis - IIS 10 管理器 - 证书窗格中缺少选项
- powershell - 无法在通过 Direct Connect VPN 连接的计算机上运行 Get-WmiObject
- python - 从字典中获取 DataFrame 中的奇怪标签
- opc-ua - 关于 OPC UA 中的全局发现服务器
- java - 运行 jar 文件使用 Qt (QProcess)
- docker - 我已经使用 Docker Desktop 创建了卷,我看到使用“docker volume ls”的卷,但 Windows 中的文件夹是空的
- react-native - 您可以将 jest 测试文件与 react-native 中的生产文件放在一起而不捆绑它们吗?
- flutter - Flutter searchDelegate - 在另一个页面中导航并显示结果
- .net-core - 如何手动创建 ODataQueryOptions
- python - TFRecords InvalidArgumentError:键:x_img_shape。无法解析序列化示例