reactive-programming - 组合两个不同类型的 Flux 实例
问题描述
使用 SpringBoot 2 和 Poi 类(兴趣点):
public class Poi {
public Poi(String poidId, Double price, Double latitude, Double longitude) {...}
private String poidId;
private Double latitude;
private Double longitude;
private Double price;
//And Getters and Setters
}
我有 2 个 Poi 通量:
Flux<Poi> availablePoisFlux;
Flux<Poi> poiFlux;
第一个元素availablePoisFlux包含 Pois,其中:
- 一个poidId
- 没有纬度信息
- 没有经度信息
- 价格信息
第二个元素poiFlux包含 Pois,其中:
- 一个poidId
- 一个纬度
- 经度
- 没有价格信息
(poidId 是 Poi 的标识符)。
我想用两个 Flux(poiFlux 和 availablePoisFlux)的 Pois(带有 poidId、价格、经度和纬度)创建一个新的 Flux resultPoisFlux 。
poidId 属性是两个 Flux(poiFlux 和 availablePoisFlux)之间的关键。
示例实现:
我想我可以使用 zipWith 运算符来做到这一点,但我需要一些关于反应式运算符(和过滤器?)的信息和建议
我想从第一个 Flux 迭代并使用 poidId 标识符从第二个 Flux 获取信息(价格),并使用正确的值更新价格属性。
示例输入值:
poiFlux = Poi(poidId=poiId0, price=null, name=name0, latitude=2.2222, longitude=14.222)
poiFlux = Poi(poidId=poiId1, price=null, name=name1, latitude=3.2222, longitude=15.222)
poiFlux = Poi(poidId=poiId2, price=null, name=name2, latitude=4.2222, longitude=16.222)
poiFlux = Poi(poidId=poiId3, price=null, name=name3, latitude=5.2222, longitude=17.222)
poiFlux = Poi(poidId=poiId4, price=null, name=name4, latitude=6.2222, longitude=18.222)
poiFlux = Poi(poidId=poiId5, price=null, name=name5, latitude=7.2222, longitude=19.222)
poiFlux = Poi(poidId=poiId6, price=null, name=name6, latitude=8.2222, longitude=20.222)
poiFlux = Poi(poidId=poiId7, price=null, name=name7, latitude=9.2222, longitude=21.222)
poiFlux = Poi(poidId=poiId8, price=null, name=name8, latitude=10.2222, longitude=22.222)
poiFlux = Poi(poidId=poiId9, price=null, name=name9, latitude=11.2222, longitude=23.222)
availablePoisFlux = Poi(poidId=poiId0, price=120.0, name=name0, latitude=null, longitude=null)
availablePoisFlux = Poi(poidId=poiId1, price=120.0, name=name1, latitude=null, longitude=null)
availablePoisFlux = Poi(poidId=poiId2, price=120.0, name=name2, latitude=null, longitude=null)
availablePoisFlux = Poi(poidId=poiId3, price=120.0, name=name3, latitude=null, longitude=null)
availablePoisFlux = Poi(poidId=poiId4, price=120.0, name=name4, latitude=null, longitude=null)
预期结果:
resultPoisFlux = Poi(poidId=poiId0, price=120.0, name=name0, latitude=2.2222, longitude=14.222)
resultPoisFlux = Poi(poidId=poiId1, price=120.0, name=name1, latitude=3.2222, longitude=15.222)
resultPoisFlux = Poi(poidId=poiId2, price=120.0, name=name2, latitude=4.2222, longitude=16.222)
resultPoisFlux = Poi(poidId=poiId3, price=120.0, name=name3, latitude=5.2222, longitude=17.222)
resultPoisFlux = Poi(poidId=poiId4, price=120.0, name=name4, latitude=6.2222, longitude=18.222)
像这样的东西:
Flux<Poi> resultPoisFlux = availablePoisFlux.zipWith(poiFlux, (a, b) -> new Poi(a.getPoidId(), a.getPrice(), getLatitudeFromPoiFluxByPoidId(a.getPoidId()), getLongitudeFromPoiFluxByPoidId(a.getPoidId())))....
谢谢你的帮助。
解决方案
zip/zipWith
,但它只成对组合了两个来源......
...只要它有足够的元素来配对。因此,如果您保证元素在两个源中的顺序相同,并且poiIds
每一侧都没有差异,那么它仅对您的情况有用。在您的示例中就是这种情况,因为即使第二个来源只有 4 个元素,这些元素与第一个来源的开头相同。
poiFlux.zipWith(availablePoisFlux, (a, b) -> new Poi(a.getPoiId(),
b.getPrice(),
a.getLatitude(),
a.getLongitude(),
a.getName()));
更通用的解决方案,更少的反应
如果没有这样的保证,您需要以某种方式组合两个无序且不相交的序列。如果不收集其中一个源(最好是availablePoisFlux
)中的所有元素,您就无法做到这一点,这意味着它将延迟另一个源的处理,直到所述源完成。
一种组合方式是将所有值收集到一个以第二个源为键的映射中poiId
,然后“迭代”第二个源。由于地图中可能找不到某些元素,因此您需要handle
能够“跳过”这些元素:
availablePoisFlux.collectMap(Poi::getId, Poi::getPrice)
.flatMapMany(knownPrices -> poiFlux.handle((poi, sink) -> {
String poiId = poi.getPoiId();
if (knownPrices.containsKey(poiId) {
Double price = knownPrices.get(poiId);
Poi complete = new Poi(poiId, price, poi.getLatitude(),
poi.getLongitude(), poi.getName());
sink.next(complete);
} //else do nothing and let handle skip that poi
}));
推荐阅读
- css - 按百分比填充圆形边框
- c - 卡在向字符串添加字符
- android - tabBarIcon 未显示在 createBottomTabNavigator 上
- c# - 将 C# 与 Prolog 集成 - 尽管目标平台设置为 x86,但获取“System.BadImageFormatException”异常
- ios - (ARKit 2 Room Scanner)如何使用 UIBezierPath 和 featurePoints 来创建一个带有 ARKit 2 和 ARWorldMap 周围几何图形的节点
- c++ - 构造函数不接受 char *
- javascript - 使用来自不同组件的值设置状态
- go - 3D 打印机串行通信
- c# - 在 .Net Core 2.1 的 XUnit 单元测试中注册 AutoMapper
- javascript - 将数组转换为 N 大小但具有最大子数组大小的数组的数组