apache-flink - 使用 Flink 将 POJO 保存到 Cassandra
问题描述
我是 Flink 的新手,我想将 Kafka 流数据存储到 Cassandra 中。我已将 String 转换为 POJO。我的 POJO 如下,
@Table(keyspace = "sample", name = "contact")
public class Person implements Serializable {
private static final long serialVersionUID = 1L;
@Column(name = "name")
private String name;
@Column(name = "timeStamp")
private LocalDateTime timeStamp;
我的转换发生如下,
stream.flatMap(new FlatMapFunction<String, Person>() {
public void flatMap(String value, Collector<Person> out) {
try {
out.collect(objectMapper.readValue(value, Person.class));
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
}).print(); // I need to use proper method to convert to Datastream.
env.execute();
我阅读了以下链接上的文档以供参考,
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/cassandra.html
Cassandra Sink 接受 DataStream 实例。我需要转换我的转换并将它们存储到 Kafka 中。
Cant create Cassandra Pojo Sink也给了我一些想法。
有.forward()
返回的方法 DataStream<Reading> forward
,当将实例传递给时,
CassandraSink.addSink(forward)
.setHost("localhost")
.build();
cannot access org.apache.flink.streaming.api.scala.DataStream
如何将我的 POJO 转换为存储在 Cassandra 中?
解决方案
推荐阅读
- python - 使用 Python 将内联 HTML 发送到 Gmail
- python - 熊猫移动行块
- python - 在 Pipeline sklearn 中共享参数
- python - 将数据拆分为特征和标签后,标签列形状不一致
- ios - 在uitableview中编辑特定单元格
- wordpress - 在 Timber 中添加 WP 自定义徽标作为 Webp 图像
- kendo-grid - Kendo Gird .Update 调用java脚本函数
- matlab - 将 Matlab 函数 (mscript) 转换为 simulink 模型
- python - 将列表字符串列表转换为python中的日期以进行时间序列建模 -
- windows - Windows FTP 命令行客户端不会使用 mget * 下载所有文件