apache-spark - PySpark Structured Streaming with Kafka
Problem description
I am trying to run Spark Structured Streaming code that reads from Kafka. In my streaming query I group by the message value and try to print the counts, like this:
spark = (SparkSession.builder
         .appName(APP_NAME)
         .master("local[*]")
         .config("spark.executor.extraJavaOptions",
                 "java.security.auth.login.config=kafka-jaas.conf")
         .getOrCreate())

log_ds_stream = (spark.readStream
                 .format("kafka")
                 .option("kafka.bootstrap.servers", "rs3.internal.optimusride.com:6667")
                 .option("subscribe", "all-datasets")
                 .option("kafka.security.protocol", "SASL_PLAINTEXT")
                 .option("startingOffsets", "earliest")
                 .load())

log_ds_stream = log_ds_stream.selectExpr("CAST(value AS STRING)")
log_ds_stream.printSchema()

log_count = log_ds_stream.groupBy("value").count()

query = log_count.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()
But I am getting the following error:
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:65)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:265)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o61.awaitTermination.
: org.apache.spark.sql.streaming.StreamingQueryException: null
=== Streaming Query ===
Identifier: [id = a715ee42-f180-4730-81cf-956db3a866b2,
Current State: INITIALIZING
Thread State: RUNNABLE
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by:
    at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)
    at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider$.initializeLogIfNecessary(KafkaSourceProvider.scala:369)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider$.log(KafkaSourceProvider.scala:369)
    at org.apache.spark.internal.Logging$class.logDebug(Logging.scala:58)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider$.logDebug(KafkaSourceProvider.scala:369)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider$ConfigUpdater.set(KafkaSourceProvider.scala:439)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider$.kafkaParamsForDriver(KafkaSourceProvider.scala:394)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:90)
    at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:255)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1$$anonfun$applyOrElse$1.apply(MicroBatchExecution.scala:88)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1$$anonfun$applyOrElse$1.apply(MicroBatchExecution.scala:85)
    at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:79)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:85)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:83)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:256)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:256)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:255)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformDown(AnalysisHelper.scala:149)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
    [two further passes of the same TreeNode.transformDown / mapChildren / AnalysisHelper frames repeated here]
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:245)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:83)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:65)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:265)
    ... 1 more
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/home/job/sample.py", line 98
    read_msg_sstream()
  File "/home/job/sample.py", line 92, in read_msg_sstream
    query.awaitTermination()
I cannot work out what is causing this error. Any help would be appreciated.
Solution
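The accepted answer was not preserved on this page, but the trace shows the failure happening inside KafkaSourceProvider while it builds the Kafka parameters for a SASL_PLAINTEXT connection. One hedged sketch of a likely fix, assuming the JAAS configuration is the culprit: `spark.executor.extraJavaOptions` entries are raw JVM flags, so a system property must carry the `-D` prefix; as written, `java.security.auth.login.config=kafka-jaas.conf` is never turned into a system property, and with `master("local[*]")` the driver JVM (where the Kafka consumer that resolves offsets runs) needs the option too, via `spark.driver.extraJavaOptions`. The conf keys below are real Spark properties; the file name is the one from the question.

```python
# Sketch (assumption): pass the JAAS file as a proper JVM system property.
# extraJavaOptions values are handed to the JVM verbatim, so the -D prefix
# is required; "key=value" alone is not a valid JVM option.
jaas_opt = "-Djava.security.auth.login.config=kafka-jaas.conf"

# Driver options matter as well under local[*], because the Kafka client
# that plans each micro-batch lives in the driver JVM.
spark_conf = {
    "spark.driver.extraJavaOptions": jaas_opt,
    "spark.executor.extraJavaOptions": jaas_opt,
}

# Applied to the builder (mirrors the code in the question):
# spark = (SparkSession.builder.appName(APP_NAME)
#          .master("local[*]")
#          .config("spark.driver.extraJavaOptions", jaas_opt)
#          .config("spark.executor.extraJavaOptions", jaas_opt)
#          .getOrCreate())

for key, value in spark_conf.items():
    # Every entry must be a JVM flag, not a bare key=value pair.
    assert value.startswith("-D"), f"{key} must be a JVM flag"
```

Note also that `kafka-jaas.conf` is a relative path: the file must exist at that path in the working directory of every JVM that reads it. Shipping it with `spark-submit --files kafka-jaas.conf`, or using an absolute path, avoids that pitfall.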