mysql - 如何将mysql数据加载到druid中
问题描述
由于业务需要,我需要使用druid,但是druid不能加载mysql数据。所以首先使用confluent(kafka-connect-jdbc)加载mysql数据到kafka,最后加载数据到druid。演示网址:( https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-1/)Kafka 数据:
x@dataserv:~/confluent-4.1.1$ bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --property print.key=true --from-beginning --topic mysql_foobar
null {"c1":{"int":1},"c2":{"string":"foo"},"create_ts":1530778230000,"update_ts":1530778230000}
null {"c1":{"int":2},"c2":{"string":"foo"},"create_ts":1530778309000,"update_ts":1530778309000}
null {"c1":{"int":3},"c2":{"string":"foo1"},"create_ts":1530778675000,"update_ts":1530778675000}
然后使用naturality-distribution-0.8.2将kafka数据加载到druid中,但是失败了,多试几次修改json文件(最简单的方法,没有做任何聚合操作),还是没有成功。日志:
2018-07-05 08:18:04,679 [KafkaConsumer-0] WARN io.druid.segment.indexing.DataSchema - No metricsSpec has been specified. Are you sure this is what you want?
2018-07-05 08:18:05,427 [KafkaConsumer-0] WARN io.druid.segment.indexing.DataSchema - No metricsSpec has been specified. Are you sure this is what you want?
2018-07-05 08:18:05,441 [KafkaConsumer-0] WARN io.druid.segment.indexing.DataSchema - No metricsSpec has been specified. Are you sure this is what you want?
2018-07-05 08:18:05,555 [KafkaConsumer-0] INFO c.metamx.emitter.core.LoggingEmitter - Start: started [true]
2018-07-05 08:18:10,491 [KafkaConsumer-0] WARN io.druid.segment.indexing.DataSchema - No metricsSpec has been specified. Are you sure this is what you want?
2018-07-05 08:18:10,635 [KafkaConsumer-CommitThread] INFO c.m.tranquility.kafka.KafkaConsumer - Flushed {mysql_foobar={receivedCount=1, sentCount=0, droppedCount=0, unparseableCount=1}} pending messages in 5ms and committed offsets in 25ms.
2018-07-05 08:18:25,637 [KafkaConsumer-CommitThread] INFO c.m.tranquility.kafka.KafkaConsumer - Flushed {mysql_foobar={receivedCount=0, sentCount=0, droppedCount=0, unparseableCount=0}} pending messages in 0ms and committed offsets in 0ms.
2018-07-05 08:18:40,639 [KafkaConsumer-CommitThread] INFO c.m.tranquility.kafka.KafkaConsumer - Flushed {mysql_foobar={receivedCount=0, sentCount=0, droppedCount=0, unparseableCount=0}} pending messages in 0ms and committed offsets in 0ms.
2018-07-05 08:18:55,640 [KafkaConsumer-CommitThread] INFO c.m.tranquility.kafka.KafkaConsumer - Flushed {mysql_foobar={receivedCount=0, sentCount=0, droppedCount=0, unparseableCount=0}} pending messages in 0ms and committed offsets in 1ms.
2018-07-05 08:19:10,642 [KafkaConsumer-CommitThread] INFO c.m.tranquility.kafka.KafkaConsumer - Flushed {mysql_foobar={receivedCount=0, sentCount=0, droppedCount=0, unparseableCount=0}} pending messages in 0ms and committed offsets in 0ms.
2018-07-05 08:19:25,644 [KafkaConsumer-CommitThread] INFO c.m.tranquility.kafka.KafkaConsumer - Flushed {mysql_foobar={receivedCount=0, sentCount=0, droppedCount=0, unparseableCount=0}} pending messages in 0ms and committed offsets in 0ms.
你能帮我解决吗?我解决不了,学习两周。
版本:druid-0.12.1 json 文件:
{
"dataSources" : {
"kafka" : {
"spec" : {
"dataSchema" : {
"dataSource" : "kafka",
"parser" : {
"type" : "string",
"parseSpec" : {
"timestampSpec" : {
"column" : "update_ts",
"format" : "auto"
},
"dimensionsSpec" : {
"dimensions" : ["c1","c2","create_ts"],
"dimensionExclusions" : [ ]
},
"format" : "json"
}
},
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "hour",
"queryGranularity" : "none"
},
"metricsSpec" : []
},
"ioConfig" : {
"type" : "realtime"
},
"tuningConfig" : {
"type" : "realtime",
"maxRowsInMemory" : "100000",
"intermediatePersistPeriod" : "PT1M",
"windowPeriod" : "PT2M"
}
},
"properties" : {
"task.partitions" : "1",
"task.replicants" : "1",
"topicPattern" : "mysql_foobar"
}
}
},
"properties" : {
"zookeeper.connect" : "192.168.6.231:2181",
"druid.discovery.curator.path" : "/druid/discovery",
"druid.selectors.indexing.serviceName" : "druid/overlord",
"commit.periodMillis" : "15000",
"consumer.numThreads" : "2",
"kafka.zookeeper.connect" : "192.168.6.231:2182",
"kafka.group.id" : "tranquility-kafka"
}
}
解决方案
推荐阅读
- python-3.x - 将 Conv2D 用于图像有什么问题?
- javascript - Nodejs javascript添加到按钮没有响应
- tensorflow - 神经网络在低密度区域的回归精度
- pyspark - 通过 pyspark 和 pycharm 进行 SQLite JDBC 连接的 jar 文件
- django - 如何在 Django 中使用 Vue 模板?
- mysql - 用于 AJAX 网页的 Linux Web 服务器配置高消耗 CPU
- api - Twitter can_dm 在 Web API 上返回 null
- python-3.x - 使用 Python 3 通过 WIFI 将相机从一个 Raspberry Pi 播放到另一个
- javascript - 子组件没有设置从父组件传来的初始值:ReactJS
- excel - 如何在删除最后一个字符的同时复制数组并以双精度获得结果