scala - Spark Streaming Kafka 超时
问题描述
我正在尝试使用 spark-shell 的简单示例在 Amazon EMR 上运行 Spark + Kafka 集成,但我不断收到超时错误。但是,当我使用org.apache.kafka
与以下相同的设置发布时,它可以正常工作。
超时错误:
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
我移动client.truststore.jks
到client.keystore.p12
hdfs 并运行以下
$ spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0
import org.apache.spark.sql.functions.col
val kafkaOptions = Map("kafka.bootstrap.servers" -> s"$host:$port",
"kafka.security.protocol" -> "SSL",
"kafka.ssl.endpoint.identification.algorithm" -> "",
"kafka.ssl.truststore.location" -> "/home/hadoop/client.truststore.jks",
"kafka.ssl.truststore.password" -> "password",
"kafka.ssl.keystore.type" -> "PKCS12",
"kafka.ssl.key.password" -> "password",
"kafka.ssl.keystore.location" -> "/home/hadoop/client.keystore.p12",
"kafka.ssl.keystore.password" -> "password")
)
val df = spark
.read
.option("header", true)
.option("escape", "\"")
.csv("s3://bucket/file.csv")
val publishToKafkaDf = df.withColumn("value", col("body"))
publishToKafkaDf
.selectExpr( "CAST(value AS STRING)")
.write
.format("kafka")
.option("topic", "test-topic")
.options(kafkaOptions)
.save()
解决方案
已解决,这是工作节点的 AWS 安全组出站问题
推荐阅读
- c# - 从 C# .Net Core 项目*在 Linux* 上正确调用外部 C 库方法
- json - Neo4j 根据嵌套列表中的位置从 JSON 数据创建关系
- javascript - 我可以将 PHP 与 Javascript 结合使用吗?
- amazon-ec2 - AWS,我可以在 DVD 上获取发送给我的磁盘(卷)图像吗?
- c# - 如何让 Entity Framework with Oracle 在查询中向数据库发送小数秒?
- c++ - Qt 应用程序在小部件析构函数中崩溃
- php - url codeigniter中的多个变量
- c - Recvfrom 空缓冲区返回 errno EFAULT?
- java - 修复 java.lang.NoSuchMethodError: com.amazonaws.util.StringUtils.trim
- android - 获取附近 wifi 设备(不是路由器或接入点)的 wifi 信号强度?