How to set the right data type in Parquet with Spark from a CSV using pyspark

Problem description

I have a CSV file that looks like this:

39813458,13451345,14513,SomeText,344564,Some other text,328984,"[{""field_int_one"":""16784832510"",""second_int_field"":""84017"",""third_int_field"":""245"",""some_timestamp_one"":""2018-04-17T23:54:34.000Z"",""some_other_timestamp"":""2018-03-03T15:34:04.000Z"",""one_more_int_field"":0,},{""field_int_one"":""18447548326"",""second_int_field"":""04965"",""third_int_field"":""679"",""some_timestamp_one"":""2018-02-06T03:39:12.000Z"",""some_other_timestamp"":""2018-03-01T09:19:12.000Z"",""one_more_int_field"":0}]"

I am converting it to Parquet:

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.session import SparkSession
sc = SparkContext('local')
spark = SparkSession(sc)

if __name__ == "__main__":
    sqlContext = SQLContext(sc)

    schema = StructType([
              StructField("first_int", IntegerType(), True),
              StructField("second_int", IntegerType(), True),
              StructField("third_int", IntegerType(), True),
              StructField("first_string_field", StringType(), True),
              StructField("fourth_int", IntegerType(), True),
              StructField("second_string_field", StringType(), True),
              StructField("last_int_field", StringType(), True),
              StructField("json_field", StringType(), True)])

    rdd = spark.read.schema(schema).csv("source_file.csv")
    rdd.write.parquet('parquet_output')

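This works and the file is converted, but if you call .printSchema() when querying the result, the last field is reported as a string. How do I correctly declare the last field as JSON?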

Tags: python, csv, apache-spark, pyspark, parquet

Solution


I think a nested ArrayType would work for this kind of schema:

from pyspark.sql.types import ArrayType, IntegerType, StringType, StructField, StructType

schema = StructType([
          StructField("first_int", IntegerType(), True),
          StructField("second_int", IntegerType(), True),
          StructField("third_int", IntegerType(), True),
          StructField("first_string_field", StringType(), True),
          StructField("fourth_int", IntegerType(), True),
          StructField("second_string_field", StringType(), True),
          StructField("last_int_field", StringType(), True),
          # json_field is an array of structs, one struct per JSON object
          StructField("json_field", ArrayType(
                StructType()
                   .add("field_int_one", IntegerType())
                   .add("field_string_one", StringType())
                   # ... add the remaining fields here
          ), True)])
