scala - 在 Confluent conn 失败时通过 HTTPS 的 Kafka Schema Registry
问题描述
由于看起来像是身份验证问题,我无法在模式注册表中注册 Avro 模式。
我已经设置了一个 Confluent Cloud 集群并通过 UI 为一个主题定义了一个 avro 模式。我还通过 UI 设置了 Api 键。
我已经验证我可以查询subjects
使用以下 curl -
curl -u keyid:secretkey https://schema-reg-url/subjects
。所以我使用的 API 密钥应该是好的。
我也尝试RestService
使用正确的属性设置 a(如下),但我似乎仍然无法连接到模式注册表。
我查看了源代码SchemaRegistryClient
,但似乎没有指定身份验证参数的选项。
我在这里走错了吗?
注意:我指定了以下属性,因为这些是 Confluent API 访问页面中建议的。
val rs1: RestService = new RestService("<https://schema-registry-url>")
val props = new util.HashMap[String, String]()
props.put("basic.auth.credentials.source", "USER_INFO")
props.put("schema.registry.basic.auth.user.info", "key_id:secret_key_id")
props.put("schema.registry.url", "https://schema-registry-url")
// this fails
rs1.registerSchema(props, RegisterSchemaRequest.fromJson(schemaString), "<subject name>")
// this fails as well
val listOfSubjects: util.List[String] = rs1.getAllSubjects(props)
我得到的错误如下。
Unexpected character ('<' (code 60)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
at [Source: sun.net.www.protocol.http.HttpURLConnection$HttpInputStream@7507d96c; line: 1, column: 2]; error code: 50005
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unexpected character ('<' (code 60)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
at [Source: sun.net.www.protocol.http.HttpURLConnection$HttpInputStream@7507d96c; line: 1, column: 2]; error code: 50005
更新:我做了进一步的分析。
上面的错误不是实际的错误 - 发生上面的错误是因为调用jsonDeserializer.readValue()
失败是因为没有将Exception
对象传递给它(请参阅:源代码中的行。
实际错误是401 HTTP_UNAUTHORIZED
错误。
与 Schema Registry 的连接使用 Basic Auth 进行授权。REST GET 调用需要在标头中包含编码的 API Key:Pwd。
下面的答案中的工作代码片段。
解决方案
我从 Confluent 支持那里得到了一些帮助 - Schema Registry 使用 Basic Auth 进行授权,并且 API Keys+Pwd 需要在 Header 中作为"Authorization":"Basic base64encoded(api-key-username:pwd)"
.
我想我需要通过授权/身份验证约定来研究我的知识。
工作代码片段对我有用。
val rs1: RestService = new RestService(s"${testKafkaSchemaRegistryURL}")
val headers = new util.HashMap[String, String]()
headers.put("Authorization","Basic " + util.Base64.getEncoder().encodeToString(s"${testKafkaSchemaRegistryAccessKey}:${testKafkaSchemaRegistrySecretAccessKey}".getBytes()))
// works now
val allSubjects = rs1.getAllSubjects(headers)
// works now
val schConfig: Schema = rs1.getLatestVersion(headers, "kev-test-1-value")
schContent.toString().parseJson