PySpark: Writing a Kafka topic to the console fails

Problem description

I am reading messages from a Kafka topic and writing them to the console. Reading is not the problem: I can consume the messages and print the schema. But when I try to write the stream to the console, it fails. Any suggestions would be appreciated.

Below is my code:

from pyspark.sql import SparkSession


spark = SparkSession.builder\
  .appName("Kafka Spark")\
  .config("spark.jars", "/C:/Hadoop/Spark/spark-3.0.0-preview2-bin-hadoop2.7/jars/spark-sql-kafka-0- 10_2.12-3.0.0-preview2.jar")\
  .config("spark.executor.extraClassPath", "/C:/Hadoop/Spark/spark-3.0.0-preview2-bin-hadoop2.7/jars/spark-sql-kafka-0-10_2.12-3.0.0-preview2.jar")\
  .config("spark.executor.extraLibrary", "/C:/Hadoop/Spark/spark-3.0.0-preview2-bin-hadoop2.7/jars/spark-sql-kafka-0-10_2.12-3.0.0-preview2.jar")\
  .config("spark.driver.extraClassPath", "/C:/Hadoop/Spark/spark-3.0.0-preview2-bin-hadoop2.7/jars/spark-sql-kafka-0-10_2.12-3.0.0-preview2.jar")\
  .getOrCreate()


dataFrameRead = spark\
    .readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "localhost:9092")\
    .option("subscribe", "Jim_Topic")\
    .load()

dataFrameRead.printSchema()

dataFrameConsole = dataFrameRead.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")\
    .writeStream.outputMode(outputMode='Append').format('console').start()

dataFrameConsole.awaitTermination()

Below is the error:



C:\Users\Macaulay\AppData\Local\Programs\Python\Python38-32\python.exe C:/Users/Macaulay/PycharmProjects/Spark/KafkaSpark/KafkaTopic2CSV.py
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

-------------------------------------------
Batch: 0
-------------------------------------------
+---+-----+
|key|value|
+---+-----+
+---+-----+

20/06/14 12:19:18 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.NoClassDefFoundError: org/apache/commons/pool2/impl/GenericKeyedObjectPoolConfig
    at org.apache.spark.sql.kafka010.KafkaDataConsumer$.<init>(KafkaDataConsumer.scala:607)
    at org.apache.spark.sql.kafka010.KafkaDataConsumer$.<clinit>(KafkaDataConsumer.scala)
    at org.apache.spark.sql.kafka010.KafkaBatchPartitionReader.<init>(KafkaBatchPartitionReader.scala:51)
    at org.apache.spark.sql.kafka010.KafkaBatchReaderFactory$.createReader(KafkaBatchPartitionReader.scala:39)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute(DataSourceRDD.scala:53)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:441)
    at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2092/833428682.apply(Unknown Source)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:444)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassNotFoundException: org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig
    at java.net.URLClassLoader$1.run(Unknown Source)
    at java.net.URLClassLoader$1.run(Unknown Source)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(Unknown Source)
    at java.lang.ClassLoader.loadClass(Unknown Source)
    at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
    at java.lang.ClassLoader.loadClass(Unknown Source)
    ... 22 more
20/06/14 12:19:18 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job
20/06/14 12:19:18 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@4ad854a4 is aborting.
20/06/14 12:19:18 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@4ad854a4 aborted.
20/06/14 12:19:18 ERROR MicroBatchExecution: Query [id = 39f99b61-806b-45da-b717-09a635405011, runId = 1022045a-8a75-4b25-ad41-328cbedcbfc4] terminated with error
org.apache.spark.SparkException: Writing job aborted.
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:407)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:355)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.writeWithV2(WriteToDataSourceV2Exec.scala:318)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:325)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:173)
    at org.apache.spark.sql.execution.SparkPlan$$Lambda$1634/675684139.apply(Unknown Source)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:211)
    at org.apache.spark.sql.execution.SparkPlan$$Lambda$1658/109672489.apply(Unknown Source)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:208)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:169)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:313)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:362)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3482)
    at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2812)
    at org.apache.spark.sql.Dataset$$Lambda$1614/1023696801.apply(Unknown Source)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3472)
    at org.apache.spark.sql.Dataset$$Lambda$1615/1752721246.apply(Unknown Source)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$4(SQLExecution.scala:100)
    at org.apache.spark.sql.execution.SQLExecution$$$Lambda$1598/813934425.apply(Unknown Source)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3468)
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:2812)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:556)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$1591/1450070714.apply(Unknown Source)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$4(SQLExecution.scala:100)
    at org.apache.spark.sql.execution.SQLExecution$$$Lambda$1598/813934425.apply(Unknown Source)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:551)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$1590/1332436953.apply(Unknown Source)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:341)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:339)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:551)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:212)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$1250/619118163.apply$mcV$sp(Unknown Source)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:341)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:339)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:180)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$1248/527271024.apply$mcZ$sp(Unknown Source)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:174)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:332)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, BLR-MOB0161.sst.stp, executor driver): java.lang.NoClassDefFoundError: org/apache/commons/pool2/impl/GenericKeyedObjectPoolConfig
    at org.apache.spark.sql.kafka010.KafkaDataConsumer$.<init>(KafkaDataConsumer.scala:607)
    at org.apache.spark.sql.kafka010.KafkaDataConsumer$.<clinit>(KafkaDataConsumer.scala)
    at org.apache.spark.sql.kafka010.KafkaBatchPartitionReader.<init>(KafkaBatchPartitionReader.scala:51)
    at org.apache.spark.sql.kafka010.KafkaBatchReaderFactory$.createReader(KafkaBatchPartitionReader.scala:39)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute(DataSourceRDD.scala:53)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:441)
    at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2092/833428682.apply(Unknown Source)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:444)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassNotFoundException: org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig
    at java.net.URLClassLoader$1.run(Unknown Source)
    at java.net.URLClassLoader$1.run(Unknown Source)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(Unknown Source)
    at java.lang.ClassLoader.loadClass(Unknown Source)
    at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
    at java.lang.ClassLoader.loadClass(Unknown Source)
    ... 22 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1989)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1977)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1976)
    at org.apache.spark.scheduler.DAGScheduler$$Lambda$2344/77762193.apply(Unknown Source)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1976)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:956)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:956)
    at org.apache.spark.scheduler.DAGScheduler$$Lambda$2342/1283673514.apply(Unknown Source)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:956)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2206)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2155)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2144)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:758)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2116)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:376)
    ... 47 more
Caused by: java.lang.NoClassDefFoundError: org/apache/commons/pool2/impl/GenericKeyedObjectPoolConfig
    at org.apache.spark.sql.kafka010.KafkaDataConsumer$.<init>(KafkaDataConsumer.scala:607)
    at org.apache.spark.sql.kafka010.KafkaDataConsumer$.<clinit>(KafkaDataConsumer.scala)
    at org.apache.spark.sql.kafka010.KafkaBatchPartitionReader.<init>(KafkaBatchPartitionReader.scala:51)
    at org.apache.spark.sql.kafka010.KafkaBatchReaderFactory$.createReader(KafkaBatchPartitionReader.scala:39)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute(DataSourceRDD.scala:53)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:441)
    at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2092/833428682.apply(Unknown Source)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:444)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassNotFoundException: org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig
    at java.net.URLClassLoader$1.run(Unknown Source)
    at java.net.URLClassLoader$1.run(Unknown Source)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(Unknown Source)
    at java.lang.ClassLoader.loadClass(Unknown Source)
    at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
    at java.lang.ClassLoader.loadClass(Unknown Source)
    ... 22 more
Traceback (most recent call last):
  File "C:\Hadoop\Spark\spark-3.0.0-preview2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\sql\utils.py", line 98, in deco
  File "C:\Hadoop\Spark\spark-3.0.0-preview2-bin-hadoop2.7\python\lib\py4j-0.10.8.1-src.zip\py4j\protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o55.awaitTermination.
: org.apache.spark.sql.streaming.StreamingQueryException: Writing job aborted.
=== Streaming Query ===
Identifier: [id = 39f99b61-806b-45da-b717-09a635405011, runId = 1022045a-8a75-4b25-ad41-328cbedcbfc4]
Current Committed Offsets: {KafkaV2[Subscribe[Jim_Topic]]: {"Jim_Topic":{"2":582,"1":264,"0":470}}}
Current Available Offsets: {KafkaV2[Subscribe[Jim_Topic]]: {"Jim_Topic":{"2":583,"1":264,"0":470}}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
WriteToMicroBatchDataSource ConsoleWriter[numRows=20, truncate=true]
+- Project [cast(key#7 as string) AS key#21, cast(value#8 as string) AS value#22]
   +- StreamingDataSourceV2Relation [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@1b90a9ca, KafkaV2[Subscribe[Jim_Topic]]

    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:353)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Caused by: org.apache.spark.SparkException: Writing job aborted.
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:407)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:355)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.writeWithV2(WriteToDataSourceV2Exec.scala:318)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:325)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:173)
    at org.apache.spark.sql.execution.SparkPlan$$Lambda$1634/675684139.apply(Unknown Source)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:211)
    at org.apache.spark.sql.execution.SparkPlan$$Lambda$1658/109672489.apply(Unknown Source)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:208)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:169)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:313)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:362)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3482)
    at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2812)
    at org.apache.spark.sql.Dataset$$Lambda$1614/1023696801.apply(Unknown Source)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3472)
    at org.apache.spark.sql.Dataset$$Lambda$1615/1752721246.apply(Unknown Source)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$4(SQLExecution.scala:100)
    at org.apache.spark.sql.execution.SQLExecution$$$Lambda$1598/813934425.apply(Unknown Source)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3468)
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:2812)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:556)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$1591/1450070714.apply(Unknown Source)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$4(SQLExecution.scala:100)
    at org.apache.spark.sql.execution.SQLExecution$$$Lambda$1598/813934425.apply(Unknown Source)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:551)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$1590/1332436953.apply(Unknown Source)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:341)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:339)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:551)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:212)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$1250/619118163.apply$mcV$sp(Unknown Source)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:341)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:339)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:180)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$1248/527271024.apply$mcZ$sp(Unknown Source)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:174)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:332)
    ... 1 more

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:/Users/Macaulay/PycharmProjects/Spark/KafkaSpark/KafkaTopic2CSV.py", line 22, in <module>
    dataFrameConsole = dataFrameRead.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")\
  File "C:\Hadoop\Spark\spark-3.0.0-preview2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\sql\streaming.py", line 103, in awaitTermination
  File "C:\Hadoop\Spark\spark-3.0.0-preview2-bin-hadoop2.7\python\lib\py4j-0.10.8.1-src.zip\py4j\java_gateway.py", line 1285, in __call__
  File "C:\Hadoop\Spark\spark-3.0.0-preview2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\sql\utils.py", line 102, in deco
pyspark.sql.utils.StreamingQueryException: Writing job aborted.
=== Streaming Query ===
Identifier: [id = 39f99b61-806b-45da-b717-09a635405011, runId = 1022045a-8a75-4b25-ad41-328cbedcbfc4]
Current Committed Offsets: {KafkaV2[Subscribe[Jim_Topic]]: {"Jim_Topic":{"2":582,"1":264,"0":470}}}
Current Available Offsets: {KafkaV2[Subscribe[Jim_Topic]]: {"Jim_Topic":{"2":583,"1":264,"0":470}}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
WriteToMicroBatchDataSource ConsoleWriter[numRows=20, truncate=true]
+- Project [cast(key#7 as string) AS key#21, cast(value#8 as string) AS value#22]
   +- StreamingDataSourceV2Relation [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@1b90a9ca, KafkaV2[Subscribe[Jim_Topic]]


Process finished with exit code 1


Tags: apache-spark, pyspark, spark-structured-streaming, spark-kafka-integration

Solution


You're missing the commons-pool2 jar. On Linux, the session-builder config would look like this:

sparkSesh = SparkSession.builder.config("spark.driver.extraClassPath", "/home/ubuntu/jars/spark-sql-kafka-0-10_2.12-3.1.2.jar,/home/ubuntu/jars/commons-pool2-2.11.0.jar")\
etc.

You can find that here: https://mvnrepository.com/artifact/org.apache.commons/commons-pool2/2.11.0
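
For the asker's Windows layout, a minimal sketch of the same fix is below, assuming commons-pool2 has been downloaded from the link above into the Spark jars directory. The file names, version, and paths are assumptions; point them at whatever you actually have installed.

from pyspark.sql import SparkSession

# The Kafka connector jar plus the commons-pool2 jar it depends on.
# Paths and the commons-pool2 version are assumptions -- adjust to your files.
kafka_jar = "C:/Hadoop/Spark/spark-3.0.0-preview2-bin-hadoop2.7/jars/spark-sql-kafka-0-10_2.12-3.0.0-preview2.jar"
pool_jar = "C:/Hadoop/Spark/spark-3.0.0-preview2-bin-hadoop2.7/jars/commons-pool2-2.11.0.jar"

# spark.jars takes a comma-separated list; extraClassPath uses the OS path
# separator, which is ';' on Windows.
spark = SparkSession.builder \
    .appName("Kafka Spark") \
    .config("spark.jars", ",".join([kafka_jar, pool_jar])) \
    .config("spark.driver.extraClassPath", ";".join([kafka_jar, pool_jar])) \
    .config("spark.executor.extraClassPath", ";".join([kafka_jar, pool_jar])) \
    .getOrCreate()

Alternatively, setting spark.jars.packages to the connector's Maven coordinate (org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0-preview2 for this Spark build) lets Spark resolve commons-pool2 and the other transitive dependencies automatically, provided the driver machine has internet access.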

