scala - Kafka-Spark Batch Streaming:WARN clients.NetworkClient:Bootstrap broker 断开连接
问题描述
我正在尝试将 a 的行写入Dataframe
a Kafka topic
。kafka 集群是 Kerberized,我在 --conf 参数中提供了 jaas.conf,以便能够验证并连接到集群。下面是我的代码:
object app {
val conf = new SparkConf().setAppName("Kerberos kafka ")
val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
System.setProperty("java.security.auth.login.config", "path to jaas.conf")
spark.sparkContext.setLogLevel("ERROR")
def main(args: Array[String]): Unit = {
val test= spark.sql("select * from testing.test")
test.show()
println("publishing to kafka...")
val test_final = test.selectExpr("cast(to_json(struct(*)) as string) AS value")
test_final .show()
test_final.write.format("kafka")
.option("kafka.bootstrap.servers","XXXXXXXXX:9093")
.option("topic", "test")
.option("security.protocol", "SASL_SSL")
.option("sasl.kerberos.service.name","kafka")
.save()
}
}
当我运行上面的代码时,它失败并出现以下错误:
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
当我查看执行程序的错误日志时,我看到了:
18/08/20 22:06:05 INFO producer.ProducerConfig: ProducerConfig values:
compression.type = none
metric.reporters = []
metadata.max.age.ms = 300000
metadata.fetch.timeout.ms = 60000
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [xxxxxxxxx:9093]
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
buffer.memory = 33554432
timeout.ms = 30000
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
block.on.buffer.full = false
ssl.key.password = null
max.block.ms = 60000
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.truststore.password = null
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
client.id =
ssl.endpoint.identification.algorithm = null
ssl.protocol = TLS
request.timeout.ms = 30000
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
acks = 1
batch.size = 16384
ssl.keystore.location = null
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.truststore.type = JKS
**security.protocol = PLAINTEXT**
retries = 0
max.request.size = 1048576
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
ssl.truststore.location = null
ssl.keystore.password = null
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
send.buffer.bytes = 131072
linger.ms = 0
18/08/20 22:06:05 **INFO utils.AppInfoParser: Kafka version : 0.9.0-kafka-2.0.2**
18/08/20 22:06:05 INFO utils.AppInfoParser: Kafka commitId : unknown
18/08/20 22:06:05 INFO datasources.FileScanRDD: Reading File path: hdfs://nameservice1/user/test5/dt=2017-08-04/5a8bb121-3cab-4bed-a32b-9d0fae4a4e8b.parquet, range: 0-142192, partition values: [2017-08-04]
18/08/20 22:06:05 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 4
18/08/20 22:06:05 INFO memory.MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 33.9 KB, free 5.2 GB)
18/08/20 22:06:05 INFO broadcast.TorrentBroadcast: Reading broadcast variable 4 took 224 ms
18/08/20 22:06:05 INFO memory.MemoryStore: Block broadcast_4 stored as values in memory (estimated size 472.2 KB, free 5.2 GB)
18/08/20 22:06:06 WARN clients.NetworkClient: Bootstrap broker xxxxxxxxx:9093:9093 disconnected
18/08/20 22:06:06 WARN clients.NetworkClient: Bootstrap broker xxxxxxxxx:9093:9093 disconnected
18/08/20 22:06:07 WARN clients.NetworkClient: Bootstrap broker xxxxxxxxx:9093:9093 disconnected
18/08/20 22:06:07 WARN clients.NetworkClient: Bootstrap broker xxxxxxxxx:9093:9093 disconnected
18/08/20 22:06:07 WARN clients.NetworkClient: Bootstrap broker xxxxxxxxx:9093:9093 disconnected
18/08/20 22:06:08 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 4
18/08/20 22:06:08 INFO executor.Executor: Running task 1.0 in stage 2.0 (TID 4)
18/08/20 22:06:08 WARN clients.NetworkClient: Bootstrap broker xxxxxxxxx:9093:9093 disconnected
18/08/20 22:06:08 INFO datasources.FileScanRDD: Reading File path: hdfs://nameservice1/user/test5/dt=2017-08-10/2175e5d9-e969-41e9-8aa2-f329b5df06bf.parquet, range: 0-77484, partition values: [2017-08-10]
18/08/20 22:06:08 WARN clients.NetworkClient: Bootstrap broker xxxxxxxxx:9093:9093 disconnected
18/08/20 22:06:09 WARN clients.NetworkClient: Bootstrap broker xxxxxxxxx:9093:9093 disconnected
18/08/20 22:06:09 WARN clients.NetworkClient: Bootstrap broker xxxxxxxxx:9093:9093 disconnected
18/08/20 22:06:10 WARN clients.NetworkClient: Bootstrap broker xxxxxxxxx:9093:9093 disconnected
18/08/20 22:06:10 WARN clients.NetworkClient: Bootstrap broker xxxxxxxxx:9093:9093 disconnected
在上面的日志中,我看到三个冲突的条目:
security.protocol = PLAINTEXT
sasl.kerberos.service.name = null
INFO utils.AppInfoParser:Kafka 版本:0.9.0-kafka-2.0.2
我在我security.protocol
的. 这是否意味着配置没有通过?我在 jar 中使用的 Kafka 依赖项是:sasl.kerberos.service.name
test_final.write....
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.2.1</version>
</dependency>
版本是否与0.10.2.1
冲突0.9.0-kafka-2.0.2
?这会导致问题吗?
这是我的 jaas.conf:
/* $Id$ */
kinit {
com.sun.security.auth.module.Krb5LoginModule required;
};
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
doNotPrompt=true
useTicketCache=true
useKeyTab=true
principal="user@CORP.COM"
serviceName="kafka"
keyTab="/data/home/keytabs/user.keytab"
client=true;
};
这是我的 spark-submit 命令:
spark2-submit --master yarn --class app --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=path to jaas.conf" --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=path to jaas.conf" --files path to jaas.conf --conf "spark.driver.extraClassPath=path to spark-sql-kafka-0-10_2.11-2.2.0.jar" --conf "spark.executor.extraClassPath=path to spark-sql-kafka-0-10_2.11-2.2.0.jar" --num-executors 2 --executor-cores 4 --executor-memory 10g --driver-memory 5g ./KerberizedKafkaConnect-1.0-SNAPSHOT-shaded.jar
任何帮助,将不胜感激。谢谢!
解决方案
我不完全确定这是否会解决您的特定问题,但在spark 结构化流式传输中,安全选项必须以kafka.
因此,您将拥有以下内容:
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.config.SslConfigs
val security = Map(
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG -> security.securityProtocol,
SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG -> security.sslTrustStoreLocation,
SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG -> security.sslTrustStorePassword,
SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG -> security.sslKeyStoreLocation,
SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG -> security.sslKeyStorePassword,
SslConfigs.SSL_KEY_PASSWORD_CONFIG -> security.sslKeyPassword
).map(x => "kafka." + x._1 -> x._2)
test_final.write.format("kafka")
.option("kafka.bootstrap.servers","XXXXXXXXX:9093")
.option("topic", "test")
.options(security)
.save()
推荐阅读
- python - 如何使用 seaborn 中的所有功能绘制 ridgeplot?
- r - 在 R Shiny daterangeinput 中更改日期格式
- vue.js - 如何将属性从父级传递给子级并返回给父级和子级?
- git - 忽略 git merge 上的一些文件
- google-sheets - 如何在 Google 表格中使用 ARRAYFORMULA(或类似)加入多行和多列数据?
- python - 遍历页面中的父类,并在 python 中使用 selenium 确定父类是否存在子类
- python - 如何将 NaN 分配给张量元素?
- c# - .net 核心网络应用程序在本地运行得很好,但将代码克隆到另一台计算机会导致错误
- python - 如何将循环数据输出到嵌套列表中?
- nginx - 访问 nginx 流代理没有记录在 access_log 文件中