apache-kafka - Apache Flink Kafka 消费者问题
问题描述
我在 Kafka 中有数据,我想读取 Kafka 是否发送数据的数据,然后过滤它们并返回 JSON。
// create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "flink_consumer");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test-topic",
new SimpleStringSchema(), properties);
consumer.setStartFromLatest();
//config.setWriteTimestampToKafka(true);
DataStream<String> stream = env.addSource(consumer);
stream.map(new MapFunction<String, String>() {
private static final long serialVersionUID = 1L;
@Override
public String map(String value) throws Exception {
return "Stream Value: " + value;
}
}).print();
env.execute();
案例 1:当 Kafka 生产者将数据发送到 Kafka 时,我可以在控制台中看到值打印。- 这很好。案例 2:Kafka 生产者停止发送数据,Kafka 在主题中仍然有价值,但相同的代码没有返回任何数据。 - 这可能吗?
知道在哪里犯错了吗?
{"firsname":"test", "lastname":"topic", "value":"3.45", "location":"UK"}
我想要过滤firstname
并返回 JSON。
我看到在数据流处理过程中有过滤器选项。
解决方案
如果你想从第一条消息开始,你应该设置consumer.setStartFromEarliest();
. 它将从第一个未确认的消息开始读取。
推荐阅读
- html - 引导网格无法正确显示
- java - org.json.JSONException:字符 1 处缺少值
- javascript - 使用带有 2checkout 支付模块的 oscommerce 面临的问题
- java - Oracle Form 经常挂起
- encryption - 需要提示:共享内容的加密。如何组织?
- angular - Angular 4过滤器搜索选定列的自定义管道
- sql-server - 在sql server中合并两个表
- html5-video - Google Photos 视频流在 Safari 中不起作用
- types - 如何修复损坏的 EA 存储库中的引用类型
- c# - 使用 Lambda 在列表中列出