java - 使用 Java 的 Apache Flink 中的通用协议缓冲区反序列化器
问题描述
场景:Apache Flink、Kafka、Protocol buffers 数据消费者。
数据源是协议缓冲区格式的 Kafka 主题(多个主题:主题#1、主题#3、主题#3)。消费者是 Apache Flink 消费者。每个主题都有一个唯一的 protobuf 定义。
List<String> topicList = Arrays.asList("topic#1,topic#2,topic#3".split(","));
inputStream = env.addSource(new FlinkKafkaConsumer[ProtobufDeserializationSchema](topicList, new ProtobufDeserializationSchema(), properties));
我正在尝试在 Apache Flink 中开发通用数据摄取作业,以将 Kafka 中的数据摄取到数据库中。
如何为 Apache Flink 实现通用的 protobuf 反序列化器?我正在寻找将 Kafka 主题链接到 protobuf 定义以进行反序列化的实现。
最初的方法是将字节数组带入 Flink 数据流中,然后根据 Kafka 主题名称确定 protobuf 定义以反序列化 map 函数中的消息。我怎样才能以通用的方式做到这一点?
解决方案
flink-statefun 包含一个可能有用的通用 protobuf (de)serializer。
推荐阅读
- javascript - 按过滤器排序数据,反应js
- c - 如何将结构的哪个成员的指示符传递给C中的函数
- python - 时间序列数据框到面板
- reactjs - 为什么 TypeScript 看不到嵌套值不能未定义?
- python - 有没有办法让 VSCode 自动使用最近的 Pipfile/pipenv?
- c++ - 有人可以纠正代码并解释为什么 gets() 没有正确读取值吗?
- r - 使用向量化函数计算向量中连续零的最大数量
- python - (Python) 创建一个学生类。每个学生都应该有一个名字和一张照片。添加一个方法,显示,为学生显示图片
- reactjs - HOC 中传递的组件类型是什么?
- c++ - 为什么调试器将表达式评估为假,而它的实际值为真?