apache-kafka - 如何在 Flink 程序中逐行读取 Kafka Topic
问题描述
首先,我在 Kafka 主题中加载了一个 CSV 文件,我可以通过 Flink 程序打印该主题。代码如下:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties prop = new Properties();
prop.setProperty("bootstrap.servers",
"10.32.0.2:9092,10.32.0.3:9092,10.32.0.4:9092");
prop.setProperty("group.id", "test");
FlinkKafkaConsumer<String> myConsumer= new FlinkKafkaConsumer<>
("flinkTopic", new SimpleStringSchema(),prop);
myConsumer.setStartFromEarliest();
DataStream<String> stream = env.addSource(myConsumer);
stream.print();
env.execute("Flink Streaming Java API Skeleton");
我的问题是我想逐行阅读主题并分别处理每一行,请指导我如何逐行阅读 Kafka 主题?
任何帮助将非常感激。
解决方案
有关您可能会做什么的示例,我建议您按照自己的方式完成在线Apache Flink 培训。您可以使用 filter、map、flatmap、Windows 和 ProcessFunctions 等操作逐行处理流。
您可能想知道如何方便地使用 CSV 数据。最简单的方法是使用 Table/SQL API,它有自己的Kafka 连接器和CSV 格式。
在不使用 Flink 的 SQL 引擎的情况下,你可以实现一个 map 函数,将每一行文本转换为一个 POJO。这里有一个例子。或者实现您自己的反序列化器,而不是 SimpleStringSchema。
推荐阅读
- openlayers - Openlayers中的IDW(反距离加权)插值(自定义调色板)
- flutter - Xamarin VS 颤振
- c++ - OpenCV从相机中每行读取的像素太少
- pine-script - Pine 中的简单时间策略
- python - 如何管理功能以高效使用美汤
- swiftui - 如何在 iPad 上的 SwiftUI 拆分视图中更改主视图宽度
- c++ - 此树实现未显示任何输出
- c# - C# 测试文件与 .NetCore 控制台应用程序中用户的 linux 主路径一起存在
- python - Python Tkinter 项目中的 IndexError
- go - 如何使用自定义错误消息进行 OpenAPI 组件验证?