首页 > 解决方案 > 使用 Java 的 Apache Flink 中的通用协议缓冲区反序列化器

问题描述

场景:Apache Flink、Kafka、Protocol buffers 数据消费者。

数据源是协议缓冲区格式的 Kafka 主题(多个主题:主题#1、主题#3、主题#3)。消费者是 Apache Flink 消费者。每个主题都有一个唯一的 protobuf 定义。

List<String> topicList = Arrays.asList("topic#1,topic#2,topic#3".split(","));
inputStream = env.addSource(new FlinkKafkaConsumer[ProtobufDeserializationSchema](topicList, new ProtobufDeserializationSchema(), properties));

我正在尝试在 Apache Flink 中开发通用数据摄取作业,以将 Kafka 中的数据摄取到数据库中。

如何为 Apache Flink 实现通用的 protobuf 反序列化器?我正在寻找将 Kafka 主题链接到 protobuf 定义以进行反序列化的实现。

最初的方法是将字节数组带入 Flink 数据流中,然后根据 Kafka 主题名称确定 protobuf 定义以反序列化 map 函数中的消息。我怎样才能以通用的方式做到这一点?

标签: javaapache-kafkaprotocol-buffersapache-flink

解决方案



推荐阅读