PySpark structured streaming with Kafka

Problem Description

I am trying to run Spark Structured Streaming code that reads from Kafka. In the streaming query I do a groupBy operation and try to print the values like this:

    from pyspark.sql import SparkSession

    # APP_NAME is defined elsewhere in the script
    spark = SparkSession.builder.appName(APP_NAME) \
        .master("local[*]") \
        .config("spark.executor.extraJavaOptions",
                "java.security.auth.login.config=kafka-jaas.conf") \
        .getOrCreate()

    # Read the Kafka topic as a streaming DataFrame
    log_ds_stream = spark.readStream.format("kafka") \
        .option("kafka.bootstrap.servers", "rs3.internal.optimusride.com:6667") \
        .option("subscribe", "all-datasets") \
        .option("kafka.security.protocol", "SASL_PLAINTEXT") \
        .option("startingOffsets", "earliest") \
        .load()

    # Kafka values arrive as bytes; cast them to strings before grouping
    log_ds_stream = log_ds_stream.selectExpr("CAST(value AS STRING)")
    log_ds_stream.printSchema()

    # Count each distinct value and print the running counts to the console
    log_count = log_ds_stream.groupBy("value").count()
    query = log_count.writeStream.outputMode("complete").format("console").start()

    query.awaitTermination()

But I am getting the following error:

    spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:65)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:265)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)

    Traceback (most recent call last):
      File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
      File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling o61.awaitTermination.
    : org.apache.spark.sql.streaming.StreamingQueryException: null
    === Streaming Query ===
    Identifier: [id = a715ee42-f180-4730-81cf-956db3a866b2,

    Current State: INITIALIZING
    Thread State: RUNNABLE
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
    Caused by:
        at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)
        at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
        at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider$.initializeLogIfNecessary(KafkaSourceProvider.scala:369)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider$.log(KafkaSourceProvider.scala:369)
        at org.apache.spark.internal.Logging$class.logDebug(Logging.scala:58)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider$.logDebug(KafkaSourceProvider.scala:369)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider$ConfigUpdater.set(KafkaSourceProvider.scala:439)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider$.kafkaParamsForDriver(KafkaSourceProvider.scala:394)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:90)
        at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:255)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1$$anonfun$applyOrElse$1.apply(MicroBatchExecution.scala:88)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1$$anonfun$applyOrElse$1.apply(MicroBatchExecution.scala:85)
        at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:79)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:85)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:83)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:256)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:256)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:255)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformDown(AnalysisHelper.scala:149)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:261)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:261)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:326)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:324)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:261)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformDown(AnalysisHelper.scala:149)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:261)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:261)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:326)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:324)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:261)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformDown(AnalysisHelper.scala:149)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:245)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:83)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:65)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:265)
        ... 1 more

During handling of the above exception, another exception occurred:

    Traceback (most recent call last):
      File "/home/job/sample.py", line 98
        read_msg_sstream()
      File "/home/job/sample.py", line 92, in read_msg_sstream
        query.awaitTermination()

I cannot figure out what is causing this error. Any help would be appreciated.

Tags: apache-spark, pyspark, spark-streaming, spark-structured-streaming

Solution
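
Judging from the trace, the stream fails while KafkaSourceProvider is building the driver-side Kafka parameters (the call path runs through kafkaParamsForDriver and ConfigUpdater.set), so the first thing to check is how the SASL/JAAS settings reach the driver. Two details in the question's code stand out: spark.executor.extraJavaOptions only applies to executor JVMs, not to the driver that creates the Kafka source, and the value is missing the -D prefix that would make java.security.auth.login.config an actual JVM system property. Below is a minimal sketch of one common alternative that passes the SASL settings directly as kafka.-prefixed source options instead of relying on JVM options. It is an illustration, not a confirmed fix for this exact failure: the broker and topic are copied from the question, while the PLAIN mechanism, username, and password are placeholders that must match whatever login module kafka-jaas.conf actually configures.

    from pyspark.sql import SparkSession

    # Sketch only: the PLAIN mechanism and the credentials below are placeholders;
    # use the login module and credentials that kafka-jaas.conf defines for its
    # KafkaClient entry.
    jaas_config = (
        'org.apache.kafka.common.security.plain.PlainLoginModule required '
        'username="USER" password="PASSWORD";'
    )

    spark = SparkSession.builder.appName("kafka-sasl-sketch") \
        .master("local[*]") \
        .getOrCreate()

    # Every option prefixed with "kafka." is handed to the underlying Kafka
    # consumer, so the sasl.* settings reach both the driver and the executors.
    log_ds_stream = spark.readStream.format("kafka") \
        .option("kafka.bootstrap.servers", "rs3.internal.optimusride.com:6667") \
        .option("subscribe", "all-datasets") \
        .option("kafka.security.protocol", "SASL_PLAINTEXT") \
        .option("kafka.sasl.mechanism", "PLAIN") \
        .option("kafka.sasl.jaas.config", jaas_config) \
        .option("startingOffsets", "earliest") \
        .load()

    # The rest of the pipeline (selectExpr / groupBy / writeStream) is unchanged.

If the JAAS file has to stay, the equivalent JVM-option route is to pass -Djava.security.auth.login.config=kafka-jaas.conf (note the -D) on spark-submit through --driver-java-options as well as --conf spark.executor.extraJavaOptions=..., so that the driver-side consumer that resolves offsets can authenticate too.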

