java - 如何在平行通量内的同一线程中运行单声道
问题描述
我正在尝试用来自 Mono 的值填充 Flux 中的对象。当我试图这样做时,它只是忽略了我的“设置”操作。我认为这是因为 Flux 是并行工作的,而 Mono 不是。我怎么解决这个问题?
Flux.fromIterable(proxyParserService.getProxyList())
.parallel()
.runOn(Schedulers.parallel())
.filter(proxy -> proxy.getCorrupted() == null || !proxy.getCorrupted())
.subscribe(proxy -> {
try {
RestTemplate restTemplate = getProxiedTemplate(proxy.getHost(), proxy.getPort());
restTemplate.exchange(URI, HttpMethod.GET, HttpEntity.EMPTY, String.class);
geoDataService.getData(proxy.getHost()) // Here comes the Mono object, that contains needed value to set into "proxy"
.subscribe(geoData ->
{
log.info("GEODATA: {} ", geoData);
proxy.setCountryCode(geoData.getCountryCode()); // ignored somehow
});
proxy.setCorrupted(false);
addresses.add(proxy);
log.info("IP {}:{} is OK", proxy.getHost(), proxy.getPort());
log.info("Final result: {}", proxy.toString());
} catch (ResourceAccessException e) {
log.info("IP {}:{} is corrupted!", proxy.getHost(), proxy.getPort());
proxy.setCorrupted(true);
addresses.add(proxy);
}
},
throwable -> log.error(String.format("Exception caught while trying to fill map: %s", throwable.getCause())));
}
如您所见,我正在尝试将国家/地区代码设置为代理。
解决方案
解决了。在“flatMap”运算符中添加了 Mono 对象。例子:
Flux.fromIterable(proxyParserService.getProxyList())
.parallel()
.runOn(Schedulers.parallel())
.filter(poxy -> !valueExist(addresses.values(), poxy))
.flatMap(geoDataService::getData) // Now it runs in parallel threads
.subscribe(proxy -> {
try {
RestTemplate restTemplate = getProxiedTemplate(proxy.getHost(), proxy.getPort());
restTemplate.exchange(URI, HttpMethod.GET, HttpEntity.EMPTY, String.class);
proxy.setCorrupted(false);
addresses.put(proxy.getCountryCode(), proxy);
log.info("IP {}:{} is OK", proxy.getHost(), proxy.getPort());
log.info("Final result: {}", proxy.toString());
} catch (ResourceAccessException e) {
log.info("IP {}:{} is corrupted!", proxy.getHost(), proxy.getPort());
proxy.setCorrupted(true);
addresses.put(proxy.getCountryCode(), proxy);
}
},
throwable -> log.error(String.format("Exception caught while trying to fill map: %s", throwable.getCause())));
推荐阅读
- python - sqlalchemy.exc.MultipleResultsFound:当恰好需要一个时找到了多行
- css - 将 CSS 导出到单个文件和每个 SCSS 一个文件
- node.js - 将 GRPC 工具输出保存到指定位置
- python - Python - 获取“打开方式”文件
- azure-cognitive-services - mstts:backgroundaudio SSML 标签在 SDK 上不起作用
- python - 连接到 postgreSQL 数据库时安装 psycopg2 时出错
- google-apps-script - 附加组件上的错误安装时钟触发器
- java - javaee Schema(Eclipse)中web.xml文件中的奇怪错误
- javascript - 将哈希添加到 vanilla js 中的 href 集合
- python-3.x - 如何覆盖通过 otm 关系链接到主模型的模型中的数据?