java - kafka 流处理器 Api 反序列化 avro 记录
问题描述
我正在流式传输来自 Kafka 的实时数据。但是数据是 Avro 格式的。无法反序列化为 Json。我正在使用 Kafka Stream 低级处理器 API。如何反序列化 Avro 记录?
def orderStreamData(builder: KStreamBuilder, inTopic: String, outTopic: String): TopologyBuilder = {
builder
.addSource("source1", stringDe, stringDe, inTopic)//adding source topic
//now adding processor class using ProcessSupplier
.addProcessor("order", new ProcessorSupplier[String, String] {
override def get(): Processor[String, String] = new ProcessorImpl
}, "source1")
//adding local state store for stateful operations
.addStateStore(Stores.create("tester").withStringKeys.withStringValues.inMemory.build, "order")
//adding destination topic for the processed data to go
.addSink("sink", outTopic, stringSer, stringSer, "order")
}
class ProcessorImpl extends AbstractProcessor[String, String]{
var keyValueStore: KeyValueStore[String, String] = _
var processorContext: ProcessorContext = _
override def init(context: ProcessorContext): Unit = {
processorContext = context
processorContext.schedule(10000L)
keyValueStore = processorContext.getStateStore("tester").asInstanceOf[KeyValueStore[String, String]]
Objects.requireNonNull(keyValueStore, "State Store can't be null")
}
/**
* here logic is implemented
* every value for a key must be greater than the previous value
* */
override def process(key: String, value: String): Unit = {
//accessing local state store for last value saved for this key
}
}
解决方案
@Override
public T deserialize(String inTopic, byte[] data) {
try {
T result = null; // Intialize result to null
if (data != null) {
DatumReader<GenericRecord> datumReader =
new SpecificDatumReader<>(targetType.newInstance().getSchema());
Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
result = (T) datumReader.read(null, decoder);
LOGGER.debug("deserialized data='{}'", result);
}
return result;
} catch (Exception ex) {
throw new SerializationException("can not deserialize"+data+"exception"+ex);
}
}
推荐阅读
- java - 在 SharedPreference 中保存应用程序范围的布尔值
- odoo-8 - 清理 Many2one 字段
- mysql - dplyr 使用变音符号加入 Mysql 数据
- android - Web Speech API 适用于 Chrome,但在 ionic 应用程序中不适用。有什么区别以及哪些更改可以使其在 ionic 1 应用程序上运行?
- angular - 在 Angular v2-6 中内置 forEach 实用程序,就像在 AnularJS 中一样
- palindrome - 这个回文函数的复杂度是多少?
- python - 如何使用返回 numpy 1D 数组的函数填充 numpy 2D 数组
- python - 列的简单条形图意味着使用 seaborn
- vbscript - 使用 VBScript 的递归子内存泄漏
- python - 如何从python中的元组列表中提取等于值的第n个元素