首页 > 解决方案 > 使用基于地图条目的 ttl 在 hazelcast 地图中批量加载

问题描述

我需要从 hazelcast 地图中的平面文件中加载大约 1000 万条记录。此外,ttl 需要根据每个地图条目进行设置。什么是最有效的方法?

目前我正在使用 Imap.putall()。有没有办法使用 putall 根据地图条目设置 ttl?

标签: hazelcasthazelcast-imap

解决方案


没有一个 API 允许您在单个到期时进行批量放置。

下面的代码将是一种将 Hazelcast Jet 写入 Hazelcast 的 IMap 的方法。

客户端提交此作业和网格服务器进程,读取输入服务器端的单个文件。该行.groupingKey通过条目键对输入流进行分区,因此每个服务器map.put都会执行一个本地键,但每个条目都使用不同的 TTL 进行丰富。

这是遍历输入文件并单独插入每个键的替代方法。是否更快将取决于网络速度,服务器数量等因素。它肯定比简单的迭代更复杂,因此速度增益需要证明复杂性是合理的。

public class MyClient implements EntryExpiredListener<Long, Long> {

    private static final String INPUT_DIRECTORY = System.getProperty("user.home") + "/input_data";
    private static final String MAP_NAME = "test";

    public static void main(String[] args) {
        new MyClient().go();
    }
    
    public void go() {
        JetInstance jetInstance = Jet.newJetClient();
        
        jetInstance.getMap(MAP_NAME).addEntryListener(this, false);
        
        Pipeline pipeline = MyClient.buildPipeline();
        
        JobConfig jobConfig = new JobConfig();
        jobConfig.addClass(MyClient.class);
        
        try {
            jetInstance.newJob(pipeline, jobConfig).join();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Process a file that looks like <pre>
     * % cat test/input
     * 1
     * 2
     * 3
     * 4
     * 5
     * </pre>
     * @return
     */
    private static Pipeline buildPipeline() {
        ComparatorEx<Tuple3<Long, Long, Long>> comparatorEx = ComparatorEx.comparingLong(Tuple3::f0);

        Pipeline pipeline = Pipeline.create();

        BatchStage<String> input = pipeline.readFrom(MyClient.mySource(INPUT_DIRECTORY));
        
        // Convert to trios of key, value, expiry
        BatchStage<Tuple3<Long, Long, Long>> tuples
            = input
                .map(line -> {
                    long l = Long.parseLong(line);
                    return Tuple3.<Long, Long, Long>tuple3(100 * l, 200 * l, 300 * l);
                });
        
        // Route per JVM based on entry key
        BatchStage<Entry<Long, Tuple3<Long, Long, Long>>> routedEntries
            = tuples
                .groupingKey(Tuple3::f0)
                .rollingAggregate(AggregateOperations.maxBy(comparatorEx));
        
        // Custom map save using expiry
        routedEntries.writeTo(MyClient.mySink(MAP_NAME));
        
        // [Optional] all log entries to systout
        routedEntries.writeTo(Sinks.logger());
                
        return pipeline;
    }

    private static BatchSource<String> mySource(String directory) {
        return Sources.filesBuilder(directory)
                .sharedFileSystem(true)
                .build();
    }

    private static Sink<? super Entry<Long, Tuple3<Long, Long, Long>>> mySink(String mapName) {
            return SinkBuilder.sinkBuilder("mySink", 
                    processorContext -> processorContext.jetInstance().<Long, Long>getMap(mapName))
                .receiveFn((IMap<Long, Long> map, Entry<Long, Tuple3<Long, Long, Long>> entry) -> {
                    map.put(entry.getKey(), entry.getValue().f1(), entry.getValue().f2(), TimeUnit.SECONDS);
                })
                .build();
    }

    @Override
    public void entryExpired(EntryEvent<Long, Long> entryEvent) {
        System.out.println(entryEvent.getEventType() + " for " + entryEvent.getKey());
    }

}

推荐阅读