python - Pyspark Dataframe 读取将列内容移动不一致的数字
问题描述
代码版本:
- 蟒蛇==3.7
- 星火版本==2.4.7
- Pyspark==2.4.5
- 蜂巢==2.3.7
您好,希望有人可以帮助我解决这个问题。我正在使用 PySpark 读取几个大文件(每个大约 80 GB,其中 6 个左右)。
使用一个领导节点,两个工作节点。
正在读取的文件有 1000 多个列,并且有数百万行。当我尝试使用 pyspark 读取函数处理此文件时,它会按不一致的列数推出列值。
我尝试过的事情:
- 查看每行的分隔符数量。这个数字似乎在各行中是一致的,但我只查看了一个文件。
read.text(s3_path)
通过对每列使用和使用 substr 来尝试为每个 col 固定宽度。这没有用。- 试图从空字符串推断空值
.option("emptyValue", '')
- 当我尝试查看 CSV 中的原始数据时,我看不到未转义的引号或
\t
其中。在 pandas 中读取有问题的行可以正确读取...
from pyspark import SparkContext, SparkConf
from pyspark.sql import *
from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.sql.functions import *
Class SparkCode:
def __init__(self):
self.session = SparkSession.builder.appName("MyApp").getOrCreate()
self.schema = StructType().add
def process_file(self, s3_bucket, s3_key):
s3_path = f's3a://{s3_bucket}/{s3_key}'
df_reader = self.session.read
responses = df_reader.option("delimiter", "\t") \
.csv(s3_path) \
.select(
col('_c5').alias("state").cast(StringType()),
col('_c6').alias("zip_code").cast(StringType()),
col('_c7').alias("zip_plus_4").cast(StringType()),
col('_c8').alias("carrier_route").cast(StringType()),
col('_c9').alias("county_code").cast(StringType()),
col('_c10').alias("county_name").cast(StringType()),
col('_c11').alias("phone_number").cast(StringType()),
...
col('_c745').alias("another_important_col").cast(StringType()))
responses.show()
responses.select("all_the_above_cols").write.mode("append").parquet("s3a://other_s3_path" + ".parquet/")
问题:鉴于调查每个有问题的行需要很长时间,我该如何改进上述代码以正确解析这些数据?任何建议或提示表示赞赏。
解决方案
您可以尝试的东西很少。
- 如果您在 Excel 中打开 csv 文件并进行检查,但没有看到任何问题,请尝试在 notepad++ 中打开它,看看是否有任何差异。有时,如果 csv 文件中有一些奇怪的东西,如果我们在 excel 中打开,我们就不会知道它。
- 查看您的 csv 文件是否包含其值可以是多行的任何列。如果你找到任何东西,那么你可以添加一个
option("multiline","true")
,看看它是否有效。 - 如果您的数据中有任何引号或转义字符串可能导致问题,那么您可以添加另外两个选项以查看它是否有效。即
.option("quote", "\"")
和.option("escape", "\"")
我遇到了类似的问题,并通过向阅读器添加了这个额外的选项来解决问题,并且效果很好。
推荐阅读
- android - Android中服务的回调方法显示
- java - 即使设置了位置,如何修复 Java FXML Loader 崩溃?
- ios - 如何防止变量被初始化两次?
- python - 数据字段中的混合数据类型
- bootstrap-4 - 如何使 ng-bootstrap typeahead 显示带有链接的下拉结果
- file - ionic 2 - 文件存储路径
- python - 检查列表/集合中是否已经存在不可散列的项目
- tensorflow - TensorFlow Lite:初始化节点不存在
- hl7 - 无法启动 Mirth 的管理员工具
- git - 如何使 VSCode 强制重新加载更改的文件?