Flink data stream transformation and exposure to a REST endpoint

Problem description

I have a Spring Boot application that is integrated with Apache Flink. I want to read data from Kafka and expose it through a REST endpoint.

Below is my code so far:

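    @GetMapping("/details/{personName}")
    public String getPersonDetails(@PathVariable("personName") String personName) throws Exception {
        // personName is bound from the URL but is not used by the job yet.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka connection settings.
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "group_id");

        // Read every record of the topic as a plain string, starting from the earliest offset.
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test-topic-1",
                new SimpleStringSchema(), properties);
        consumer.setStartFromEarliest();
        DataStream<String> stream = env.addSource(consumer);

        // Log each record and print it unchanged.
        stream.map(new MapFunction<String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public String map(String value) throws Exception {
                logger.info(value);
                return value;
            }
        }).print();

        // Blocks while the job runs. The Kafka source is unbounded, so in practice
        // the return statement below is not reached.
        env.execute();

        return "hello world";
    }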

My question concerns the following sample records:

{"id":"1","PersonName":"John","address":"Bristol","weight":"34", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"2","PersonName":"Mann","address":"Bristol","weight":"88", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"3","PersonName":"Chris","address":"Leeds","weight":"12", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"4","PersonName":"John","address":"Bristol","weight":"44", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"5","PersonName":"John","address":"NewPort","weight":"26", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"6","PersonName":"Mann","address":"Bristol","weight":"89", "country":"UK","timeStamp":"2020-08-08T10:25:42"}

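How can I apply a filter, transform the result to JSON, and return it? For example, if the input from my REST call is "John", I want to group the matching records, sum their weight values, and return the result as JSON (name and weight only).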
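
To make the expected result concrete, here is a minimal plain-Java sketch (no Flink or Kafka involved) that filters the six sample records by name, sums their weight values, and renders only the name and the total as JSON. The class name WeightByName, the regex-based field extraction, and the exact output shape (weight emitted as a number, name not escaped) are assumptions made for illustration; a real application would use a proper JSON library.

```java
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class WeightByName {
    // Naive extractors: they only work for flat records shaped like the sample data.
    private static final Pattern NAME = Pattern.compile("\"PersonName\"\\s*:\\s*\"([^\"]*)\"");
    private static final Pattern WEIGHT = Pattern.compile("\"weight\"\\s*:\\s*\"(\\d+)\"");

    // The six sample records from the question.
    static final List<String> SAMPLE = List.of(
        "{\"id\":\"1\",\"PersonName\":\"John\",\"address\":\"Bristol\",\"weight\":\"34\", \"country\":\"UK\",\"timeStamp\":\"2020-08-08T10:25:42\"}",
        "{\"id\":\"2\",\"PersonName\":\"Mann\",\"address\":\"Bristol\",\"weight\":\"88\", \"country\":\"UK\",\"timeStamp\":\"2020-08-08T10:25:42\"}",
        "{\"id\":\"3\",\"PersonName\":\"Chris\",\"address\":\"Leeds\",\"weight\":\"12\", \"country\":\"UK\",\"timeStamp\":\"2020-08-08T10:25:42\"}",
        "{\"id\":\"4\",\"PersonName\":\"John\",\"address\":\"Bristol\",\"weight\":\"44\", \"country\":\"UK\",\"timeStamp\":\"2020-08-08T10:25:42\"}",
        "{\"id\":\"5\",\"PersonName\":\"John\",\"address\":\"NewPort\",\"weight\":\"26\", \"country\":\"UK\",\"timeStamp\":\"2020-08-08T10:25:42\"}",
        "{\"id\":\"6\",\"PersonName\":\"Mann\",\"address\":\"Bristol\",\"weight\":\"89\", \"country\":\"UK\",\"timeStamp\":\"2020-08-08T10:25:42\"}"
    );

    /** Sums the weight of every record whose PersonName equals the given name. */
    static int totalWeight(List<String> records, String name) {
        int total = 0;
        for (String record : records) {
            Matcher n = NAME.matcher(record);
            Matcher w = WEIGHT.matcher(record);
            if (n.find() && w.find() && n.group(1).equals(name)) {
                total += Integer.parseInt(w.group(1));
            }
        }
        return total;
    }

    /** Renders the reduced result: name and summed weight only. */
    static String toJson(String name, int totalWeight) {
        return "{\"PersonName\":\"" + name + "\",\"weight\":" + totalWeight + "}";
    }

    public static void main(String[] args) {
        // John appears in records 1, 4 and 5: 34 + 44 + 26 = 104.
        System.out.println(toJson("John", totalWeight(SAMPLE, "John")));
    }
}
```

For the input "John" this prints {"PersonName":"John","weight":104}.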

NFRs:

I have a large amount of data in Kafka, so I want the solution to scale and to process the data quickly.

Tags: apache-kafka, apache-flink, flink-streaming

Solution


It sounds like you may need to spend more time with the Flink documentation. But in short...

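  1. Add a MapFunction that parses the string as JSON, extracts the name and weight, and outputs them as a Tuple2<String, Integer> or a custom Java class.
  2. Do a groupBy on the name field (keyBy in the DataStream API), followed by a ProcessFunction that sums the weight and saves it in state.
  3. Use QueryableState to expose the state (the summed weight) to code running as part of your program's main() method.
  4. In your main method, implement a REST handler that uses the QueryableStateClient to get the weight for a given name.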
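
The shape of these four steps can be imitated in plain Java. This is only a sketch and deliberately not Flink code: a ConcurrentHashMap stands in for Flink's keyed state, onRecord plays the role of the keyed function that sums weights, and handle plays the role of the REST handler that would otherwise go through QueryableStateClient. The names RunningWeights, onRecord, and handle are hypothetical, as is the JSON output shape.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class RunningWeights {
    // Stand-in for per-key state: one running total per person name.
    private final Map<String, Integer> totals = new ConcurrentHashMap<>();

    /** Steps 1-2: takes an already parsed (name, weight) pair and adds it to that name's running sum. */
    void onRecord(String name, int weight) {
        totals.merge(name, weight, Integer::sum);
    }

    /** Steps 3-4: the point lookup a REST handler would perform; reports 0 for a name not seen so far. */
    String handle(String name) {
        int total = totals.getOrDefault(name, 0);
        return "{\"PersonName\":\"" + name + "\",\"weight\":" + total + "}";
    }

    public static void main(String[] args) {
        RunningWeights state = new RunningWeights();
        // Records arrive one at a time, as they would from the stream.
        state.onRecord("John", 34);
        state.onRecord("Mann", 88);
        state.onRecord("John", 44);
        // A lookup sees whatever has accumulated so far: 34 + 44 = 78 for John.
        System.out.println(state.handle("John"));
    }
}
```

The point of the design is that the aggregation runs continuously while the REST handler only reads the current total for one key and returns immediately, rather than starting a streaming job inside the request.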
