apache-kafka - Confluent Replicator error: Failed to translate schema registry record (io.confluent.connect.replicator.schemas.SchemaTranslator:188)
Problem description
I am trying to use Replicator, as described in the tutorial, to migrate the schema registry from a local cluster to Confluent Cloud. It manages to replicate all subjects except a few, and I don't know why... It spits out the following error:
ERROR Failed to translate schema registry record (io.confluent.connect.replicator.schemas.SchemaTranslator:188)
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Error while registering schema; error code: 500; error code: 500
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:292)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:352)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:495)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:486)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:481)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:212)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:267)
at io.confluent.connect.replicator.schemas.SchemaTranslator.translateSchema(SchemaTranslator.java:211)
at io.confluent.connect.replicator.schemas.SchemaTranslator.translateSchemas(SchemaTranslator.java:166)
at io.confluent.connect.replicator.schemas.SchemaTranslator.translateCollectedRecords(SchemaTranslator.java:135)
at io.confluent.connect.replicator.ReplicatorSourceTask.translateCollectedRecords(ReplicatorSourceTask.java:566)
at io.confluent.connect.replicator.ReplicatorSourceTask.poll(ReplicatorSourceTask.java:558)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:289)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
The schemas it fails to replicate look no different from the ones it replicates successfully.
How can I debug this?
I am using Confluent Platform 6.0.0.
connect-standalone.properties:
#value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter you want to apply
# it to
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# Store offsets on local filesystem
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=5000
ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
request.timeout.ms=20000
retry.backoff.ms=500
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="****" password="****";
security.protocol=SASL_SSL
consumer.ssl.endpoint.identification.algorithm=https
consumer.sasl.mechanism=PLAIN
consumer.request.timeout.ms=20000
consumer.retry.backoff.ms=500
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="****" password="****";
consumer.security.protocol=SASL_SSL
consumer.offset.start=earliest
producer.ssl.endpoint.identification.algorithm=https
producer.sasl.mechanism=PLAIN
producer.request.timeout.ms=20000
# x4
producer.retry.backoff.ms=500
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="****" password="****";
producer.security.protocol=SASL_SSL
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
quickstart-replicator.properties:
# basic connector configuration
name=replicator-v28
connector.class=io.confluent.connect.replicator.ReplicatorSourceConnector
key.converter=io.confluent.connect.replicator.util.ByteArrayConverter
value.converter=io.confluent.connect.replicator.util.ByteArrayConverter
header.converter=io.confluent.connect.replicator.util.ByteArrayConverter
tasks.max=4
# source cluster connection info
src.kafka.bootstrap.servers=localhost:9092
# destination cluster connection info
dest.kafka.ssl.endpoint.identification.algorithm=https
dest.kafka.sasl.mechanism=PLAIN
dest.kafka.request.timeout.ms=20000
dest.kafka.bootstrap.servers=<kafka-cloud-url>:9092
retry.backoff.ms=500
dest.kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="****" password="****";
dest.kafka.security.protocol=SASL_SSL
# Schema Registry migration topics to replicate from source to destination
# topic.whitelist indicates which topics are of interest to replicator
topic.whitelist=_schemas
# schema.registry.topic indicates which of the topics in the ``whitelist`` contains schemas
schema.registry.topic=_schemas
# Connection settings for destination Confluent Cloud Schema Registry
schema.registry.url=https://<cloud-schema-registry-url>
schema.registry.client.basic.auth.credentials.source=USER_INFO
schema.registry.client.basic.auth.user.info=****:****
confluent.topic.replication.factor=3
Solution
As the log says, the problem is that the client failed to register a subject in the schema registry.
The way to debug this is to set the logging level for Replicator to DEBUG. For example, you can change the following line in ~/confluent-6.0.0/etc/kafka/connect-log4j.properties:
log4j.rootLogger=DEBUG, stdout, connectAppender
Then restart Replicator and write its logs to a file, like this:
./connect-standalone.sh ../etc/kafka/connect-standalone.properties ../etc/kafka-connect-replicator/quickstart-replicator.properties | tee ../logs/debug.txt
In debug.txt you can find the subject that failed to register in the schema registry, and try to register it manually.
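A schema can be registered manually through the Schema Registry REST API. This is a minimal sketch; the registry URL, subject name, credentials, and the example Avro schema below are all placeholders, not values from the original setup:

```shell
# Register a schema manually for a given subject (placeholder names/credentials).
# The "schema" field in the payload is the Avro schema as an escaped JSON string.
curl -u "<api-key>:<api-secret>" \
  -X POST "https://<cloud-schema-registry-url>/subjects/<subject-name>/versions" \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "{\"type\": \"record\", \"name\": \"Example\", \"fields\": [{\"name\": \"id\", \"type\": \"string\"}]}"}'
```

On success the registry responds with the schema ID, e.g. {"id": 42}, which you can compare against the ID expected on the source cluster.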
Most likely this happens because the destination schema registry already contains a subject with the same ID. Note that if you manually delete a subject, it still exists in the schema registry; it is only soft-deleted and hidden. To remove a subject from the schema registry completely, you need to perform a "hard delete".
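A hard delete is a two-step operation against the REST API: a regular (soft) delete first, then the same call with `?permanent=true`. The URL, subject name, and credentials below are placeholders:

```shell
# Soft-delete the subject first (required before a hard delete is allowed)
curl -u "<api-key>:<api-secret>" \
  -X DELETE "https://<cloud-schema-registry-url>/subjects/<subject-name>"

# Hard-delete it so the subject and its schema IDs are fully removed
curl -u "<api-key>:<api-secret>" \
  -X DELETE "https://<cloud-schema-registry-url>/subjects/<subject-name>?permanent=true"
```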