hazelcast - 使用基于地图条目的 ttl 在 hazelcast 地图中批量加载
问题描述
我需要从 hazelcast 地图中的平面文件中加载大约 1000 万条记录。此外,ttl 需要根据每个地图条目进行设置。什么是最有效的方法?
目前我正在使用 Imap.putall()。有没有办法使用 putall 根据地图条目设置 ttl?
解决方案
没有一个 API 允许您在单个到期时进行批量放置。
下面的代码将是一种将 Hazelcast Jet 写入 Hazelcast 的 IMap 的方法。
客户端提交此作业和网格服务器进程,读取输入服务器端的单个文件。该行.groupingKey
通过条目键对输入流进行分区,因此每个服务器map.put
都会执行一个本地键,但每个条目都使用不同的 TTL 进行丰富。
这是遍历输入文件并单独插入每个键的替代方法。是否更快将取决于网络速度,服务器数量等因素。它肯定比简单的迭代更复杂,因此速度增益需要证明复杂性是合理的。
public class MyClient implements EntryExpiredListener<Long, Long> {
private static final String INPUT_DIRECTORY = System.getProperty("user.home") + "/input_data";
private static final String MAP_NAME = "test";
public static void main(String[] args) {
new MyClient().go();
}
public void go() {
JetInstance jetInstance = Jet.newJetClient();
jetInstance.getMap(MAP_NAME).addEntryListener(this, false);
Pipeline pipeline = MyClient.buildPipeline();
JobConfig jobConfig = new JobConfig();
jobConfig.addClass(MyClient.class);
try {
jetInstance.newJob(pipeline, jobConfig).join();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* Process a file that looks like <pre>
* % cat test/input
* 1
* 2
* 3
* 4
* 5
* </pre>
* @return
*/
private static Pipeline buildPipeline() {
ComparatorEx<Tuple3<Long, Long, Long>> comparatorEx = ComparatorEx.comparingLong(Tuple3::f0);
Pipeline pipeline = Pipeline.create();
BatchStage<String> input = pipeline.readFrom(MyClient.mySource(INPUT_DIRECTORY));
// Convert to trios of key, value, expiry
BatchStage<Tuple3<Long, Long, Long>> tuples
= input
.map(line -> {
long l = Long.parseLong(line);
return Tuple3.<Long, Long, Long>tuple3(100 * l, 200 * l, 300 * l);
});
// Route per JVM based on entry key
BatchStage<Entry<Long, Tuple3<Long, Long, Long>>> routedEntries
= tuples
.groupingKey(Tuple3::f0)
.rollingAggregate(AggregateOperations.maxBy(comparatorEx));
// Custom map save using expiry
routedEntries.writeTo(MyClient.mySink(MAP_NAME));
// [Optional] all log entries to systout
routedEntries.writeTo(Sinks.logger());
return pipeline;
}
private static BatchSource<String> mySource(String directory) {
return Sources.filesBuilder(directory)
.sharedFileSystem(true)
.build();
}
private static Sink<? super Entry<Long, Tuple3<Long, Long, Long>>> mySink(String mapName) {
return SinkBuilder.sinkBuilder("mySink",
processorContext -> processorContext.jetInstance().<Long, Long>getMap(mapName))
.receiveFn((IMap<Long, Long> map, Entry<Long, Tuple3<Long, Long, Long>> entry) -> {
map.put(entry.getKey(), entry.getValue().f1(), entry.getValue().f2(), TimeUnit.SECONDS);
})
.build();
}
@Override
public void entryExpired(EntryEvent<Long, Long> entryEvent) {
System.out.println(entryEvent.getEventType() + " for " + entryEvent.getKey());
}
}
推荐阅读
- python-3.x - 我使用python制作了一个项目,可以检测图像中的对象,但我需要为输入/输出做GUI
- javascript - 使用 socket.io 和 javascript 在聊天室中显示在线用户名
- sql - 查找相对百分比
- android - imageView 属性 android:adjustViewBounds=true 是否具有与 layout_width=true 和 layout_height=true 完全相同的影响?
- php - PHP 7.4 和 MySQL 连接
- html - getUserMedia API 在 React 中不适用于 iOS Safari
- vue.js - Vue Router 在服务中的使用
- reactjs - 对 .jsx 文件使用绝对路径?
- c - getopt_long 使用标志结构成员
- c++ - 如何在此代码上撤消 cin 的重定向?