首页 > 解决方案 > 使用经过身份验证的 Confluent Schema Registry 配置 Spark Structured Streaming

问题描述

我在 Spark Streaming 中使用 Kafka Source 来接收使用 Confluent Cloud 中的 Datagen 生成的记录。我打算使用 Confluent Schema Registry,

目前,这是我面临的例外:*

线程“主”io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException 中的异常:未授权;错误代码:401

融合云的架构注册表需要传递一些我不知道如何输入的身份验证数据:

basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=secret: secret

我想我必须将此身份验证数据传递给 CachedSchemaRegistryClient 但我不确定是否如此以及如何。

// Setup the Avro deserialization UDF
   schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
    kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)
    spark.udf.register("deserialize", (bytes: Array[Byte]) =>
      kafkaAvroDeserializer.deserialize(bytes)

如果我尝试将身份验证发送到架构注册表

val restService = new RestService(schemaRegistryURL)

  val props = Map(
    "basic.auth.credentials.source" -> "USER_INFO",
    "schema.registry.basic.auth.user.info" -> "secret:secret"
  ).asJava

  var schemaRegistryClient = new CachedSchemaRegistryClient(restService, 100, props)

我明白 Cannot resolve overloaded constructor CachedSchemaRegistryClient了,似乎只有 2 个参数要发送到 CachedSchemaRegistryClient。

我该如何解决?

我遇到了这篇文章,但在这里他们没有对融合云中的模式注册表应用任何身份验证。

标签: scalaapache-sparkapache-kafkaconfluent-schema-registryconfluent-cloud

解决方案


这段代码对我有用:

private val schemaRegistryUrl = "<schemaregistryURL>"   
val props = Map("basic.auth.credentials.source" -> "USER_INFO",
 "schema.registry.basic.auth.user.info" -> "<api-key>:<api-secret>").asJava

 private val schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 100,props)

我们需要确保在转换为 JAVA 时进行正确的导入:

 import scala.collection.JavaConverters.mapAsJavaMapConverter

推荐阅读