首页 > 解决方案 > 如何将自定义 avro 模式与 dockerized connect-datagen 一起使用?

问题描述

我正在尝试使用 dockerized connect-datagen 从自定义 avro 模式生成测试数据。

这是 Github 上的一个示例:https ://github.com/damc-dev/kafka-docker-datagen

我将目录映射到连接容器,并根据此处的说明配置为使用自定义模式:https ://github.com/confluentinc/kafka-connect-datagen/blob/master/README.md#define-a-new -模式规范

我创建了一个主题并创建了连接器,但它不会生成有关该主题的数据,并且连接日志中的错误消息是:

connect            | [2019-04-17 02:19:38,317] ERROR WorkerSourceTask{id=datagen-impressions-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
connect            | java.lang.NullPointerException
connect            |    at org.apache.avro.Schema.parse(Schema.java:1225)
connect            |    at org.apache.avro.Schema$Parser.parse(Schema.java:1032)
connect            |    at org.apache.avro.Schema$Parser.parse(Schema.java:1004)
connect            |    at io.confluent.avro.random.generator.Generator.<init>(Generator.java:218)
connect            |    at io.confluent.kafka.connect.datagen.DatagenTask.start(DatagenTask.java:120)
connect            |    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:199)
connect            |    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
connect            |    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
connect            |    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect            |    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect            |    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect            |    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect            |    at java.lang.Thread.run(Thread.java:748)
connect            | [2019-04-17 02:19:38,319] ERROR WorkerSourceTask{id=datagen-impressions-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)

标签: apache-kafkaavroapache-kafka-connectconfluent-platformconfluent-schema-registry

解决方案


推荐阅读