How to parse Avro data in Kafka

Problem description

I write records with an Avro schema to Kafka, for example:

ProducerRecord<String, TestSchema> record = new ProducerRecord<String, TestSchema>("TestTopic0409", testSchema);
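For context, here is a minimal self-contained sketch of a producer whose bytes a "schema_inline" decoder could read back. It assumes plain Avro binary encoding and Kafka's ByteArraySerializer, with no schema-registry framing; the TestSchema class and serializer in the original code are not shown, so this is an illustration, not the original setup:

import java.io.ByteArrayOutputStream;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AvroProducerSketch {
    // Same schema as the one declared inline in the Druid spec below.
    private static final String SCHEMA_JSON =
        "{\"namespace\":\"com.airebroker.data\",\"name\":\"Test\",\"type\":\"record\","
        + "\"fields\":[{\"name\":\"id\",\"type\":\"int\"},"
        + "{\"name\":\"name\",\"type\":\"string\"},"
        + "{\"name\":\"timestamp\",\"type\":\"long\"}]}";

    public static void main(String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse(SCHEMA_JSON);

        GenericRecord rec = new GenericData.Record(schema);
        rec.put("id", 1);
        rec.put("name", "test");
        rec.put("timestamp", System.currentTimeMillis());

        // Encode as raw Avro binary: no container-file header, no schema-registry magic byte.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(rec, encoder);
        encoder.flush();

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("TestTopic0409", out.toByteArray()));
        }
    }
}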

I want to load it into Druid. When I start Druid locally and connect to the Kafka data, the result displays as garbled characters.


Then I used the following supervisor spec:

{
  "type": "kafka",
  "spec": {
    "dataSchema": {
      "dataSource": "new-data-source",
      "timestampSpec": null,
      "dimensionsSpec": null,
      "metricsSpec": [],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "DAY",
        "queryGranularity": {
          "type": "none"
        },
        "rollup": true,
        "intervals": null
      },
      "transformSpec": {
        "filter": null,
        "transforms": []
      },
      "parser": {
        "type": "avro_stream",
        "avroBytesDecoder": {
          "type": "schema_inline",
          "schema": {
            "namespace": "com.airebroker.data",
            "name": "Test",
            "type": "record",
            "fields": [
              {
                "name": "id",
                "type": "int"
              },
              {
                "name": "name",
                "type": "string"
              },
              {
                "name": "timestamp",
                "type": "long"
              }
            ]
          }
        },
        "parseSpec": {
          "format": "avro",
          "timestampSpec": {},
          "dimensionsSpec": {}
        }
      }
    },
    "ioConfig": {
      "topic": "TestTopic0409",
      "inputFormat": {
        "type": "avro_ocf",
        "flattenSpec": {
          "useFieldDiscovery": true,
          "fields": []
        },
        "binaryAsString": false
      },
      "replicas": 1,
      "taskCount": 1,
      "taskDuration": "PT3600S",
      "consumerProperties": {
        "bootstrap.servers": "localhost:9092"
      },
      "pollTimeout": 100,
      "startDelay": "PT5S",
      "period": "PT30S",
      "useEarliestOffset": false,
      "completionTimeout": "PT1800S",
      "lateMessageRejectionPeriod": null,
      "earlyMessageRejectionPeriod": null,
      "lateMessageRejectionStartDateTime": null,
      "stream": "TestTopic0409",
      "useEarliestSequenceNumber": false,
      "type": "kafka"
    },
    "tuningConfig": {
      "type": "kafka",
      "maxRowsInMemory": 1000000,
      "maxBytesInMemory": 0,
      "maxRowsPerSegment": 5000000,
      "maxTotalRows": null,
      "intermediatePersistPeriod": "PT10M",
      "basePersistDirectory": "/home/zhangjh/apache-druid-0.20.2/var/tmp/druid-realtime-persist7289903804951562243",
      "maxPendingPersists": 0,
      "indexSpec": {
        "bitmap": {
          "type": "roaring",
          "compressRunOnSerialization": true
        },
        "dimensionCompression": "lz4",
        "metricCompression": "lz4",
        "longEncoding": "longs",
        "segmentLoader": null
      },
      "indexSpecForIntermediatePersists": {
        "bitmap": {
          "type": "roaring",
          "compressRunOnSerialization": true
        },
        "dimensionCompression": "lz4",
        "metricCompression": "lz4",
        "longEncoding": "longs",
        "segmentLoader": null
      },
      "buildV9Directly": true,
      "reportParseExceptions": false,
      "handoffConditionTimeout": 0,
      "resetOffsetAutomatically": false,
      "segmentWriteOutMediumFactory": null,
      "workerThreads": null,
      "chatThreads": null,
      "chatRetries": 8,
      "httpTimeout": "PT10S",
      "shutdownTimeout": "PT80S",
      "offsetFetchPeriod": "PT30S",
      "intermediateHandoffPeriod": "P2147483647D",
      "logParseExceptions": false,
      "maxParseExceptions": 2147483647,
      "maxSavedParseExceptions": 0,
      "skipSequenceNumberAvailabilityCheck": false,
      "repartitionTransitionDuration": "PT120S"
    }
  }
}

Then it gave me a result: Error: undefined.

I thought the format of my spec file was wrong, but I tried every approach to parsing Avro data from Kafka described in the official documentation. All of them returned: Error: undefined.
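As an aside on the spec itself (my reading, not a confirmed fix): avro_ocf is Druid's input format for Avro Object Container Files, i.e. .avro files with an embedded header, while messages produced as above are raw Avro stream bytes; the spec also carries both a legacy parser and a newer inputFormat at the same time, and the timestampSpec/dimensionsSpec are null or empty, so Druid is never told which column is the timestamp. A minimal sketch of just the parser block, assuming raw Avro binary messages and that the timestamp field holds epoch milliseconds (that format is an assumption):

"parser": {
  "type": "avro_stream",
  "avroBytesDecoder": {
    "type": "schema_inline",
    "schema": { ... the inline schema shown above ... }
  },
  "parseSpec": {
    "format": "avro",
    "timestampSpec": {
      "column": "timestamp",
      "format": "millis"
    },
    "dimensionsSpec": {
      "dimensions": ["id", "name"]
    }
  }
}

With a parser in place, the inputFormat block would be dropped from ioConfig, and druid-avro-extensions has to be listed in druid.extensions.loadList.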

Then I went on to try parsing the Avro data manually through an extension, assembling it into JSON. I defined a class: public class ExampleByteBufferInputRowParser implements ByteBufferInputRowParser. In the parseBatch method I write a txt file to the tmp path as a trace, but when the data is parsed, execution never passes through here.
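One common reason a custom parser's parseBatch is never invoked is that the class was never registered with Druid's Jackson type system, so no "type" string in the spec can resolve to it. A sketch of the required DruidModule, under that assumption (the module name and the "example_parser" type string are illustrative, not from the question):

import java.util.Collections;
import java.util.List;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import org.apache.druid.initialization.DruidModule;

public class ExampleParserModule implements DruidModule {
    @Override
    public List<? extends Module> getJacksonModules() {
        // Maps the spec's "parser": {"type": "example_parser", ...} to the custom class.
        return Collections.singletonList(
            new SimpleModule("ExampleParserModule")
                .registerSubtypes(new NamedType(ExampleByteBufferInputRowParser.class, "example_parser"))
        );
    }

    @Override
    public void configure(Binder binder) {
        // No Guice bindings needed for a parser-only extension.
    }
}

The module's fully qualified name also has to appear in META-INF/services/org.apache.druid.initialization.DruidModule inside the extension jar, and the extension directory has to be added to druid.extensions.loadList; otherwise Druid never loads the class and parseBatch cannot be reached.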

Tags: java, druid

Solution

