scala - 如何使用 Spark 结构化流连接到受 Kerberos 保护的 Kafka 集群?
问题描述
我正在尝试使用结构化流 API 连接到受 Kerberos 保护的 Kafka 集群。下面是我的代码和 Spark 的输出。我没有看到任何异常,只是警告客户端断开连接的消息。解决此问题的下一步是什么?
import org.apache.spark.sql.SparkSession
import org.apache.log4j.{Logger, Level}
object Main {
def main(args: Array[String]) {
Logger.getLogger("org").setLevel(Level.WARN)
Logger.getLogger("akka").setLevel(Level.WARN)
val spark = SparkSession.builder()
.master("local[*]")
.appName("myapp")
.config("spark.executor.extraJavaOptions", "java.security.auth.login.config=jaas.conf")
.getOrCreate()
import spark.implicits._
val lines = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "broker1:9100,broker2:9100")
.option("security.protocol", "SASL_PLAINTEXT")
.option("sasl.kerberos.service.name", "mysvcname")
.option("subscribe", "mytopic")
.load()
val query = lines.select("value").writeStream.format("console").start()
query.awaitTermination()
}
这是输出:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/02/11 17:15:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/02/11 17:15:10 WARN NetworkClient: [Consumer clientId=consumer-1, groupId=spark-kafka-source-cef02569-ab16-4ca2-a9e8-18bcea992c0d--1359730493-driver-0] Bootstrap broker broker2:9100 (id: -2 rack: null) disconnected
19/02/11 17:15:11 WARN NetworkClient: [Consumer clientId=consumer-1, groupId=spark-kafka-source-cef02569-ab16-4ca2-a9e8-18bcea992c0d--1359730493-driver-0] Bootstrap broker broker1:9100 (id: -1 rack: null) disconnected
19/02/11 17:15:11 WARN NetworkClient: [Consumer clientId=consumer-1, groupId=spark-kafka-source-cef02569-ab16-4ca2-a9e8-18bcea992c0d--1359730493-driver-0] Bootstrap broker broker2:9100 (id: -2 rack: null) disconnected
19/02/11 17:15:11 WARN NetworkClient: [Consumer clientId=consumer-1, groupId=spark-kafka-source-cef02569-ab16-4ca2-a9e8-18bcea992c0d--1359730493-driver-0] Bootstrap broker broker1:9100 (id: -1 rack: null) disconnected
19/02/11 17:15:11 WARN NetworkClient: [Consumer clientId=consumer-1, groupId=spark-kafka-source-cef02569-ab16-4ca2-a9e8-18bcea992c0d--1359730493-driver-0] Bootstrap broker broker1:9100 (id: -1 rack: null) disconnected
19/02/11 17:15:11 WARN NetworkClient: [Consumer clientId=consumer-1, groupId=spark-kafka-source-cef02569-ab16-4ca2-a9e8-18bcea992c0d--1359730493-driver-0] Bootstrap broker broker2:9100 (id: -2 rack: null) disconnected
...
解决方案
我发现了我的问题。指定安全协议选项时,选项名称必须以“kafka.”为前缀。这很令人困惑,因为对于普通的 Kafka 消费者来说,该选项只是security.protocol,但为了配置 Spark,bootstrap.servers和security.protocol(以及您可能需要的任何其他选项/属性)都必须以kafka 为前缀。. 我的原始代码是:
.option("security.protocol", "SASL_PLAINTEXT")
正确的选择是:
.option("kafka.security.protocol", "SASL_PLAINTEXT")
这是有效的完整代码:
import org.apache.spark.sql.SparkSession
import org.apache.log4j.{Level, Logger}
object Main {
def main(args: Array[String]) {
Logger.getLogger("org").setLevel(Level.INFO)
Logger.getLogger("akka").setLevel(Level.INFO)
val spark = SparkSession.builder()
.master("local[*]")
.appName("myapp")
.config("spark.executor.extraJavaOptions", "java.security.auth.login.config=c:/krb/jaas.conf")
.getOrCreate()
import spark.implicits._
val lines = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "broker1:9100,broker2:9100")
.option("kafka.security.protocol", "SASL_PLAINTEXT")
.option("subscribe", "mytopic")
.load()
val query = lines.select("value").writeStream.format("console").start()
query.awaitTermination()
}
}
作为参考,这里是 jaas.conf 文件的内容:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="c:/krb/mykeytab.keytab"
principal="myaccount@mydomain.int"
storeKey=true
useTicketCache=false
serviceName="myservicename";
};
推荐阅读
- c# - 有没有一种方法/方式来检索单元格值的文本颜色?
- python - sklearn 具有多个元素的数组的真值是模棱两可的。使用 a.any() 或 a.all() 错误
- firebase - Firebase FCM 中的下游消息是否有限制?
- asp.net - 将 Asp.Net SelectList 绑定到参数
- android - 我最近更新了我的 Android Studio,现在我无法在 Android Studio 中创建新的活动/片段
- r - 查找(分组时)总和为某个值的日期的所有行索引
- symfony - 无法使用路由
- r - R相当于SAS“合并”“按”
- c# - C#通过反射按子属性排序父对象列表
- r - 哪个()不匹配给定的行到数据框的行