首页 > 解决方案 > Pyspark Dataframe 读取将列内容移动不一致的数字

问题描述

代码版本:

您好,希望有人可以帮助我解决这个问题。我正在使用 PySpark 读取几个大文件(每个大约 80 GB,其中 6 个左右)。
使用一个领导节点,两个工作节点。

正在读取的文件有 1000 多个列,并且有数百万行。当我尝试使用 pyspark 读取函数处理此文件时,它会按不一致的列数推出列值。

我尝试过的事情:

  1. 查看每行的分隔符数量。这个数字似乎在各行中是一致的,但我只查看了一个文件。
  2. read.text(s3_path)通过对每列使用和使用 substr 来尝试为每个 col 固定宽度。这没有用。
  3. 试图从空字符串推断空值.option("emptyValue", '')
  4. 当我尝试查看 CSV 中的原始数据时,我看不到未转义的引号或\t其中。在 pandas 中读取有问题的行可以正确读取...
from pyspark import SparkContext, SparkConf
from pyspark.sql import *
from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.sql.functions import *


Class SparkCode:
    def __init__(self):
       self.session = SparkSession.builder.appName("MyApp").getOrCreate()
       self.schema = StructType().add

    def process_file(self, s3_bucket, s3_key):
        s3_path = f's3a://{s3_bucket}/{s3_key}'

        df_reader = self.session.read

        responses = df_reader.option("delimiter", "\t") \
            .csv(s3_path) \
            .select(
                    col('_c5').alias("state").cast(StringType()),
                    col('_c6').alias("zip_code").cast(StringType()),
                    col('_c7').alias("zip_plus_4").cast(StringType()),
                    col('_c8').alias("carrier_route").cast(StringType()),
                    col('_c9').alias("county_code").cast(StringType()),
                    col('_c10').alias("county_name").cast(StringType()),
                    col('_c11').alias("phone_number").cast(StringType()),
...
                    col('_c745').alias("another_important_col").cast(StringType()))
        responses.show()
        responses.select("all_the_above_cols").write.mode("append").parquet("s3a://other_s3_path" + ".parquet/")

    

问题:鉴于调查每个有问题的行需要很长时间,我该如何改进上述代码以正确解析这些数据?任何建议或提示表示赞赏。

标签: pythoncsvapache-sparkpyspark

解决方案


您可以尝试的东西很少。

  1. 如果您在 Excel 中打开 csv 文件并进行检查,但没有看到任何问题,请尝试在 notepad++ 中打开它,看看是否有任何差异。有时,如果 csv 文件中有一些奇怪的东西,如果我们在 excel 中打开,我们就不会知道它。
  2. 查看您的 csv 文件是否包含其值可以是多行的任何列。如果你找到任何东西,那么你可以添加一个option("multiline","true"),看看它是否有效。
  3. 如果您的数据中有任何引号或转义字符串可能导致问题,那么您可以添加另外两个选项以查看它是否有效。即.option("quote", "\"").option("escape", "\"")

我遇到了类似的问题,并通过向阅读器添加了这个额外的选项来解决问题,并且效果很好。


推荐阅读