java - 如何在 Kafka 中解析 Avro 数据
问题描述
我向 Kafka 写入了一条按 Avro 模式序列化的记录,例如:
ProducerRecord<String, TestSchema> record = new ProducerRecord<String, TestSchema>("TestTopic0409", testSchema);
我想把它加载到 Druid。当我在本地启动 Druid 并连接 Kafka 数据源时,预览结果显示乱码:
[在此处输入图片描述][1]
然后我使用如下规格:
{
"type": "kafka",
"spec": {
"dataSchema": {
"dataSource": "new-data-source",
"timestampSpec": null,
"dimensionsSpec": null,
"metricsSpec": [],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": {
"type": "none"
},
"rollup": true,
"intervals": null
},
"transformSpec": {
"filter": null,
"transforms": []
},
"parser": {
"type": "avro_stream",
"avroBytesDecoder": {
"type": "schema_inline",
"schema": {
"namespace": "com.airebroker.data",
"name": "Test",
"type": "record",
"fields": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "timestamp",
"type": "long"
}
]
}
},
"parseSpec": {
"format": "avro",
"timestampSpec": {},
"dimensionsSpec": {}
}
}
},
"ioConfig": {
"topic": "TestTopic0409",
"inputFormat": {
"type": "avro_ocf",
"flattenSpec": {
"useFieldDiscovery": true,
"fields": []
},
"binaryAsString": false
},
"replicas": 1,
"taskCount": 1,
"taskDuration": "PT3600S",
"consumerProperties": {
"bootstrap.servers": "localhost:9092"
},
"pollTimeout": 100,
"startDelay": "PT5S",
"period": "PT30S",
"useEarliestOffset": false,
"completionTimeout": "PT1800S",
"lateMessageRejectionPeriod": null,
"earlyMessageRejectionPeriod": null,
"lateMessageRejectionStartDateTime": null,
"stream": "TestTopic0409",
"useEarliestSequenceNumber": false,
"type": "kafka"
},
"tuningConfig": {
"type": "kafka",
"maxRowsInMemory": 1000000,
"maxBytesInMemory": 0,
"maxRowsPerSegment": 5000000,
"maxTotalRows": null,
"intermediatePersistPeriod": "PT10M",
"basePersistDirectory": "/home/zhangjh/apache-druid-0.20.2/var/tmp/druid-realtime-persist7289903804951562243",
"maxPendingPersists": 0,
"indexSpec": {
"bitmap": {
"type": "roaring",
"compressRunOnSerialization": true
},
"dimensionCompression": "lz4",
"metricCompression": "lz4",
"longEncoding": "longs",
"segmentLoader": null
},
"indexSpecForIntermediatePersists": {
"bitmap": {
"type": "roaring",
"compressRunOnSerialization": true
},
"dimensionCompression": "lz4",
"metricCompression": "lz4",
"longEncoding": "longs",
"segmentLoader": null
},
"buildV9Directly": true,
"reportParseExceptions": false,
"handoffConditionTimeout": 0,
"resetOffsetAutomatically": false,
"segmentWriteOutMediumFactory": null,
"workerThreads": null,
"chatThreads": null,
"chatRetries": 8,
"httpTimeout": "PT10S",
"shutdownTimeout": "PT80S",
"offsetFetchPeriod": "PT30S",
"intermediateHandoffPeriod": "P2147483647D",
"logParseExceptions": false,
"maxParseExceptions": 2147483647,
"maxSavedParseExceptions": 0,
"skipSequenceNumberAvailabilityCheck": false,
"repartitionTransitionDuration": "PT120S"
}
}
}
然后它返回了一个错误:“Error: undefined”。
我以为是我的 spec 文件格式不对,于是把官方文档中在 Kafka 里解析 Avro 数据的所有方法都试了一遍,结果全部返回同样的错误:“Error: undefined”。
然后我继续尝试手动解析 Avro 数据,通过扩展将其转换成 JSON 数据。我定义了一个类:<code>public class ExampleByteBufferInputRowParser implements ByteBufferInputRowParser</code>。在 <code>parseBatch</code> 函数中我向 tmp 路径写了一个 txt 文件,但当我解析数据时,程序并没有执行到这里。
解决方案
推荐阅读
- python-3.x - UnboundLocalError:分配前引用的局部变量“img_files”
- python-3.x - psycopg2.OperationalError:无法连接到服务器:连接被拒绝(0x0000274D/10061)
- javascript - 如何将数值数据导出显示输入到代码预览和文件中?
- python - 在python并行处理中,如何找到第一个进程调用的函数?
- php - 将数据保存在变量php中
- keras - 自定义指标中的 Keras to_categorical 给出错误
- react-native - 更改 DrawerNagivation react-native 中的活动菜单图标颜色
- cocoapods - 使用 Cocoapods 的 GoogleMobileVision GoogleSignIn 重复符号?
- c++ - 长有什么问题?为什么会自动减1?
- java - 我想将 Angular 6 前端连接到 java servlet