首页 > 解决方案 > 使用 Hazelcast Jet 可以下沉到 java 列表?

问题描述

我有一个帐户列表并在刻度上执行哈希连接并返回带有刻度数据的帐户。但是在 hashjoin 之后我有drainTo lListJet然后阅读它DistributedStream并返回它。

public List<Account> populateTicksInAccounts(List<Account> accounts) {
    ...
    ...
    Pipeline p = Pipeline.create();
    BatchSource<Tick> ticksSource = Sources.list(TICKS_LIST_NAME);
    BatchSource<Account> accountSource = Sources.fromProcessor(AccountProcessor.of(accounts));

    p.drawFrom(ticksSource)
        .hashJoin(p.drawFrom(accountSource), JoinClause.joinMapEntries(Tick::getTicker), accountMapper())
        .drainTo(Sinks.list(TEMP_LIST));

    jet.newJob(p).join();
    IListJet<Account> list = jet.getList(TEMP_LIST);
    return DistributedStream.fromList(list).collect(DistributedCollectors.toIList());
}

是否可以drainTo使用 javaList而不是lListJet在执行 hashjoin 之后?

像下面这样的东西是可能的吗?

IListJet<Account> accountWithTicks = new ArrayList<>();
p.drawFrom(ticksSource)
    .hashJoin(p.drawFrom(accountSource), JoinClause.joinMapEntries(Tick::getTicker), accountMapper())
    .drainTo(<CustomSinkProcessor(accountWithTicks)>);
return accountWithTicks;

CustomSinkProcessor 将在哪里获取空的 java 列表并与帐户一起返回?

标签: hazelcast-jet

解决方案


请记住,您提交给 Jet 执行的代码在您提交它的进程之外运行。虽然理论上可以提供您要求的 API,但在后台它只需要执行一些技巧来在集群的每个成员上运行代码,让所有成员将他们的结果发送到一个地方,然后填写列一个清单返回给你。这将违背分布式计算的本质。

如果您认为它有助于代码的可读性,您可以编写一个辅助方法,如下所示:

public <T, R> List<R> drainToList(GeneralStage<T> stage) {
    String tmpListName = randomListName();
    SinkStage sinkStage = stage.drainTo(Sinks.list(tmpListName));
    IListJet<R> tmpList = jet.getList(tmpListName);
    try {
        jet.newJob(sinkStage.getPipeline()).join();
        return new ArrayList<>(tmpList);
    } finally {
        tmpList.destroy();
    }
}

特别注意这一行

return new ArrayList<>(tmpList);

而不是你的

IListJet<Account> list = jet.getList(TEMP_LIST);
return DistributedStream.fromList(list).collect(DistributedCollectors.toIList());

这只是将一个 Hazelcast 列表复制到另一个列表并返回一个句柄。现在您已经泄露了 Jet 集群中的两个列表。当您停止使用它们时,它们不会自动消失。

即使我提供的代码仍然可能泄漏。运行它的 JVM 进程可能会在Job.join()没有到达finally. 然后临时列表继续存在。


推荐阅读