java - 如何将 protobuf 字节数组从 Flink 写入 Kafka
问题描述
我是 Flink 的新手。我要做的就是将我的 protobuf POJO 作为字节数组放入 kafka。所以我的FlinkKafkaProducer
样子是这样的:
FlinkKafkaProducer<String> flinkKafkaProducer = createStringProducer(outputTopic, address);
stringInputStream
.map(//here returns byte[])
.addSink(flinkKafkaProducer);
public static FlinkKafkaProducer<String> createStringProducer(String topic, String kafkaAddress) {
return new FlinkKafkaProducer<>(kafkaAddress, topic, new SimpleStringSchema());
}
现在它工作正常,但我的输出是字符串。我尝试添加TypeInformationSerializationSchema()
而不是new SimpleStringSchema()
更改输出,但我无法正确调整它。找不到教程。有人可以帮忙吗?
解决方案
所以,我终于弄清楚了如何将 protobuf 作为字节数组写入 kafka 生产者。问题在于序列化。在 POJO 的情况下,flink 使用 liberyKryo
进行自定义反序列化。编写 protobuf 的最佳方法是使用ProtobufSerializer.class
. 在这个例子中,我将从 kafka String 消息中读取并写入字节数组。
摇篮依赖:
compile (group: 'com.twitter', name: 'chill-protobuf', version: '0.7.6'){
exclude group: 'com.esotericsoftware.kryo', module: 'kryo'
}
implementation 'com.google.protobuf:protobuf-java:3.11.0'
登记:
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.getConfig().registerTypeWithKryoSerializer(MyProtobuf.class, ProtobufSerializer.class);
KafkaSerializer类
@Data
@RequiredArgsConstructor
public class MyProtoKafkaSerializer implements KafkaSerializationSchema<MyProto> {
private final String topic;
private final byte[] key;
@Override
public ProducerRecord<byte[], byte[]> serialize(MyProto element, Long timestamp) {
return new ProducerRecord<>(topic, key, element.toByteArray());
}
}
工作
public static FlinkKafkaProducer<MyProto> createProtoProducer(String topic, String kafkaAddress) {
MyProtoKafkaSerializer myProtoKafkaSerializer = new MyProtoKafkaSerializer(topic);
Properties props = new Properties();
props.setProperty("bootstrap.servers", kafkaAddress);
props.setProperty("group.id", consumerGroup);
return new FlinkKafkaProducer<>(topic, myProtoKafkaSerializer, props, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
}
public static FlinkKafkaConsumer<String> createProtoConsumerForTopic(String topic, String kafkaAddress, String kafkaGroup) {
Properties props = new Properties();
props.setProperty("bootstrap.servers", kafkaAddress);
props.setProperty("group.id", kafkaGroup);
return new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
}
DataStream<String> stringInputStream = environment.addSource(flinkKafkaConsumer);
FlinkKafkaProducer<MyProto> flinkKafkaProducer = createProtoProducer(outputTopic, address);
stringInputStream
.map(hashtagMapFunction)
.addSink(flinkKafkaProducer);
environment.execute("My test job");
资料来源:
推荐阅读
- java - 使代码更具可读性,迭代内部映射
- javascript - 从子组件导航时刷新父组件
- ios - 更改扩展名后base64字符串未打开视频文件
- java - 图库按钮打开相机和图库
- c# - 是否可以在不使用 [JsonIgnore] 的情况下忽略 nswag 中的模型属性?
- sql-server - 组件“Fuzzy Grouping”(40)的版本与该版本的DataFlow不兼容
- python - 使用 Pandas 访问 csv 文件时出现 KeyError
- python - 为 3 通道图像中的每个通道添加相同的值?
- google-app-engine - Google Cloud Datatore Go Client 默认不关闭连接?
- thymeleaf - 如何在百里香中打破循环