Repartitioning parquet with PySpark fails with an error

Problem description

I have parquet files in an S3 folder with the columns listed below. The parquet data is about 40 MB in size.

org_id, device_id, channel_id, source, col1, col2

The data is currently partitioned on 3 columns: org_id, device_id, channel_id.

I want to change the partitioning to source, org_id, device_id, channel_id. I am using PySpark to read the files from S3 and write them to another S3 bucket.

from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext(appName="parquet_ingestion1").getOrCreate()
spark = SparkSession(sc)

# Read the existing parquet data from S3.
file_path = "s3://some-bucket/some_folder"
print("Reading parquet from s3:{}".format(file_path))
spark_df = spark.read.parquet(file_path)

# Write it back to a different bucket with the new partitioning scheme.
print("Converting to parquet")
file_path_re = "s3://other_bucket/re-partition"
partition_columns = ["source", "org_id", "device_id", "channel_id"]

spark_df.repartition(1).write.partitionBy(partition_columns).mode('append').parquet(file_path_re)

I get an error and no parquet files are generated.

spark_df.repartition(1).write.partitionBy(partition_columns).mode('append').parquet(file_path_re)
[Stage 1:>                                                        (0 + 8) / 224]20/04/29 13:29:44 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, ip-172-31-43-0.ap-south-1.compute.internal, executor 3): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainFloatDictionary
        at org.apache.parquet.column.Dictionary.decodeToBinary(Dictionary.java:41)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToBinary(ParquetDictionary.java:51)
        at org.apache.spark.sql.execution.vectorized.WritableColumnVector.getUTF8String(WritableColumnVector.java:380)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:148)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Then I tried:

spark_df.write.partitionBy(partition_columns).mode('append').parquet(file_path_re)


spark_df.write.partitionBy(partition_columns).mode('append').parquet(file_path_re)
[Stage 3:>                                                        (0 + 8) / 224]20/04/29 13:32:11 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 23, ip-172-31-42-4.ap-south-1.compute.internal, executor 5): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainFloatDictionary
        at org.apache.parquet.column.Dictionary.decodeToBinary(Dictionary.java:41)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToBinary(ParquetDictionary.java:51)
        at org.apache.spark.sql.execution.vectorized.WritableColumnVector.getUTF8String(WritableColumnVector.java:380)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
        at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:216)
        at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
        at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

[Stage 3:==>                                                      (8 + 8) / 224]20/04/29 13:32:22 WARN TaskSetManager: Lost task 0.2 in stage 3.0 (TID 40, ip-172-31-42-4.ap-south-1.compute.internal, executor 5): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainFloatDictionary
        at org.apache.parquet.column.Dictionary.decodeToBinary(Dictionary.java:41)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToBinary(ParquetDictionary.java:51)

In the second case the job fails, but parquet files are still being created. Now I am not sure whether it has written all of the data correctly into the new partitions. Please let me know what the correct way to repartition the parquet data is.

Update 1:

from pyspark.sql.functions import col
from pyspark.sql.types import StringType

# Cast every partition column to string before writing.
for col1 in partition_columns:
    spark_df = spark_df.withColumn(col1, col(col1).cast(dataType=StringType()))

and then tried

spark_df.repartition(1).write.partitionBy(partition_columns).mode('append').parquet(file_path_re)

and

spark_df.write.partitionBy(partition_columns).mode('append').parquet(file_path_re)

I still get the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 20, ip-172-31-42-4.ap-south-1.compute.internal, executor 4): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainFloatDictionary
        at org.apache.parquet.column.Dictionary.decodeToBinary(Dictionary.java:41)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToBinary(ParquetDictionary.java:51)
        at org.apache.spark.sql.execution.vectorized.WritableColumnVector.getUTF8String(WritableColumnVector.java:380)

Update 2:

I have now found that there is a schema mismatch in one of the columns: in some rows col1 is a string and in others it is a float. The scenario is described below. Here you can see that col1 is a float in one row and a string in another:

org_id, device_id, channel_id, source,    col1, col2
"100"    "device1"  "channel"   "source1"  10    0.1
"100"    "device1"  "channel"   "source2"  "10"  0.1

I tried casting the col1 column to float; it didn't work.

Any suggestions?

Tags: pyspark, apache-spark-sql, parquet

Solution


The root cause of the problem is mentioned in Update 2. In my case we have 4 applications (parts of different pipelines, split by source) writing to the parquet store. Two of them, APP1 and APP2, do not use col1, and APP3 used to write it as a float. Recently APP4 started receiving col1 in its data and stored it as a string. Parquet does not complain at write time, but when reading such mixed parquet files:

  1. I tried casting; it did not work.
  2. Schema merging (mergeSchema) fails because of the data type mismatch.
  3. I tried filtering the data by source type. Filtering out the APP4 data works to some extent, but filtering out the APP3 data does not. (A rough sketch of the last two attempts follows this list.)
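
Roughly, the second and third attempts correspond to calls like the following (a sketch; 'app4_source' is a placeholder for the source value written by APP4):

from pyspark.sql.functions import col

# Attempt 2: let Spark merge the differing file schemas (fails here with the float/string conflict).
merged_df = spark.read.option("mergeSchema", "true").parquet(file_path)

# Attempt 3: read and filter by source before writing out the repartitioned data.
filtered_df = spark.read.parquet(file_path).filter(col("source") != "app4_source")
filtered_df.write.partitionBy(partition_columns).mode('append').parquet(file_path_re)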

This may not be a good solution, but I have to settle for it for now.

Workaround:

  1. Filter out the APP4 source data, build a DataFrame from the rest and write it to parquet; then keep only the APP4 source data in a DataFrame, drop col1, and write that to parquet (a sketch of this split follows the snippet below).
  2. Or drop col1 from the whole DataFrame and write it to parquet:

df1 = df.select([c for c in df.columns if c != 'col1'])
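
For the first variant, a minimal sketch might look like the following, assuming the APP4 rows can be identified by a specific source value ('app4_source' is again a placeholder) and that df is the DataFrame read from the store:

from pyspark.sql.functions import col

app4_sources = ["app4_source"]  # placeholder: the source value(s) written by APP4

# Rows not coming from APP4 keep col1 (consistently float) and are written as-is.
df_others = df.filter(~col("source").isin(app4_sources))
df_others.write.partitionBy(partition_columns).mode('append').parquet(file_path_re)

# Rows coming from APP4 carry col1 as a string; drop the column before writing.
df_app4 = df.filter(col("source").isin(app4_sources)).drop("col1")
df_app4.write.partitionBy(partition_columns).mode('append').parquet(file_path_re)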

