exception - PySpark error when trying to analyze a Twitter dataset
Problem description
I'm trying to analyze a large Twitter dump that my professor collected over two years into multiple Parquet files. I ran all of my code against a single file and everything worked smoothly, but when I try to read and analyze all of the Parquet files together I keep running into problems. Below is an example of the error I'm facing. I'm trying to create a temporary table view so that I can run SQL queries against the data. Please find my Spark configuration:
[('spark.app.name', 'Colab'),
('spark.driver.memory', '50g'),
('spark.app.id', 'local-1636990273526'),
('spark.memory.offHeap.size', '20g'),
('spark.app.startTime', '1636990272425'),
('spark.driver.port', '40705'),
('spark.executor.id', 'driver'),
('spark.driver.host', '3c3ebb00872b'),
('spark.sql.warehouse.dir', 'file:/content/spark-warehouse'),
('spark.memory.offHeap.enabled', 'true'),
('spark.rdd.compress', 'True'),
('spark.serializer.objectStreamReset', '100'),
('spark.master', 'local[*]'),
('spark.submit.pyFiles', ''),
('spark.submit.deployMode', 'client'),
('spark.ui.showConsoleProgress', 'true'),
('spark.executor.memory', '50g')]
The Spark DataFrame has 393 million rows and 383 columns.
Whenever I try to run the following code:
df.createOrReplaceTempView("twitlogs")
tbl = spark.sql("""
SELECT * FROM twitlogs LIMIT 20
""")
tbl.show()
I get the following error:
Py4JJavaError: An error occurred while calling o114.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 34 in stage 7.0 failed 1 times, most recent failure: Lost task 34.0 in stage 7.0 (TID 1219) (3c3ebb00872b executor driver): org.apache.spark.sql.execution.QueryExecutionException: Parquet column cannot be converted in file file:///content/drive/MyDrive/ProjAbe/tweets-2020-08-25T09___30___02Z.parq. Column: [geo], Expected: double, Found: BINARY
at org.apache.spark.sql.errors.QueryExecutionErrors$.unsupportedSchemaColumnConvertError(QueryExecutionErrors.scala:570)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:172)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$SliceIterator.hasNext(Iterator.scala:268)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.constructConvertNotSupportedException(ParquetVectorUpdaterFactory.java:1077)
at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.getUpdater(ParquetVectorUpdaterFactory.java:172)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:154)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:283)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:184)
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:168)
... 16 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2403)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2352)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2351)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2351)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1109)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1109)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1109)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2591)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2533)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.sql.execution.QueryExecutionException: Parquet column cannot be converted in file file:///content/drive/MyDrive/ProjAbe/tweets-2020-08-25T09___30___02Z.parq. Column: [geo], Expected: double, Found: BINARY
at org.apache.spark.sql.errors.QueryExecutionErrors$.unsupportedSchemaColumnConvertError(QueryExecutionErrors.scala:570)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:172)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$SliceIterator.hasNext(Iterator.scala:268)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.constructConvertNotSupportedException(ParquetVectorUpdaterFactory.java:1077)
at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.getUpdater(ParquetVectorUpdaterFactory.java:172)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:154)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:283)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:184)
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:168)
... 16 more
I ran the same code against a single Parquet file of about 183,000 rows and it ran smoothly. I also hit the same error when I run this code to look for missing values:
from pyspark.sql.functions import col, count, isnan, when

def get_dtype(df, colname):
    # Return the dtype string for a single column
    return [dtype for name, dtype in df.dtypes if name == colname][0]

# Count missing values per column: NULL for non-float columns, NaN for float/double
df_mis = df.select([count(when(col(c).isNull() if get_dtype(df, c) not in ['double', 'float'] else isnan(col(c)), c)).alias(c) for c in df.columns])
df_mis.show()
I suspect this is a memory problem, but I don't know how to deal with it. I spent a whole day researching the issue extensively and came up with nothing. Please help.
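The error message itself ("Parquet column cannot be converted ... Expected: double, Found: BINARY") points at a per-file schema conflict rather than memory. One way to confirm this is to read each file's schema on its own (e.g. `dict(spark.read.parquet(path).dtypes)` per file) and diff the column types across files. A minimal, pure-Python sketch of the comparison step, using hypothetical file names:

```python
from collections import defaultdict

def find_type_conflicts(file_schemas):
    """Given {filename: {column: dtype}}, return the columns whose dtype
    differs between files -- the usual cause of a
    SchemaColumnConvertNotSupportedException."""
    seen = defaultdict(set)
    for schema in file_schemas.values():
        for col_name, dtype in schema.items():
            seen[col_name].add(dtype)
    # Keep only columns that were seen with more than one type
    return {c: sorted(t) for c, t in seen.items() if len(t) > 1}

# Toy example with hypothetical file names; in practice each inner dict
# would come from dict(spark.read.parquet(path).dtypes):
schemas = {
    "tweets-a.parq": {"geo": "double", "id": "bigint"},
    "tweets-b.parq": {"geo": "string", "id": "bigint"},
}
print(find_type_conflicts(schemas))  # {'geo': ['double', 'string']}
```

Running this over all of the files would name every column whose type drifted between dump batches, not just the first one Spark happens to trip over.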
Update Nov 16, 2021: After being asked to include the schema, I can only include part of it because of Stack Overflow's character limit. Note that the dataset has 383 columns.
root
|-- created_at: string (nullable = true)
|-- id: long (nullable = true)
|-- id_str: string (nullable = true)
|-- text: string (nullable = true)
|-- display_text_range: string (nullable = true)
|-- source: string (nullable = true)
|-- truncated: boolean (nullable = true)
|-- in_reply_to_status_id: double (nullable = true)
|-- in_reply_to_status_id_str: string (nullable = true)
|-- in_reply_to_user_id: double (nullable = true)
|-- in_reply_to_user_id_str: string (nullable = true)
|-- in_reply_to_screen_name: string (nullable = true)
|-- geo: double (nullable = true)
|-- coordinates: double (nullable = true)
|-- place: double (nullable = true)
|-- contributors: string (nullable = true)
|-- is_quote_status: boolean (nullable = true)
|-- quote_count: long (nullable = true)
|-- reply_count: long (nullable = true)
|-- retweet_count: long (nullable = true)
|-- favorite_count: long (nullable = true)
|-- favorited: boolean (nullable = true)
|-- retweeted: boolean (nullable = true)
|-- filter_level: string (nullable = true)
|-- lang: string (nullable = true)
|-- timestamp_ms: string (nullable = true)
|-- user_id: long (nullable = true)
|-- user_id_str: string (nullable = true)
|-- user_name: string (nullable = true)
|-- user_screen_name: string (nullable = true)
|-- user_location: string (nullable = true)
|-- user_url: string (nullable = true)
|-- user_description: string (nullable = true)
|-- user_translator_type: string (nullable = true)
|-- user_protected: boolean (nullable = true)
|-- user_verified: boolean (nullable = true)
|-- user_followers_count: long (nullable = true)
|-- user_friends_count: long (nullable = true)
|-- user_listed_count: long (nullable = true)
|-- user_favourites_count: long (nullable = true)
|-- user_statuses_count: long (nullable = true)
|-- user_created_at: string (nullable = true)
|-- user_utc_offset: string (nullable = true)
|-- user_time_zone: string (nullable = true)
|-- user_geo_enabled: boolean (nullable = true)
|-- user_lang: string (nullable = true)
|-- user_contributors_enabled: boolean (nullable = true)
|-- user_is_translator: boolean (nullable = true)
|-- user_profile_background_color: string (nullable = true)
|-- user_profile_background_image_url: string (nullable = true)
|-- user_profile_background_image_url_https: string (nullable = true)
|-- user_profile_background_tile: boolean (nullable = true)
|-- user_profile_link_color: string (nullable = true)
|-- user_profile_sidebar_border_color: string (nullable = true)
|-- user_profile_sidebar_fill_color: string (nullable = true)
|-- user_profile_text_color: string (nullable = true)
|-- user_profile_use_background_image: boolean (nullable = true)
|-- user_profile_image_url: string (nullable = true)
|-- user_profile_image_url_https: string (nullable = true)
|-- user_profile_banner_url: string (nullable = true)
|-- user_default_profile: boolean (nullable = true)
|-- user_default_profile_image: boolean (nullable = true)
|-- user_following: string (nullable = true)
|-- user_follow_request_sent: string (nullable = true)
|-- user_notifications: string (nullable = true)
|-- entities_hashtags: string (nullable = true)
|-- entities_urls: string (nullable = true)
|-- entities_user_mentions: string (nullable = true)
|-- entities_symbols: string (nullable = true)
|-- retweeted_status_created_at: string (nullable = true)
|-- retweeted_status_id: double (nullable = true)
|-- retweeted_status_id_str: string (nullable = true)
|-- retweeted_status_text: string (nullable = true)
|-- retweeted_status_display_text_range: string (nullable = true)
|-- retweeted_status_source: string (nullable = true)
|-- retweeted_status_truncated: boolean (nullable = true)
|-- retweeted_status_in_reply_to_status_id: double (nullable = true)
|-- retweeted_status_in_reply_to_status_id_str: string (nullable = true)
|-- retweeted_status_in_reply_to_user_id: double (nullable = true)
|-- retweeted_status_in_reply_to_user_id_str: string (nullable = true)
|-- retweeted_status_in_reply_to_screen_name: string (nullable = true)
|-- retweeted_status_user_id: double (nullable = true)
|-- retweeted_status_user_id_str: string (nullable = true)
|-- retweeted_status_user_name: string (nullable = true)
|-- retweeted_status_user_screen_name: string (nullable = true)
|-- retweeted_status_user_location: string (nullable = true)
|-- retweeted_status_user_url: string (nullable = true)
|-- retweeted_status_user_description: string (nullable = true)
|-- retweeted_status_user_translator_type: string (nullable = true)
|-- retweeted_status_user_protected: boolean (nullable = true)
|-- retweeted_status_user_verified: boolean (nullable = true)
|-- retweeted_status_user_followers_count: double (nullable = true)
|-- retweeted_status_user_friends_count: double (nullable = true)
|-- retweeted_status_user_listed_count: double (nullable = true)
|-- retweeted_status_user_favourites_count: double (nullable = true)
|-- retweeted_status_user_statuses_count: double (nullable = true)
|-- retweeted_status_user_created_at: string (nullable = true)
|-- retweeted_status_user_utc_offset: double (nullable = true)
|-- retweeted_status_user_time_zone: double (nullable = true)
|-- retweeted_status_user_geo_enabled: boolean (nullable = true)
|-- retweeted_status_user_lang: double (nullable = true)
|-- retweeted_status_user_contributors_enabled: boolean (nullable = true)
|-- retweeted_status_user_is_translator: boolean (nullable = true)
|-- retweeted_status_user_profile_background_color: string (nullable = true)
|-- retweeted_status_user_profile_background_image_url: string (nullable = true)
|-- retweeted_status_user_profile_background_image_url_https: string (nullable = true)
|-- retweeted_status_user_profile_background_tile: boolean (nullable = true)
|-- retweeted_status_user_profile_link_color: string (nullable = true)
|-- retweeted_status_user_profile_sidebar_border_color: string (nullable = true)
|-- retweeted_status_user_profile_sidebar_fill_color: string (nullable = true)
|-- retweeted_status_user_profile_text_color: string (nullable = true)
|-- retweeted_status_user_profile_use_background_image: boolean (nullable = true)
|-- retweeted_status_user_profile_image_url: string (nullable = true)
|-- retweeted_status_user_profile_image_url_https: string (nullable = true)
|-- retweeted_status_user_profile_banner_url: string (nullable = true)
|-- retweeted_status_user_default_profile: boolean (nullable = true)
|-- retweeted_status_user_default_profile_image: boolean (nullable = true)
|-- retweeted_status_user_following: double (nullable = true)
|-- retweeted_status_user_follow_request_sent: double (nullable = true)
|-- retweeted_status_user_notifications: double (nullable = true)
|-- retweeted_status_geo: double (nullable = true)
|-- retweeted_status_coordinates: double (nullable = true)
|-- retweeted_status_place: double (nullable = true)
|-- retweeted_status_contributors: double (nullable = true)
|-- retweeted_status_is_quote_status: boolean (nullable = true)
|-- retweeted_status_extended_tweet_full_text: string (nullable = true)
Update Nov 18, 2021: I've narrowed my error down further. It seems that some of the Parquet files store the "geo" column as string rather than double. After trying mergeSchema, it gives a similar error: "Failed to merge fields 'geo' and 'geo'. Failed to merge incompatible data types double and string".
Solution
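Since mergeSchema cannot reconcile double with string, one workaround (a sketch, not necessarily the original accepted answer; the glob path and the choice of columns to cast are assumptions) is to read each file separately, cast the conflicting columns to one common type, and union the results:

```python
# A pure helper: build selectExpr() strings that cast the conflicting
# columns to a common type and pass every other column through unchanged.
def cast_exprs(columns, overrides):
    return [
        f"CAST(`{c}` AS {overrides[c]}) AS `{c}`" if c in overrides else f"`{c}`"
        for c in columns
    ]

# Applying it per file, then unioning (requires an active SparkSession and
# Spark >= 3.1 for allowMissingColumns; the directory path is hypothetical):
#
# import glob
# from functools import reduce
# paths = glob.glob("/content/drive/MyDrive/ProjAbe/*.parq")
# parts = [spark.read.parquet(p) for p in paths]
# parts = [p.selectExpr(*cast_exprs(p.columns, {"geo": "string"})) for p in parts]
# df = reduce(lambda a, b: a.unionByName(b, allowMissingColumns=True), parts)
```

Casting to string is the lossless common denominator here; the column can still be cast back to double afterwards for the rows where it actually holds numbers.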