首页 > 解决方案 > 如何在 Flink 程序中逐行读取 Kafka Topic

问题描述

首先,我在 Kafka 主题中加载了一个 CSV 文件,我可以通过 Flink 程序打印该主题。代码如下:

 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties prop = new Properties();
    prop.setProperty("bootstrap.servers", 
     "10.32.0.2:9092,10.32.0.3:9092,10.32.0.4:9092");
    prop.setProperty("group.id", "test");
    FlinkKafkaConsumer<String> myConsumer= new FlinkKafkaConsumer<> 
     ("flinkTopic", new SimpleStringSchema(),prop);
    myConsumer.setStartFromEarliest();
    DataStream<String> stream = env.addSource(myConsumer);
    stream.print();
    env.execute("Flink Streaming Java API Skeleton");

我的问题是我想逐行阅读主题并分别处理每一行,请指导我如何逐行阅读 Kafka 主题?

任何帮助将非常感激。

标签: apache-kafkaapache-flinkkafka-topic

解决方案


有关您可能会做什么的示例,我建议您按照自己的方式完成在线Apache Flink 培训。您可以使用 filter、map、flatmap、Windows 和 ProcessFunctions 等操作逐行处理流。

您可能想知道如何方便地使用 CSV 数据。最简单的方法是使用 Table/SQL API,它有自己的Kafka 连接器CSV 格式

在不使用 Flink 的 SQL 引擎的情况下,你可以实现一个 map 函数,将每一行文本转换为一个 POJO。这里有一个例子。或者实现您自己的反序列化器,而不是 SimpleStringSchema。


推荐阅读