hazelcast-jet - 使用 Hazelcast Jet 可以下沉到 java 列表?
问题描述
我有一个帐户列表并在刻度上执行哈希连接并返回带有刻度数据的帐户。但是在 hashjoin 之后我有drainTo lListJet
然后阅读它DistributedStream
并返回它。
public List<Account> populateTicksInAccounts(List<Account> accounts) {
...
...
Pipeline p = Pipeline.create();
BatchSource<Tick> ticksSource = Sources.list(TICKS_LIST_NAME);
BatchSource<Account> accountSource = Sources.fromProcessor(AccountProcessor.of(accounts));
p.drawFrom(ticksSource)
.hashJoin(p.drawFrom(accountSource), JoinClause.joinMapEntries(Tick::getTicker), accountMapper())
.drainTo(Sinks.list(TEMP_LIST));
jet.newJob(p).join();
IListJet<Account> list = jet.getList(TEMP_LIST);
return DistributedStream.fromList(list).collect(DistributedCollectors.toIList());
}
是否可以drainTo
使用 javaList
而不是lListJet
在执行 hashjoin 之后?
像下面这样的东西是可能的吗?
IListJet<Account> accountWithTicks = new ArrayList<>();
p.drawFrom(ticksSource)
.hashJoin(p.drawFrom(accountSource), JoinClause.joinMapEntries(Tick::getTicker), accountMapper())
.drainTo(<CustomSinkProcessor(accountWithTicks)>);
return accountWithTicks;
CustomSinkProcessor 将在哪里获取空的 java 列表并与帐户一起返回?
解决方案
请记住,您提交给 Jet 执行的代码在您提交它的进程之外运行。虽然理论上可以提供您要求的 API,但在后台它只需要执行一些技巧来在集群的每个成员上运行代码,让所有成员将他们的结果发送到一个地方,然后填写列一个清单返回给你。这将违背分布式计算的本质。
如果您认为它有助于代码的可读性,您可以编写一个辅助方法,如下所示:
public <T, R> List<R> drainToList(GeneralStage<T> stage) {
String tmpListName = randomListName();
SinkStage sinkStage = stage.drainTo(Sinks.list(tmpListName));
IListJet<R> tmpList = jet.getList(tmpListName);
try {
jet.newJob(sinkStage.getPipeline()).join();
return new ArrayList<>(tmpList);
} finally {
tmpList.destroy();
}
}
特别注意这一行
return new ArrayList<>(tmpList);
而不是你的
IListJet<Account> list = jet.getList(TEMP_LIST);
return DistributedStream.fromList(list).collect(DistributedCollectors.toIList());
这只是将一个 Hazelcast 列表复制到另一个列表并返回一个句柄。现在您已经泄露了 Jet 集群中的两个列表。当您停止使用它们时,它们不会自动消失。
即使我提供的代码仍然可能泄漏。运行它的 JVM 进程可能会在Job.join()
没有到达finally
. 然后临时列表继续存在。
推荐阅读
- javascript - 这个JS代码有什么问题![类型错误]它说错误是我使用“添加”和“删除”的地方
- python - 在QThread中:函数完成后1秒再次调用函数
- rust - 为什么程序宏在我的构建脚本中看不到由 dotenv 设置的环境变量?
- amazon-web-services - docker exec cli 对等通道创建 | 未能创建新连接:超出上下文期限 | 亚马逊管理的区块链
- elasticsearch - ElasticSearch 应该嵌套和 bool must_not 存在
- sql - 更新表 1 中与表 2 不匹配的多个列。如果匹配,我不想更新
- c# - 在 DDD 中,子实体是否可以引用聚合根
- c# - C# Datetime 用小时和分钟添加天数
- datetime - Pine Script:如何在每次价格更新时在标签上的图表时区显示当前时间?
- reactjs - orderBy 与 react-firebase-hooks/firestore 不工作