首页 > 解决方案 > 在 Confluent conn 失败时通过 HTTPS 的 Kafka Schema Registry

问题描述

由于看起来像是身份验证问题,我无法在模式注册表中注册 Avro 模式。

我已经设置了一个 Confluent Cloud 集群并通过 UI 为一个主题定义了一个 avro 模式。我还通过 UI 设置了 Api 键。

我已经验证我可以查询subjects使用以下 curl - curl -u keyid:secretkey https://schema-reg-url/subjects。所以我使用的 API 密钥应该是好的。

我也尝试RestService使用正确的属性设置 a(如下),但我似乎仍然无法连接到模式注册表。

我查看了源代码SchemaRegistryClient,但似乎没有指定身份验证参数的选项。

我在这里走错了吗?

注意:我指定了以下属性,因为这些是 Confluent API 访问页面中建议的。

val rs1: RestService = new RestService("<https://schema-registry-url>")
val props = new util.HashMap[String, String]()
props.put("basic.auth.credentials.source", "USER_INFO")
props.put("schema.registry.basic.auth.user.info", "key_id:secret_key_id")
props.put("schema.registry.url", "https://schema-registry-url")

// this fails
rs1.registerSchema(props, RegisterSchemaRequest.fromJson(schemaString), "<subject name>")

// this fails as well
val listOfSubjects: util.List[String] = rs1.getAllSubjects(props)

我得到的错误如下。

Unexpected character ('<' (code 60)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
 at [Source: sun.net.www.protocol.http.HttpURLConnection$HttpInputStream@7507d96c; line: 1, column: 2]; error code: 50005
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unexpected character ('<' (code 60)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
 at [Source: sun.net.www.protocol.http.HttpURLConnection$HttpInputStream@7507d96c; line: 1, column: 2]; error code: 50005

更新:我做了进一步的分析。
上面的错误不是实际的错误 - 发生上面的错误是因为调用jsonDeserializer.readValue()失败是因为没有将Exception对象传递给它(请参阅:源代码中的行

实际错误是401 HTTP_UNAUTHORIZED错误。

与 Schema Registry 的连接使用 Basic Auth 进行授权。REST GET 调用需要在标头中包含编码的 API Key:Pwd。

下面的答案中的工作代码片段。

标签: scalaapache-kafkaconfluent-schema-registry

解决方案


我从 Confluent 支持那里得到了一些帮助 - Schema Registry 使用 Basic Auth 进行授权,并且 API Keys+Pwd 需要在 Header 中作为"Authorization":"Basic base64encoded(api-key-username:pwd)".

我想我需要通过授权/身份验证约定来研究我的知识。

工作代码片段对我有用。

val rs1: RestService = new RestService(s"${testKafkaSchemaRegistryURL}")
val headers = new util.HashMap[String, String]()
  headers.put("Authorization","Basic " + util.Base64.getEncoder().encodeToString(s"${testKafkaSchemaRegistryAccessKey}:${testKafkaSchemaRegistrySecretAccessKey}".getBytes()))

// works now
val allSubjects = rs1.getAllSubjects(headers)

// works now
val schConfig: Schema = rs1.getLatestVersion(headers, "kev-test-1-value")
schContent.toString().parseJson

推荐阅读