apache-flink - 在 Flink 集群中注册 Java 类
问题描述
我在 Flink Cluster 中运行我的 Fat Jar,它读取 Kafka 并保存在 Cassandra 中,代码是,
final Properties prop = getProperties();
final FlinkKafkaConsumer<String> flinkConsumer = new FlinkKafkaConsumer<>
(kafkaTopicName, new SimpleStringSchema(), prop);
flinkConsumer.setStartFromEarliest();
final DataStream<String> stream = env.addSource(flinkConsumer);
DataStream<Person> sensorStreaming = stream.flatMap(new FlatMapFunction<String, Person>() {
@Override
public void flatMap(String value, Collector<Person> out) throws Exception {
try {
out.collect(objectMapper.readValue(value, Person.class));
} catch (JsonProcessingException e) {
logger.error("Json Processing Exception", e);
}
}
});
savePersonDetails(sensorStreaming);
env.execute();
和 POJO 包含的人,
@Column(name = "event_time")
private Instant eventTime;
Instant
Cassandra端需要如下存储的编解码器,
final Cluster cluster = ClusterManager.getCluster(cassandraIpAddress);
cluster.getConfiguration().getCodecRegistry().register(InstantCodec.instance);
当我运行独立时工作正常,但是当我运行本地集群时会抛出如下错误,
Caused by: com.datastax.driver.core.exceptions.CodecNotFoundException: Codec not found for requested operation: [timestamp <-> java.time.Instant]
at com.datastax.driver.core.CodecRegistry.notFound(CodecRegistry.java:679)
at com.datastax.driver.core.CodecRegistry.createCodec(CodecRegistry.java:526)
at com.datastax.driver.core.CodecRegistry.findCodec(CodecRegistry.java:506)
at com.datastax.driver.core.CodecRegistry.access$200(CodecRegistry.java:140)
at com.datastax.driver.core.CodecRegistry$TypeCodecCacheLoader.load(CodecRegistry.java:211)
at com.datastax.driver.core.CodecRegistry$TypeCodecCacheLoader.load(CodecRegistry.java:208)
我阅读了以下注册文件,
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/custom_serializers.html
但是InstantCodec
第 3 方之一。我该如何注册?
解决方案
我解决了这个问题,LocalDateTime
当我用相同的类型转换时,出现了上面的错误。我将类型更改为 java.utilDate
类型然后它起作用了。
推荐阅读
- json - Go Gin 将 json 响应转换为 base64
- swift - 如何执行批量更新以删除collectionview swift 5中的单元格?
- c# - 如何使用派生 TreeNode 实例迭代添加到 TreeView 的节点
- ios - Ionic Cordova 构建以 xcodebuild 结束:命令失败,退出代码 65
- r - 解释零膨胀回归摘要
- amazon-web-services - 使用 s3“同步”命令时,AWS 存储桶复制在私有网络中运行
- c - 如何在刷新之前查看标准输出缓冲区?
- python - 检查元组之间的不等式
- linux - 如何从 ps -ef 的输出中获取进程 ID | grep 'tomcat'?
- algorithm - 动态表数据结构摊销分析