protocol-buffers - 使用 Apache Beam 在镶木地板中编写 protobuf 对象
问题描述
我从 google pub/sub 获取 protobuf 数据并将数据反序列化为 Message 类型对象。所以我得到PCollection<Message>
类型对象。这是示例代码:
public class ProcessPubsubMessage extends DoFn<PubsubMessage, Message> {
@ProcessElement
public void processElement(@Element PubsubMessage element, OutputReceiver<Message> receiver) {
byte[] payload = element.getPayload();
try {
Message message = Message.parseFrom(payload);
receiver.output(message);
} catch (InvalidProtocolBufferException e) {
LOG.error("Got exception while parsing message from pubsub. Exception =>" + e.getMessage());
}
}
}
PCollection<Message> event = psMessage.apply("Parsing data from pubsub message",
ParDo.of(new ProcessPubsubMessage()));
我想应用转换PCollection<Message> event
来以镶木地板格式书写。我知道 apache Beam 提供了ParquetIO,但它适用于PCollection<GenericRecord>
类型和从Message
to的转换GenericRecord
可能会解决问题(但不知道该怎么做)。有什么简单的方法可以用镶木地板格式写吗?
解决方案
它可以通过使用以下库来解决:
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-protobuf</artifactId>
<version>1.7.7</version>
</dependency>
private GenericRecord getGenericRecord(Event event) throws IOException {
ProtobufDatumWriter<Event> datumWriter = new ProtobufDatumWriter<Event>(Event.class);
ByteArrayOutputStream os = new ByteArrayOutputStream();
Encoder e = EncoderFactory.get().binaryEncoder(os, null);
datumWriter.write(event, e);
e.flush();
ProtobufDatumReader<Event> datumReader = new ProtobufDatumReader<Event>(Event.class);
GenericDatumReader<GenericRecord> genericDatumReader = new GenericDatumReader<GenericRecord>(datumReader.getSchema());
GenericRecord record = genericDatumReader.read(null, DecoderFactory.get().binaryDecoder(new ByteArrayInputStream(os.toByteArray()), null));
return record;
}
详情:https ://gist.github.com/alexvictoor/1d3937f502c60318071f
推荐阅读
- python - 如何使用不同模型的字段自定义表单 django
- javascript - 使用 Hooks 在反应 onclick 中循环遍历数组
- linux - 写出 libjpeg 图像时内存泄漏
- com - MSCOMCTL.OCX 蓝调
- ms-access - MS Access Openform - 所需的空白字段值
- r - R中跨年份的时间汇总
- javascript - 如何从 React Native 中的三个不同选项有条件地渲染?
- python - 如何使用 AWS Lamda@Edge 将 node.js 代码转换为 python
- android - 颤振重复的网页浏览小部件
- core-audio - Web Audio API audioctx.destination AudioNode 可以是虚拟音频设备吗?