CSV data source does not support null data type in PySpark

Problem description

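I am trying to save a DataFrame to a file with PySpark, but I get the error in the title. The same code works under Python 2.7 but fails under Python 3.6. Can anyone help me figure out what I am doing wrong? I have added the table's schema details below.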

This is the code I am running:

df.repartition(1).write.format('com.databricks.spark.csv').mode('overwrite').save(output_path,quote='',escape='\"', sep='|',header='True',nullValue=None)

Output of printSchema():

 |-- fact_id: integer (nullable = true)
 |-- Active_Flag: null (nullable = true)
 |-- Project_End_Date: string (nullable = true)
 |-- Project_Effective_Date: string (nullable = true)
 |-- Notification to Implement Issued Clarification: string (nullable = true)
 |-- Notification to Implement Issued: string (nullable = true)
 |-- pERC Reconsideration Meeting Clarification: string (nullable = true)
 |-- pERC Reconsideration Meeting: string (nullable = true)
 |-- Feedback Deadline Clarification: string (nullable = true)
 |-- Feedback Deadline: string (nullable = true)
 |-- pERC Meeting Clarification: string (nullable = true)
 |-- pERC Meeting: string (nullable = true)
 |-- Check-point meeting Clarification: string (nullable = true)
 |-- Check-point meeting: string (nullable = true)
 |-- Patient Advocacy Group Input Deadline Clarification: string (nullable = true)
 |-- Patient Advocacy Group Input Deadline: string (nullable = true)
 |-- NOC Date: string (nullable = true)
 |-- Pre NOC Submission: string (nullable = true)
 |-- Status Clarification: string (nullable = true)
 |-- Review Status: string (nullable = true)
 |-- Prioritization: string (nullable = true)
 |-- Tumour Type: string (nullable = true)
 |-- Submitter: string (nullable = true)
 |-- Sponsor: string (nullable = true)
 |-- Funding Request: string (nullable = true)
 |-- CADTH review report(s) posted: string (nullable = true)
 |-- Deadline for sponsor to submit redaction requests on draft CADTH review report(s): string (nullable = true)
 |-- Reconsideration Requested Clarification: string (nullable = true)
 |-- Reconsideration Requested: string (nullable = true)
 |-- Final Recommendation Issued Clarification: string (nullable = true)
 |-- Final recommendation posted: string (nullable = true)
 |-- Final Recommendation Sent to Drug Plans and Manufacturer Date: string (nullable = true)
 |-- Reconsideration Meeting Date Clarification: string (nullable = true)
 |-- Reconsideration Meeting Date: string (nullable = true)
 |-- Embargo period ended clarification: null (nullable = true)
 |-- Embargo period ended: null (nullable = true)
 |-- Embargo Period Date Clarification: string (nullable = true)
 |-- Embargo Period Date: string (nullable = true)
 |-- Final recommendation issued to sponsor and drug plans Clarification: string (nullable = true)
 |-- Final recommendation issued to sponsor and drug plans: string (nullable = true)
 |-- CDEC Meeting Date Clarification: string (nullable = true)
 |-- CDEC Meeting Date: string (nullable = true)
 |-- Redaction Response from Manufacturer on Report Received by CADTH Date Clarification: string (nullable = true)
 |-- Redaction Response from Manufacturer on Report Received by CADTH Date: string (nullable = true)
 |-- Comments from Manufacturers on Reviewer's Report Date Clarification: null (nullable = true)
 |-- Comments from Manufacturers on Reviewer's Report Date: null (nullable = true)
 |-- Deadline for sponsors comments Clarification: string (nullable = true)
 |-- Deadline for sponsors comments: string (nullable = true)
 |-- CADTH Reviewers Report sent to Manufacturer Date Clarification: string (nullable = true)
 |-- CADTH Reviewers Report sent to Manufacturer Date: string (nullable = true)
 |-- Patient Group Input Summary Comment Received Date Clarification: string (nullable = true)
 |-- Patient Group Input Summary Comment Received Date: string (nullable = true)
 |-- Patient input summary sent for review to patient input groups clarification: string (nullable = true)
 |-- Patient input summary sent for review to patient input groups: string (nullable = true)
 |-- Patient Group Input Submission Received Date Clarification: string (nullable = true)
 |-- Patient Group Input Submission Received Date: string (nullable = true)
 |-- Call for Patient Input Date Clarification: string (nullable = true)
 |-- Call for Patient Input Date: string (nullable = true)
 |-- Submission Deemed Complete Actual Date Clarification: string (nullable = true)
 |-- Submission Deemed Complete Actual Date: string (nullable = true)
 |-- Submission Deemed Complete Target Date Clarification: string (nullable = true)
 |-- Submission Deemed Complete Target Date: string (nullable = true)
 |-- Patient group input closed clarification: string (nullable = true)
 |-- Patient group input closed: string (nullable = true)
 |-- Fee Schedule: string (nullable = true)
 |-- Recommendation Type: string (nullable = true)
 |-- Initial Recommendation Issued Clarification: string (nullable = true)
 |-- Recommendation Date: string (nullable = true)
 |-- Companion Diagnostics: string (nullable = true)
 |-- Submission Type: string (nullable = true)
 |-- Review initiated: string (nullable = true)
 |-- Submission Date Clarification: string (nullable = true)
 |-- Submission accepted: string (nullable = true)
 |-- Submission received: string (nullable = true)
 |-- Manufacture_id: string (nullable = true)
 |-- Biosimilar: string (nullable = true)
 |-- Manufacturer Requested Reimbursement Criteria: string (nullable = true)
 |-- Thrc_area_id: string (nullable = true)
 |-- Generic_id: string (nullable = true)
 |-- BRAND_ID: string (nullable = true)
 |-- indication_id: string (nullable = true)
 |-- Strength: string (nullable = true)
 |-- Project Number: string (nullable = true)
 |-- Modified Time: string (nullable = true)
 |-- Published Time: string (nullable = true)
 |-- Updated Time: string (nullable = true)
 |-- Program Name: string (nullable = true)

Tags: python, dataframe, apache-spark, pyspark, pyspark-dataframes

Solution


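You can cast the null-typed columns to string type before writing. The CSV writer rejects any column whose type is NullType (shown as `null` in the schema, for example Active_Flag), so each of those columns needs a concrete type first: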

from pyspark.sql.types import NullType
import pyspark.sql.functions as F

# Check each column type. If it's nulltype, cast to string type,
# else keep the original column.

df2 = df.select([
    F.lit(None).cast('string').alias(i.name)
    if isinstance(i.dataType, NullType)
    else i.name
    for i in df.schema
])

df2.repartition(1).write.format('com.databricks.spark.csv').mode('overwrite').save(output_path, quote='', escape='\"', sep='|', header='True', nullValue=None)
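
The same idea can also be expressed with `selectExpr`, building the expressions from `df.dtypes` instead of `df.schema`. The following is only a minimal sketch: the helper name `null_safe_select_exprs` is hypothetical, and it assumes that `df.dtypes` reports NullType columns as either `null` or `void` (which of the two appears depends on the Spark version, so check your own output). The helper itself is plain Python, so it can be tried without a Spark session.

```python
def null_safe_select_exprs(dtypes, null_type_names=("null", "void")):
    """Build selectExpr strings that turn NullType columns into string-typed NULLs.

    `dtypes` is a list of (column_name, type_string) pairs, in the shape
    returned by df.dtypes. `null_type_names` is an assumption about how
    NullType is spelled in that list; adjust it for your Spark version.
    """
    exprs = []
    for name, type_name in dtypes:
        # Backtick-quote the name so spaces, parentheses and apostrophes
        # are accepted; a literal backtick is escaped by doubling it.
        quoted = "`" + name.replace("`", "``") + "`"
        if type_name in null_type_names:
            exprs.append("CAST(NULL AS STRING) AS " + quoted)
        else:
            exprs.append(quoted)
    return exprs


# A few (name, type) pairs modelled on the schema in the question.
sample_dtypes = [
    ("fact_id", "int"),
    ("Active_Flag", "null"),
    ("Embargo period ended", "null"),
    ("Program Name", "string"),
]
exprs = null_safe_select_exprs(sample_dtypes)
print(exprs)

# With a live DataFrame (not executed here):
# df2 = df.selectExpr(*null_safe_select_exprs(df.dtypes))
```

Either way, the resulting `df2` has no NullType columns left, so it can be written with the same `write` call as above.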
