python - python 上的 kafka 和 Spark Streaming 的一个坏问题
问题描述
注意这与我在本网站的第一篇文章中遇到的问题不同,但它是同一个项目。
我正在使用火花流将一些文件从 kafka 摄取到 PostgreSQL 中。这些是我的项目步骤:
1-为kafka生产者创建一个脚本(完成,它工作正常)
2-创建一个从kafka生产者读取文件的python脚本
3- 发送文件到 PostgreSQL
对于 python 和 postgreSQL 之间的连接,我使用 psycopg2。我也在使用 python 3 和 java jdk1.8.0_261,并且 kafka 和 spark 流之间的集成工作正常。我有 kafka 2.12-2.6.0 和 spark 3.0.1,我在我的 Spark jars 目录中添加了这些 jars:
- postgresql-42.2.18-spark-streaming-kafka-0-10-assembly_2.12-3.0.1
- spark-token-provider-kafka-0.10_2.12-3.0.1
- kafka-clients-2.6.0
- spark-sql-kafka-0-10-assembly_2.12-3.0.1
我还必须下载 VC++ 以解决另一个与我的项目相关的问题。
这是我的一段python代码,它从kafka生产者那里获取文件并将它们发送到我在postgreSQL中创建的postgreSQL表中,我遇到了问题:
query = satelliteTable.writeStream.outputMode("append").foreachBatch(process_row) \
.option("checkpointLocation", "C:\\Users\\Vito\\Documents\\popo").start()
print("Starting")
print(query)
query.awaitTermination()
query.stop()
SatelliteTable 是我使用来自 kafka 生产者的文件创建的 spark 数据框。process_row 是将流数据帧的每一行插入到 postgre 表中的函数。这里是:
def process_row(df, epoch_id):
for row in df.rdd.collect():
cursor1.execute(
'INSERT INTO satellite(filename,satellite_prn_number, date, time,crs,delta_n, m0,
cuc,e_eccentricity,cus,'
'sqrt_a, toe_time_of_ephemeris, cic, omega_maiusc, cis, i0, crc, omega, omega_dot, idot)
VALUES (%s,%s,%s,'
'%s,%s,%s, %s, %s, %s, %s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)', row)
connection.commit()
pass
运行代码时遇到的问题发生在query = satelliteTable.writeStream.outputMode("append").foreachBatch(process_row) \ .option("checkpointLocation", "C:\\Users\\Vito\\Documents\\popo").start()
,简而言之,如下所示:
py4j.protocol.Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1
times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, DESKTOP-
D600TY.homenet.telecomitalia.it, executor driver): java.lang.NoClassDefFoundError:
org/apache/commons/pool2/impl/GenericKeyedObjectPoolConfig
=== Streaming Query ===
Identifier: [id = 599f75a7-5db6-426e-9082-7fbbf5196db9, runId = 67693586-27b1-4ca7-9a44-0f69ad90eafe]
Current Committed Offsets: {}
Current Available Offsets: {KafkaV2[Subscribe[bogi2890.20n]]: {"bogi2890.20n":{"0":68}}}
Current State: ACTIVE
Thread State: RUNNABLE
有趣的事实是,相同的代码在我朋友的笔记本电脑上运行良好,使用 spark 3.0.0。所以,我认为我缺少一些罐子或其他东西,因为代码是正确的。
任何想法?谢谢。
解决方案
你错过了这个 jarhttps://mvnrepository.com/artifact/org.apache.commons/commons-pool2
试试那个特定的版本https://mvnrepository.com/artifact/org.apache.commons/commons-pool2/2.6.2
推荐阅读
- java - java.lang.IllegalStateException:无法执行 android 的方法:onClick 访问变量 Android
- asp.net - 将架构添加到 SiteMapPath 控件导致 404 错误
- jquery - Jquery页面加载即使在html网站上具有css效果
- jenkins - 替换 pom.xml 文件 jenkins 管道中的 SNAPSHOT
- javascript - 如果购物车中没有商品,则隐藏 AJAX 购物车商品编号
- apache-spark-sql - 如何在 SparkSQL 中注册临时表
- python - 如何使用 python 与打印框交互
- recursion - Ocaml 中的列表递归
- ios - Xcode 9.4 Build(Debug, Release) OK,但存档失败,“找不到选项的目录..”
- python - np.shuffle 比 np.random.choice 慢得多