scala - 使用经过身份验证的 Confluent Schema Registry 配置 Spark Structured Streaming
问题描述
我在 Spark Streaming 中使用 Kafka Source 来接收使用 Confluent Cloud 中的 Datagen 生成的记录。我打算使用 Confluent Schema Registry,
目前,这是我面临的例外:*
线程“主”io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException 中的异常:未授权;错误代码:401
融合云的架构注册表需要传递一些我不知道如何输入的身份验证数据:
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=secret: secret
我想我必须将此身份验证数据传递给 CachedSchemaRegistryClient 但我不确定是否如此以及如何。
// Setup the Avro deserialization UDF
schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)
spark.udf.register("deserialize", (bytes: Array[Byte]) =>
kafkaAvroDeserializer.deserialize(bytes)
如果我尝试将身份验证发送到架构注册表
val restService = new RestService(schemaRegistryURL)
val props = Map(
"basic.auth.credentials.source" -> "USER_INFO",
"schema.registry.basic.auth.user.info" -> "secret:secret"
).asJava
var schemaRegistryClient = new CachedSchemaRegistryClient(restService, 100, props)
我明白
Cannot resolve overloaded constructor CachedSchemaRegistryClient
了,似乎只有 2 个参数要发送到 CachedSchemaRegistryClient。
我该如何解决?
我遇到了这篇文章,但在这里他们没有对融合云中的模式注册表应用任何身份验证。
解决方案
这段代码对我有用:
private val schemaRegistryUrl = "<schemaregistryURL>"
val props = Map("basic.auth.credentials.source" -> "USER_INFO",
"schema.registry.basic.auth.user.info" -> "<api-key>:<api-secret>").asJava
private val schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 100,props)
我们需要确保在转换为 JAVA 时进行正确的导入:
import scala.collection.JavaConverters.mapAsJavaMapConverter
推荐阅读
- typescript - TypeScript 中有没有一种方法可以在不丢失对象特异性的情况下使用接口作为保护?
- c++ - 如何从精明的图像中去除高频区域?
- html - 如何使具有响应式设计的嵌套表与 Gmail 兼容
- javascript - 更新状态反应
- python - 如何使用 Python 分析 docx 文件中的注释语句?
- html - 角垫形式场不会扩大
- javascript - html-docx-js 在创建 docx 文件时不能应用外部 css 类样式
- python - 为 Teradata DB 表格式化 Python 时间戳
- azure-sql-data-warehouse - ALTER TABLE SWITCH 语句失败。表 X 中的分区 1 定义的范围不是表 Y 中的分区 2 定义的范围的子集
- python - 向 Scrapy Spider 添加标题