Registering a Java class in a Flink cluster

Problem description

I am running my fat JAR in a Flink cluster. It reads from Kafka and saves to Cassandra. The code is:

        final Properties prop = getProperties();
        final FlinkKafkaConsumer<String> flinkConsumer = new FlinkKafkaConsumer<>
                                             (kafkaTopicName, new SimpleStringSchema(), prop);
        flinkConsumer.setStartFromEarliest();

        final DataStream<String> stream = env.addSource(flinkConsumer);
        DataStream<Person> sensorStreaming = stream.flatMap(new FlatMapFunction<String, Person>() {
            @Override
            public void flatMap(String value, Collector<Person> out) throws Exception {
                try {
                    out.collect(objectMapper.readValue(value, Person.class));
                } catch (JsonProcessingException e) {
                    logger.error("Json Processing Exception", e);
                }
            }
        });
        savePersonDetails(sensorStreaming);
        env.execute();

and the Person POJO contains:

    @Column(name = "event_time")
    private Instant eventTime;

Storing an Instant on the Cassandra side requires a codec, which I register as follows:

    final Cluster cluster = ClusterManager.getCluster(cassandraIpAddress);
    cluster.getConfiguration().getCodecRegistry().register(InstantCodec.instance);

This works fine when I run it standalone, but when I run it on a local cluster it throws the following error:

Caused by: com.datastax.driver.core.exceptions.CodecNotFoundException: Codec not found for requested operation: [timestamp <-> java.time.Instant]
    at com.datastax.driver.core.CodecRegistry.notFound(CodecRegistry.java:679)
    at com.datastax.driver.core.CodecRegistry.createCodec(CodecRegistry.java:526)
    at com.datastax.driver.core.CodecRegistry.findCodec(CodecRegistry.java:506)
    at com.datastax.driver.core.CodecRegistry.access$200(CodecRegistry.java:140)
    at com.datastax.driver.core.CodecRegistry$TypeCodecCacheLoader.load(CodecRegistry.java:211)
    at com.datastax.driver.core.CodecRegistry$TypeCodecCacheLoader.load(CodecRegistry.java:208)

I have read the following documentation on registering custom serializers:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/custom_serializers.html

But InstantCodec is a third-party class. How do I register it?

Tags: apache-flink, flink-streaming, datastax-java-driver

Solution


I solved this problem. I got the same error as above when I used the LocalDateTime type for the field. I changed the field type to java.util.Date and then it worked.
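As a rough sketch of what that change might look like: the stack trace points at the 3.x DataStax driver (`com.datastax.driver.core`), which maps the CQL `timestamp` type to `java.util.Date` out of the box, so no extra codec has to be registered. The `Person` class below is a hypothetical stand-in for the POJO in the question. The mapper annotations are left out so that it compiles with the JDK alone, and the two `Instant` helper methods are my own additions for illustration, not part of the original code.

```java
import java.time.Instant;
import java.util.Date;

// Hypothetical stand-in for the Person POJO from the question. Only the
// event_time field is shown; driver annotations are omitted on purpose.
class Person {
    // In the real POJO this field would carry @Column(name = "event_time").
    private Date eventTime;

    public Date getEventTime() {
        return eventTime;
    }

    public void setEventTime(Date eventTime) {
        this.eventTime = eventTime;
    }

    // Assumed helper: lets calling code keep working with Instant.
    public Instant eventTimeAsInstant() {
        return eventTime == null ? null : eventTime.toInstant();
    }

    // Assumed helper: Date.from keeps millisecond precision only.
    public void setEventTimeFromInstant(Instant instant) {
        this.eventTime = instant == null ? null : Date.from(instant);
    }
}

class EventTimeExample {
    public static void main(String[] args) {
        Person person = new Person();
        person.setEventTimeFromInstant(Instant.parse("2020-09-01T12:30:00Z"));
        // Prints the stored value as milliseconds since the epoch.
        System.out.println(person.getEventTime().getTime());
    }
}
```

Note that `java.util.Date` only carries millisecond precision, so anything finer in the `Instant` is dropped on conversion.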

