首页 > 解决方案 > 使用 Flink 将 POJO 保存到 Cassandra

问题描述

我是 Flink 的新手,我想将 Kafka 流数据存储到 Cassandra 中。我已将 String 转换为 POJO。我的 POJO 如下,

@Table(keyspace = "sample", name = "contact")
public class Person implements Serializable {

    private static final long serialVersionUID = 1L;

    @Column(name = "name")
    private String name;

    @Column(name = "timeStamp")
    private LocalDateTime timeStamp;

我的转换发生如下,

 stream.flatMap(new FlatMapFunction<String, Person>() {
            public void flatMap(String value, Collector<Person> out) {
                try {
                    out.collect(objectMapper.readValue(value, Person.class));
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            }
        }).print(); // I need to use proper method to convert to Datastream.
        env.execute();

我阅读了以下链接上的文档以供参考,

https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/cassandra.html

Cassandra Sink 接受 DataStream 实例。我需要转换我的转换并将它们存储到 Kafka 中。

Cant create Cassandra Pojo Sink也给了我一些想法。

.forward()返回的方法 DataStream<Reading> forward,当将实例传递给时,

 CassandraSink.addSink(forward)
                .setHost("localhost")
                .build();
cannot access org.apache.flink.streaming.api.scala.DataStream

如何将我的 POJO 转换为存储在 Cassandra 中?

标签: apache-flinkflink-streamingcassandra-3.0

解决方案


推荐阅读