apache-spark - 使用来自 pyspark 2.4.4 的火花流
问题描述
我在 k8s 容器中设置了 spark 2.4.4 版本。我正在尝试编写一个简单的 hello world 来使用这样的火花流:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
spark = SparkSession.builder.appName("pyspark-kafka").getOrCreate()
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, 60)
kafkaStream = KafkaUtils.createDirectStream(ssc, ['users-update'], {"metadata.broker.list":'pubsub-0.pubsub:9092,pubsub-1.pubsub:9092,pubsub-2.pubsub:9092'})
请注意,pubsub-x.pubsub 是我的容器可见的 kafka 代理。(在我的最后一行 pyspark 代码中,一个简单的 python 程序直接使用带有代理和主题的 kafka-python 客户端工作得很好。)
我收到此错误消息:
________________________________________________________________________________________________
Spark Streaming's Kafka libraries not found in class path. Try one of the following.
1. Include the Kafka library and its dependencies with in the
spark-submit command as
$ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8:2.4.4 ...
2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-0-8-assembly, Version = 2.4.4.
Then, include the jar in the spark-submit command as
$ bin/spark-submit --jars <spark-streaming-kafka-0-8-assembly.jar> ...
________________________________________________________________________________________________
maven 上的任何地方都没有 2.4.4 版本的 kafka 库。https://search.maven.org/search?q=spark%20kafka显示最后发布的 jar 是 2.10 或 2.11 版本。
我的 pyspark 安装中确实有一个 spark-streaming_2.12-2.4.4.jar jar,但它似乎没有正确的 kafka 类。
感谢您的任何指点!——斯里达尔
解决方案
Spark v2.4.4 是使用 scala v2.11 预构建的。从火花下载页面:
请注意,Spark 是使用 Scala 2.11 预构建的,但版本 2.4.2 是使用 Scala 2.12 预构建的。
所以,基本上2.10
和2.11
是构建 spark 的 scala 版本,你应该下载 spark-streaming-kafka jar,它是在你的情况下使用相同版本的 scala 构建的2.11
。
我已经检查了 spark 2.4.4 中的 jars 文件夹,并且spark-streaming_2.11-2.4.4.jar
jar 存在于那里。因此,如果您已将其添加到外部类路径中,则应该删除spark-streaming_2.12-2.4.4.jar
,否则您将获得版本不匹配。
你可以spark-streaming-kafka-0-8-assembly.jar
从这里下载
我认为你还需要从这里kafka-clients
添加jar 。
推荐阅读
- airflow - 使用环境变量在 Airflow 中设置 Google Cloud 连接
- reactjs - React Select 与 Custom MultiValueContainer 和 Apollo Mutation 不合作
- spring-cloud - Spring Cloud Gateway——通过调度刷新路由
- typescript - TypeScript:多维数组参数的正确方法
- visual-studio - 不出现调试图标
- azure-log-analytics - Log Analytics - 定价层与订阅计费模型不匹配
- maven - 未能执行目标 org.apache.maven.plugin
- ios - Xcode 10 转换为 Swift 4.2 失败
- c# - 如何在c#中获取对象
- javascript - React Native TextInput占位符在iOS中导致错误的contentSize.height onContentSizeChange