Confluent Replicator error: Failed to translate schema registry record (io.confluent.connect.replicator.schemas.SchemaTranslator:188)

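Problem description

I am trying to migrate Schema Registry from a local cluster to Confluent Cloud using Replicator, as described in the tutorial. It managed to copy all but a few subjects, and I don't know why. It prints the following error: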

ERROR Failed to translate schema registry record (io.confluent.connect.replicator.schemas.SchemaTranslator:188)
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Error while registering schema; error code: 500; error code: 500
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:292)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:352)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:495)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:486)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:481)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:212)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:267)
    at io.confluent.connect.replicator.schemas.SchemaTranslator.translateSchema(SchemaTranslator.java:211)
    at io.confluent.connect.replicator.schemas.SchemaTranslator.translateSchemas(SchemaTranslator.java:166)
    at io.confluent.connect.replicator.schemas.SchemaTranslator.translateCollectedRecords(SchemaTranslator.java:135)
    at io.confluent.connect.replicator.ReplicatorSourceTask.translateCollectedRecords(ReplicatorSourceTask.java:566)
    at io.confluent.connect.replicator.ReplicatorSourceTask.poll(ReplicatorSourceTask.java:558)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:289)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

The schemas it fails to copy do not look any different from the ones it managed to copy.

How can I debug this?

I am using Confluent Platform 6.0.0.

connect-standalone.properties


#value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter you want to apply
# it to
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Store offsets on local filesystem
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=5000

ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
request.timeout.ms=20000
retry.backoff.ms=500
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="****" password="****";
security.protocol=SASL_SSL

consumer.ssl.endpoint.identification.algorithm=https
consumer.sasl.mechanism=PLAIN
consumer.request.timeout.ms=20000
consumer.retry.backoff.ms=500
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="****" password="****";
consumer.security.protocol=SASL_SSL
consumer.offset.start=earliest

producer.ssl.endpoint.identification.algorithm=https
producer.sasl.mechanism=PLAIN
producer.request.timeout.ms=20000
# x4
producer.retry.backoff.ms=500
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="****" password="****";
producer.security.protocol=SASL_SSL

config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

quickstart-replicator.properties

# basic connector configuration
name=replicator-v28
connector.class=io.confluent.connect.replicator.ReplicatorSourceConnector

key.converter=io.confluent.connect.replicator.util.ByteArrayConverter
value.converter=io.confluent.connect.replicator.util.ByteArrayConverter
header.converter=io.confluent.connect.replicator.util.ByteArrayConverter

tasks.max=4

# source cluster connection info
src.kafka.bootstrap.servers=localhost:9092

# destination cluster connection info
dest.kafka.ssl.endpoint.identification.algorithm=https
dest.kafka.sasl.mechanism=PLAIN
dest.kafka.request.timeout.ms=20000
dest.kafka.bootstrap.servers=<kafka-cloud-url>:9092
retry.backoff.ms=500
dest.kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="****" password="****";
dest.kafka.security.protocol=SASL_SSL

# Schema Registry migration topics to replicate from source to destination
# topic.whitelist indicates which topics are of interest to replicator
topic.whitelist=_schemas
# schema.registry.topic indicates which of the topics in the ``whitelist`` contains schemas
schema.registry.topic=_schemas

# Connection settings for destination Confluent Cloud Schema Registry
schema.registry.url=https://<cloud-schema-registry-url>
schema.registry.client.basic.auth.credentials.source=USER_INFO
schema.registry.client.basic.auth.user.info=****:****
confluent.topic.replication.factor=3

Tags: apache-kafka, confluent-platform, confluent-schema-registry

Solution


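As the log states, the problem is that the client could not register a subject in the destination Schema Registry.

To debug it, set the logging level for Replicator to DEBUG. For example, you can change the following line in ~/confluent-6.0.0/etc/kafka/connect-log4j.properties: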

log4j.rootLogger=DEBUG, stdout, connectAppender

Then restart Replicator and write its log to a file, like this:

./connect-standalone.sh ../etc/kafka/connect-standalone.properties ../etc/kafka-connect-replicator/quickstart-replicator.properties | tee ../logs/debug.txt

In debug.txt, you can find the subject that could not be registered in the Schema Registry and then try to register it manually.
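A DEBUG-level log gets large quickly, so a small filter can help locate the failing registrations. The following is only a sketch: it keys on the two error strings that appear in the stack trace in the question and prints a few lines of preceding context, on the assumption that the DEBUG lines just before the error name the subject being registered. The function names and the default context size are made up for illustration.

```python
from typing import Iterable, List

# Marker strings copied from the stack trace in the question.
MARKERS = (
    "Failed to translate schema registry record",
    "Error while registering schema",
)


def find_failures(lines: Iterable[str], context: int = 5) -> List[str]:
    """Return every line containing a marker, plus up to `context` lines before it."""
    buffered = list(lines)
    hits: List[str] = []
    last_emitted = -1  # index of the last line already copied into `hits`
    for i, line in enumerate(buffered):
        if any(marker in line for marker in MARKERS):
            # Never re-emit lines that an earlier match already covered.
            start = max(i - context, last_emitted + 1)
            hits.extend(l.rstrip("\n") for l in buffered[start:i + 1])
            last_emitted = i
    return hits


def print_failures(path: str, context: int = 5) -> None:
    """Print the matching lines (with context) from a log file."""
    with open(path, encoding="utf-8", errors="replace") as fh:
        for line in find_failures(fh, context):
            print(line)
```

For the command above, calling print_failures("../logs/debug.txt") would print each error together with the five log lines that precede it.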

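Most likely this happens because the destination Schema Registry already contains a subject with the same ID. Note that if you manually delete a subject, it still exists in the Schema Registry; it is just no longer visible. To remove the subject from the Schema Registry completely, you need to perform a "hard delete" of the subject.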
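As a rough illustration of a hard delete, the Schema Registry REST API accepts a DELETE on /subjects/<subject> (soft delete) followed by the same call with ?permanent=true (hard delete). The sketch below builds and sends those two requests with Python's standard library. The function names are hypothetical, and base_url and user_info are assumed to be the values you already use for schema.registry.url and schema.registry.client.basic.auth.user.info.

```python
import base64
import urllib.error
import urllib.parse
import urllib.request
from typing import List


def build_hard_delete_requests(base_url: str, subject: str,
                               user_info: str) -> List[urllib.request.Request]:
    """Build the soft-delete and permanent-delete requests for one subject.

    user_info is "key:secret", the same form as basic.auth.user.info.
    """
    token = base64.b64encode(user_info.encode("utf-8")).decode("ascii")
    headers = {"Authorization": f"Basic {token}"}
    url = f"{base_url.rstrip('/')}/subjects/{urllib.parse.quote(subject, safe='')}"
    return [
        # Step 1: soft delete (the subject is hidden but still stored).
        urllib.request.Request(url, method="DELETE", headers=headers),
        # Step 2: hard delete (the subject is removed permanently).
        urllib.request.Request(url + "?permanent=true", method="DELETE", headers=headers),
    ]


def hard_delete(base_url: str, subject: str, user_info: str) -> None:
    """Send both requests and print each response; this is destructive."""
    for req in build_hard_delete_requests(base_url, subject, user_info):
        try:
            with urllib.request.urlopen(req) as resp:
                print(req.full_url, resp.status, resp.read().decode("utf-8"))
        except urllib.error.HTTPError as err:
            # The soft delete fails if the subject was already soft-deleted;
            # report the error and continue with the permanent delete.
            print(req.full_url, err.code, err.read().decode("utf-8"))
```

A hard delete cannot be undone, so it is worth confirming the subject name from the debug log before calling hard_delete. After the subject is gone, restarting Replicator should let it attempt the registration again.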

