java - Flink 反序列化 Schema 问题
问题描述
我试图使用 FlinkKafkaConsumer 反序列化基于来自 flink 上游的 avro 模式的 json。传入的消息已成功反序列化为 JSONObject,但是当我尝试对 JSONObject 执行某些操作时,我收到错误消息,指出它不是有效的 JSONObject,因为它包含一些转义字符。
下面是上游系统使用的 Avro 序列化模式,基本上是一个 Spring 引导系统
public class AvroSerializer<T extends SpecificRecordBase> implements Serializer<T> {
@Override
public void close() {
// No-op
}
@Override
public void configure(Map<String, ?> arg0, boolean arg1) {
// No-op
}
@Override
public byte[] serialize(String topic, T data) {
try {
byte[] result = null;
if (data != null) {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
BinaryEncoder binaryEncoder =
EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
DatumWriter<T> datumWriter = new GenericDatumWriter<>(data.getSchema());
datumWriter.write(data, binaryEncoder);
binaryEncoder.flush();
byteArrayOutputStream.close();
result = byteArrayOutputStream.toByteArray();
}
return result;
} catch (IOException ex) {
throw new SerializationException(
"Can't serialize data='" + data + "' for topic='" + topic + "'", ex);
}
}
}
下面是 Flink kafka consumer 中的反序列化
public class AvroDeserialization<T> implements DeserializationSchema<T> {
private static final long serialVersionUID = 4330538776656642778L;
private final Class<T> avroType;
private transient DatumReader<T> reader;
private transient BinaryDecoder decoder;
public AvroDeserialization(Class<T> avroType) {
this.avroType = avroType;
}
@Override
public T deserialize(byte[] message) {
ensureInitialized();
try {
decoder = DecoderFactory.get().binaryDecoder(message, decoder);
return reader.read(null, decoder);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public boolean isEndOfStream(T nextElement) {
return false;
}
@Override
public TypeInformation<T> getProducedType() {
return TypeExtractor.getForClass(avroType);
}
private void ensureInitialized() {
if (reader == null) {
if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
reader = new SpecificDatumReader<T>(avroType);
} else {
reader = new ReflectDatumReader<T>(avroType);
}
}
}
public void deserialize() {
}
}
作为一种解决方法,我将替换 json 字符串中的转义字符,如下所示。但这不会永久解决问题,请建议我如何解决问题。
String incomingData = incomingEvent.getData();
int i = incomingData.indexOf("{");
incomingData = incomingData.substring(i);
int p = incomingData.lastIndexOf("}");
incomingData = incomingData.substring(i, p + 1);
incomingData = incomingData.replaceAll("\\\\n\t", "\n");
jSONObject finalEvent = new JSONObject(incomingData.trim())
基本上这是目前为止的工作,但我不确定其他消息它是如何反应的。
解决方案
推荐阅读
- android - 如何从文件夹运行 ./gradlew 命令?
- c++ - 找不到 Visual Studio 2008 (Platform Toolset = 'v90') 的构建工具
- python - Python。构建exe
- c# - 如何删除数组中的所有匹配项并将剩余部分向左移动?
- react-native - 导航组件/屏幕是否必须卸载?
- javascript - 如何使用java脚本设置数据表超出第一页的行数据?
- c# - 如果 JSON 字符串具有未闭合的 html 标签,如何使用 AntiXss
- angular - Angular 7 性能
- javascript - 如何使用键值从 Firebase 检索数据
- c# - 将选定的下拉列表项显示到指定的 gridview 列中