pyspark - 使用 pyspark 重新分区失败并出现错误
问题描述
我在下面列的 s3 文件夹中有镶木地板。镶木地板的大小约为 40 mb。
org_id, device_id, channel_id, source, col1, col2
现在分区在 3 列org_id device_id channel_id
我想将分区更改为source, org_id, device_id, channel_id.
我正在使用 pyspark 从 s3 读取文件并写入 s3 存储桶。
sc = SparkContext(appName="parquet_ingestion1").getOrCreate()
spark = SparkSession(sc)
file_path = "s3://some-bucket/some_folder"
print("Reading parquet from s3:{}".format(file_path))
spark_df = spark.read.parquet(file_path)
print("Converting to parquet")
file_path_re = "s3://other_bucket/re-partition"
partition_columns = ["source", "org_id", "device_id", "channel_id "]
spark_df.repartition(1).write.partitionBy(partition_columns).mode('append').parquet(file_path_re)
我收到错误并且未生成镶木地板文件。
spark_df.repartition(1).write.partitionBy(partition_columns).mode('append').parquet(file_path_re)
[Stage 1:> (0 + 8) / 224]20/04/29 13:29:44 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, ip-172-31-43-0.ap-south-1.compute.internal, executor 3): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainFloatDictionary
at org.apache.parquet.column.Dictionary.decodeToBinary(Dictionary.java:41)
at org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToBinary(ParquetDictionary.java:51)
at org.apache.spark.sql.execution.vectorized.WritableColumnVector.getUTF8String(WritableColumnVector.java:380)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:148)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
然后我尝试了
spark_df.write.partitionBy(partition_columns).mode('append').parquet(file_path_re)
spark_df.write.partitionBy(partition_columns).mode('append').parquet(file_path_re)
[Stage 3:> (0 + 8) / 224]20/04/29 13:32:11 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 23, ip-172-31-42-4.ap-south-1.compute.internal, executor 5): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainFloatDictionary
at org.apache.parquet.column.Dictionary.decodeToBinary(Dictionary.java:41)
at org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToBinary(ParquetDictionary.java:51)
at org.apache.spark.sql.execution.vectorized.WritableColumnVector.getUTF8String(WritableColumnVector.java:380)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:216)
at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[Stage 3:==> (8 + 8) / 224]20/04/29 13:32:22 WARN TaskSetManager: Lost task 0.2 in stage 3.0 (TID 40, ip-172-31-42-4.ap-south-1.compute.internal, executor 5): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainFloatDictionary
at org.apache.parquet.column.Dictionary.decodeToBinary(Dictionary.java:41)
at org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToBinary(ParquetDictionary.java:51)
在第二种情况下,它失败了,但它也在创建镶木地板。现在我不确定它是否正确地将所有数据创建到新分区。让我知道重新划分镶木地板的正确方法是什么。
更新 1:
from pyspark.sql.types import StringType
for col1 in partition_columns:
spark_df=spark_df.withColumn(col1, col(col1).cast(dataType=StringType()))
尝试了 spark_df.repartition(1).write.partitionBy(partition_columns).mode('append').parquet(file_path_re)
spark_df.write.partitionBy(partition_columns).mode('append').parquet(file_path_re)
我收到以下错误
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 20, ip-172-31-42-4.ap-south-1.compute.internal, executor 4): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainFloatDictionary
at org.apache.parquet.column.Dictionary.decodeToBinary(Dictionary.java:41)
at org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToBinary(ParquetDictionary.java:51)
at org.apache.spark.sql.execution.vectorized.WritableColumnVector.getUTF8String(WritableColumnVector.java:380)
更新 2:
现在我发现其中一列存在架构不匹配,一列是字符串,另一列是浮点数。我已经描述了下面的场景。在这里您可以看到 col1 列是一行中的字符串,另一行是浮动的
org_id, device_id, channel_id, source, col1, col2
"100" "device1" "channel" "source1" 10 0.1
"100" "device1" "channel" "source2" "10" 0.1
我尝试将 col1 列转换为 float.it dodn;t 工作
任何建议。
解决方案
UPDATE2 中提到了问题的根本原因。在我的情况下,我们有 4 个应用程序(基于源的不同管道的一部分)写入镶木地板商店。2 应用程序 APP1 和 APP2 不使用 col1 和 APP 3 用于将其写为浮点数。最近 APP4 开始在他们的数据中获取 col1 并将其作为字符串存储在 parquet.parquet 中不要在写作时抱怨。在阅读这样的镶木地板时
- 我试过铸造它没有用
- 合并架构因数据类型不匹配而失败
- 我尝试根据源类型过滤数据。如果过滤掉 APP4 数据,它在一定意义上起作用。但如果过滤掉 APP3 数据,它就不起作用。
这可能不是一个好的解决方案,但我现在不得不满足于此。
解决方法:
1.过滤掉app4源数据,创建数据框并转换成parquet,然后只过滤数据框中的app4源parquet,去掉col1,转换成parquet。
- 或者从整个数据帧中删除 col 并写入 parquet。
df1 =df.select([c for c in df.columns if c!= 'col1'])
推荐阅读
- javascript - 如何在每个动态宽度中使 bootstrap 4 下拉菜单正好位于父组件下方?
- c - 为什么 (5+10)/2 是 7.0 而不是 7.5?C 编程
- reactjs - 智能/哑组件模式的正确用法是什么?
- arm - 用电位器控制伺服电机
- bar-chart - 如何在条形多条形图顶部添加工具提示以查找echarts中条形之间的百分比差异?
- python - 'varname' 不是从 API 调用响应中定义的
- python-3.x - 如何从 pygame 游戏中获取帧
- android - 我打电话给某人时需要得到事件,对我来说事件,最重要的是回铃事件
- math - 真的千里马解决不了吗?
- machine-learning - python中的相关矩阵在哪个基础上具有特征?