PySpark error when trying to analyze a Twitter dataset

Problem description

I am trying to analyze a large Twitter dump that my professor collected over 2 years into multiple Parquet files. I ran all my code against a single file and everything went smoothly, but I keep running into problems when I try to read and analyze all the Parquet files together. Below is an example of the error I'm facing. I am trying to create a temporary table view so that I can run SQL queries against the data. Here is my Spark config:

[('spark.app.name', 'Colab'),
 ('spark.driver.memory', '50g'),
 ('spark.app.id', 'local-1636990273526'),
 ('spark.memory.offHeap.size', '20g'),
 ('spark.app.startTime', '1636990272425'),
 ('spark.driver.port', '40705'),
 ('spark.executor.id', 'driver'),
 ('spark.driver.host', '3c3ebb00872b'),
 ('spark.sql.warehouse.dir', 'file:/content/spark-warehouse'),
 ('spark.memory.offHeap.enabled', 'true'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.submit.pyFiles', ''),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.executor.memory', '50g')]
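
For reference, a local session with these settings would typically be created along the following lines (a reconstruction from the config dump above, not the asker's actual code). Note that in local[*] mode the executors live inside the driver JVM, so spark.driver.memory is the setting that actually matters:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName('Colab')
    .master('local[*]')
    # Memory-related settings only take effect if they are set before
    # the JVM starts, i.e. when the first session is created.
    .config('spark.driver.memory', '50g')
    .config('spark.executor.memory', '50g')
    .config('spark.memory.offHeap.enabled', 'true')
    .config('spark.memory.offHeap.size', '20g')
    .getOrCreate()
)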

The Spark DataFrame has 393 million rows and 383 columns.

Whenever I try to run the following code:

df.createOrReplaceTempView("twitlogs")

tbl = spark.sql("""
  SELECT * FROM twitlogs LIMIT 20
  """)

tbl.show()

I get the following error:

Py4JJavaError: An error occurred while calling o114.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 34 in stage 7.0 failed 1 times, most recent failure: Lost task 34.0 in stage 7.0 (TID 1219) (3c3ebb00872b executor driver): org.apache.spark.sql.execution.QueryExecutionException: Parquet column cannot be converted in file file:///content/drive/MyDrive/ProjAbe/tweets-2020-08-25T09___30___02Z.parq. Column: [geo], Expected: double, Found: BINARY
    at org.apache.spark.sql.errors.QueryExecutionErrors$.unsupportedSchemaColumnConvertError(QueryExecutionErrors.scala:570)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:172)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$SliceIterator.hasNext(Iterator.scala:268)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
    at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.constructConvertNotSupportedException(ParquetVectorUpdaterFactory.java:1077)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.getUpdater(ParquetVectorUpdaterFactory.java:172)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:154)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:283)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:184)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:168)
    ... 16 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2403)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2352)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2351)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2351)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1109)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1109)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1109)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2591)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2533)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.sql.execution.QueryExecutionException: Parquet column cannot be converted in file file:///content/drive/MyDrive/ProjAbe/tweets-2020-08-25T09___30___02Z.parq. Column: [geo], Expected: double, Found: BINARY
    at org.apache.spark.sql.errors.QueryExecutionErrors$.unsupportedSchemaColumnConvertError(QueryExecutionErrors.scala:570)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:172)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$SliceIterator.hasNext(Iterator.scala:268)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
    at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.constructConvertNotSupportedException(ParquetVectorUpdaterFactory.java:1077)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.getUpdater(ParquetVectorUpdaterFactory.java:172)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:154)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:283)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:184)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:168)
    ... 16 more

I ran the same code with just 1 Parquet file of about 183,000 rows and it ran smoothly. I also get the same error when I run this code, which tries to find missing values:

from pyspark.sql.functions import col, count, isnan, when

def get_dtype(df, colname):
    # Look up the declared dtype of a single column.
    return [dtype for name, dtype in df.dtypes if name == colname][0]

# Per column: isnan() for double/float columns, isNull() for everything else.
df_mis = df.select([count(when(col(c).isNull() if get_dtype(df, c) not in ['double', 'float'] else isnan(col(c)), c)).alias(c) for c in df.columns])

df_mis.show()
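
As an aside, the expression above counts only NaNs for double/float columns, so plain nulls in those columns go uncounted. A variant that counts both (a sketch, not from the original post):

from pyspark.sql.functions import col, count, isnan, when

# Null-or-NaN for floating-point columns, null-only elsewhere;
# isnan() is only meaningful on floating-point data.
df_mis = df.select([
    count(when((col(c).isNull() | isnan(col(c))) if t in ('double', 'float')
               else col(c).isNull(), c)).alias(c)
    for c, t in df.dtypes
])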

I think this is a memory problem, but I don't know how to deal with it. I spent an entire day researching the whole issue extensively and came up with nothing. Please help.

Update 11/16/21: After being asked to include the schema, I can only include part of it because of Stack Overflow's character limit. Note that the dataset has 383 columns.

root
 |-- created_at: string (nullable = true)
 |-- id: long (nullable = true)
 |-- id_str: string (nullable = true)
 |-- text: string (nullable = true)
 |-- display_text_range: string (nullable = true)
 |-- source: string (nullable = true)
 |-- truncated: boolean (nullable = true)
 |-- in_reply_to_status_id: double (nullable = true)
 |-- in_reply_to_status_id_str: string (nullable = true)
 |-- in_reply_to_user_id: double (nullable = true)
 |-- in_reply_to_user_id_str: string (nullable = true)
 |-- in_reply_to_screen_name: string (nullable = true)
 |-- geo: double (nullable = true)
 |-- coordinates: double (nullable = true)
 |-- place: double (nullable = true)
 |-- contributors: string (nullable = true)
 |-- is_quote_status: boolean (nullable = true)
 |-- quote_count: long (nullable = true)
 |-- reply_count: long (nullable = true)
 |-- retweet_count: long (nullable = true)
 |-- favorite_count: long (nullable = true)
 |-- favorited: boolean (nullable = true)
 |-- retweeted: boolean (nullable = true)
 |-- filter_level: string (nullable = true)
 |-- lang: string (nullable = true)
 |-- timestamp_ms: string (nullable = true)
 |-- user_id: long (nullable = true)
 |-- user_id_str: string (nullable = true)
 |-- user_name: string (nullable = true)
 |-- user_screen_name: string (nullable = true)
 |-- user_location: string (nullable = true)
 |-- user_url: string (nullable = true)
 |-- user_description: string (nullable = true)
 |-- user_translator_type: string (nullable = true)
 |-- user_protected: boolean (nullable = true)
 |-- user_verified: boolean (nullable = true)
 |-- user_followers_count: long (nullable = true)
 |-- user_friends_count: long (nullable = true)
 |-- user_listed_count: long (nullable = true)
 |-- user_favourites_count: long (nullable = true)
 |-- user_statuses_count: long (nullable = true)
 |-- user_created_at: string (nullable = true)
 |-- user_utc_offset: string (nullable = true)
 |-- user_time_zone: string (nullable = true)
 |-- user_geo_enabled: boolean (nullable = true)
 |-- user_lang: string (nullable = true)
 |-- user_contributors_enabled: boolean (nullable = true)
 |-- user_is_translator: boolean (nullable = true)
 |-- user_profile_background_color: string (nullable = true)
 |-- user_profile_background_image_url: string (nullable = true)
 |-- user_profile_background_image_url_https: string (nullable = true)
 |-- user_profile_background_tile: boolean (nullable = true)
 |-- user_profile_link_color: string (nullable = true)
 |-- user_profile_sidebar_border_color: string (nullable = true)
 |-- user_profile_sidebar_fill_color: string (nullable = true)
 |-- user_profile_text_color: string (nullable = true)
 |-- user_profile_use_background_image: boolean (nullable = true)
 |-- user_profile_image_url: string (nullable = true)
 |-- user_profile_image_url_https: string (nullable = true)
 |-- user_profile_banner_url: string (nullable = true)
 |-- user_default_profile: boolean (nullable = true)
 |-- user_default_profile_image: boolean (nullable = true)
 |-- user_following: string (nullable = true)
 |-- user_follow_request_sent: string (nullable = true)
 |-- user_notifications: string (nullable = true)
 |-- entities_hashtags: string (nullable = true)
 |-- entities_urls: string (nullable = true)
 |-- entities_user_mentions: string (nullable = true)
 |-- entities_symbols: string (nullable = true)
 |-- retweeted_status_created_at: string (nullable = true)
 |-- retweeted_status_id: double (nullable = true)
 |-- retweeted_status_id_str: string (nullable = true)
 |-- retweeted_status_text: string (nullable = true)
 |-- retweeted_status_display_text_range: string (nullable = true)
 |-- retweeted_status_source: string (nullable = true)
 |-- retweeted_status_truncated: boolean (nullable = true)
 |-- retweeted_status_in_reply_to_status_id: double (nullable = true)
 |-- retweeted_status_in_reply_to_status_id_str: string (nullable = true)
 |-- retweeted_status_in_reply_to_user_id: double (nullable = true)
 |-- retweeted_status_in_reply_to_user_id_str: string (nullable = true)
 |-- retweeted_status_in_reply_to_screen_name: string (nullable = true)
 |-- retweeted_status_user_id: double (nullable = true)
 |-- retweeted_status_user_id_str: string (nullable = true)
 |-- retweeted_status_user_name: string (nullable = true)
 |-- retweeted_status_user_screen_name: string (nullable = true)
 |-- retweeted_status_user_location: string (nullable = true)
 |-- retweeted_status_user_url: string (nullable = true)
 |-- retweeted_status_user_description: string (nullable = true)
 |-- retweeted_status_user_translator_type: string (nullable = true)
 |-- retweeted_status_user_protected: boolean (nullable = true)
 |-- retweeted_status_user_verified: boolean (nullable = true)
 |-- retweeted_status_user_followers_count: double (nullable = true)
 |-- retweeted_status_user_friends_count: double (nullable = true)
 |-- retweeted_status_user_listed_count: double (nullable = true)
 |-- retweeted_status_user_favourites_count: double (nullable = true)
 |-- retweeted_status_user_statuses_count: double (nullable = true)
 |-- retweeted_status_user_created_at: string (nullable = true)
 |-- retweeted_status_user_utc_offset: double (nullable = true)
 |-- retweeted_status_user_time_zone: double (nullable = true)
 |-- retweeted_status_user_geo_enabled: boolean (nullable = true)
 |-- retweeted_status_user_lang: double (nullable = true)
 |-- retweeted_status_user_contributors_enabled: boolean (nullable = true)
 |-- retweeted_status_user_is_translator: boolean (nullable = true)
 |-- retweeted_status_user_profile_background_color: string (nullable = true)
 |-- retweeted_status_user_profile_background_image_url: string (nullable = true)
 |-- retweeted_status_user_profile_background_image_url_https: string (nullable = true)
 |-- retweeted_status_user_profile_background_tile: boolean (nullable = true)
 |-- retweeted_status_user_profile_link_color: string (nullable = true)
 |-- retweeted_status_user_profile_sidebar_border_color: string (nullable = true)
 |-- retweeted_status_user_profile_sidebar_fill_color: string (nullable = true)
 |-- retweeted_status_user_profile_text_color: string (nullable = true)
 |-- retweeted_status_user_profile_use_background_image: boolean (nullable = true)
 |-- retweeted_status_user_profile_image_url: string (nullable = true)
 |-- retweeted_status_user_profile_image_url_https: string (nullable = true)
 |-- retweeted_status_user_profile_banner_url: string (nullable = true)
 |-- retweeted_status_user_default_profile: boolean (nullable = true)
 |-- retweeted_status_user_default_profile_image: boolean (nullable = true)
 |-- retweeted_status_user_following: double (nullable = true)
 |-- retweeted_status_user_follow_request_sent: double (nullable = true)
 |-- retweeted_status_user_notifications: double (nullable = true)
 |-- retweeted_status_geo: double (nullable = true)
 |-- retweeted_status_coordinates: double (nullable = true)
 |-- retweeted_status_place: double (nullable = true)
 |-- retweeted_status_contributors: double (nullable = true)
 |-- retweeted_status_is_quote_status: boolean (nullable = true)
 |-- retweeted_status_extended_tweet_full_text: string (nullable = true)
 

Update 11/18/2021: I have tracked my error down further. It seems that some Parquet files store the 'geo' column as string rather than double. After trying mergeSchema, it gave a similar error: "Failed to merge fields 'geo' and 'geo'. Failed to merge incompatible data types double and string".
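
To confirm which files carry the mismatched type, each file's footer schema can be inspected on its own before attempting any union. A minimal sketch (the glob pattern is an assumption based on the path in the error message):

import glob

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical location, taken from the file path in the stack trace.
for path in glob.glob('/content/drive/MyDrive/ProjAbe/*.parq'):
    # Reading a single file only resolves its schema; no data is scanned.
    geo_type = dict(spark.read.parquet(path).dtypes).get('geo')
    if geo_type != 'double':
        print(path, geo_type)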

Tags: exception, pyspark, types, apache-spark-sql

Solution
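
The 11/18 update pinpoints the cause: this is not a memory problem but a per-file schema conflict. Some files store geo as double and others as string, and neither the vectorized Parquet reader nor mergeSchema can reconcile the two. One common workaround, sketched below under the assumption that normalizing the conflicting column(s) to string is acceptable, is to read each file separately, align the types, and union the results:

from functools import reduce
import glob

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

# 'geo' is confirmed by the error message; add any other columns that
# turn out to flip types across files.
CONFLICTING = ['geo']

def load_normalized(path):
    # Read one file with its own schema, then force the conflicting
    # columns to a single type so the union below can line them up.
    df = spark.read.parquet(path)
    for c in CONFLICTING:
        if c in df.columns:
            df = df.withColumn(c, col(c).cast('string'))
    return df

paths = sorted(glob.glob('/content/drive/MyDrive/ProjAbe/*.parq'))
df = reduce(lambda a, b: a.unionByName(b, allowMissingColumns=True),
            [load_normalized(p) for p in paths])

df.createOrReplaceTempView('twitlogs')
spark.sql('SELECT * FROM twitlogs LIMIT 20').show()

Note that unionByName with allowMissingColumns=True requires Spark 3.1+. With two years of files the resulting query plan can get large, so writing the normalized DataFrame back out once (df.write.parquet(...)) and running all analysis against the rewritten copy is usually the more practical long-term fix.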

