首页 > 解决方案 > 多节点 hazelcast 设置中的文件消耗

问题描述

我看到了可以使用 jet 使用 CSV 文件的示例,例如。

BatchSource<SalesRecordLine> source = Sources.filesBuilder(sourceDir)
             .glob("*.csv")
             .build(path -> Files.lines(path).skip(1).map(SalesRecordLine::parse));

在多节点设置中,所有节点会开始获取文件(比如说共享 NFS)还是使用一些智能锁定(如 Apache Camel 的幂等文件消费者方法?)。Jet 如何在读取之前知道文件已完全刷新到磁盘?

谢谢

标签: hazelcasthazelcast-jet

解决方案


您可以仅将文件放在一个节点上,并让 Jet 将数据分发给所有成员。Jet 目前缺乏对流再平衡的一流支持,但您可以通过以下方式实现它,有点迂回:

pipeline.readFrom(source)
        .groupingKey(x -> x)
        .mapStateful(() -> null, (state, key, item) -> item)
        .restOfYourPipeline();

groupingKey(x -> x)指定分区函数。我使用了一个普通的身份函数,但你可以放置任何对你的数据有意义的东西。


推荐阅读