apache-kafka - Flink 数据流转换和暴露到 REST 端点
问题描述
我有 Spring Boot 应用程序并与 Apache Flink 集成。我想从 Kafka 系统中读取数据,并将它们暴露给 REST 端点。
以下是我的简单数据,
@GetMapping("/details/{personName}")
public String getPersonDetails() throws Exception {
StreamExecutionEnvironment env = LocalStreamEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "group_id");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test-topic-1",
new SimpleStringSchema(), properties);
consumer.setStartFromEarliest();
DataStream<String> stream = env.addSource(consumer);
stream.map(new MapFunction<String, String>() {
private static final long serialVersionUID = 1L;
@Override
public String map(String value) throws Exception {
logger.info(value);
return value;
}
}).print();
env.execute();
return "hello world";
}
我的问题是,
- 我的 Kafka 返回字符串值如下,
"id":"1","PersonName":"John","address":"Bristol","weight":"34", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"2","PersonName":"Mann","address":"Bristol","weight":"88", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"3","PersonName":"Chris","address":"Leeds","weight":"12", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"4","PersonName":"John","address":"Bristol","weight":"44", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"5","PersonName":"John","address":"NewPort","weight":"26", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"6","PersonName":"Mann","address":"Bristol","weight":"89", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
我如何通过应用过滤器转换为 JSON 来返回。例如,如果我来自 REST 调用的输入是“John”,我想对它们进行分组并对权重值求和并返回为 JSON(仅名称和权重)。
第二个问题,我无法停止执行环境。有其他选择吗?我检查了 Flink 文档,我没有得到任何适合我的情况。
第三个问题,我想保持在环境中是急切加载,尝试使用静态块但也需要更多时间。
NFRS:
我在 Kafka 中有大量数据,因此想要扩展和快速处理。
解决方案
听起来您可能需要花更多时间查看 Flink 文档。但简而言之...
- 添加一个
MapFunction
将字符串解析为 JSON,提取名称和权重,并将其输出为 Tuple2<String, Integer> 或一些自定义 Java 类。 - 执行 groupBy(name field),后跟 a
ProcessFunction
对权重求和并将其保存在 state 中。 - 用于
QueryableState
将状态(总权重)暴露给作为程序 main() 方法的一部分运行的代码。 - 在您的 main 方法中,实现一个 REST 处理程序,该处理程序使用
QueryableStateClient
来获取给定名称的权重。
推荐阅读
- shell - 从命令行启动 Minecraft
- reactjs - 用于连接功能的 React Redux mapStateToProps 参数
- python - 如何计算熊猫数据框中每分钟出现的次数
- sql - 查找按订单聚合的 MAX 日期 - Oracle SQL
- c# - Blazor/Razor 组件在绑定值更改时执行某些操作
- c++ - Setw 在输出文件中似乎无法始终如一地工作
- javascript - Codewars 上的 Javascript Kata 出现意外结果
- reactjs - 类型“IPayload”上不存在属性“then”
' - linux - 如何使用for循环将文本文件中的一行字符串作为Bash中另一个脚本的单独变量传递
- excel - VBA代码自动填充可变数量的列