首页 > 解决方案 > Pyspark:从表中读取数据并写入文件

问题描述

我正在使用 HDInsight spark 集群来运行我的 Pyspark 代码。我正在尝试从 postgres 表中读取数据并写入如下文件。pgsql_df 返回 DataFrameReader 而不是 DataFrame。所以我无法将 DataFrame 写入文件。为什么“spark.read”返回 DataFrameReader。我在这里想念什么?

from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark import SQLContext
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as dbpull
from datetime import datetime
from pyspark.sql.types import Row
from pyspark.sql import DataFrame
from pyspark.sql import DataFrameReader
from pyspark.sql import DataFrameWriter
import random
import string
from pyspark.sql.functions import *
import sys
spark=SparkSession.builder.master("local").appName("db pull").getOrCreate()
pgsql_df=spark.read.format("jdbc") \
    .option("driver", "org.postgresql.Driver") \
    .option("url", "jdbc:postgresql://<hostdetails>") \
    .option("dbtable", "table") \
    .option("user", "user") \
    .option("password", "password")```

>>>pgsql_df
<pyspark.sql.readwriter.DataFrameReader object at 0x7fb43ce1f890>


pgsql_df.write.format("csv").mode("overwrite").options(sep=",", header="true").save(path=output)


**Error:** 
 Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'DataFrameReader' object has no attribute 'write'





标签: pythonapache-sparkpysparkazure-hdinsight

解决方案


请检查以下代码。您缺少在 DataFrameReader 对象上调用 load() 。

pgsql_df=spark.read.format("jdbc") \
    .option("driver", "org.postgresql.Driver") \
    .option("url", "jdbc:postgresql://<hostdetails>") \
    .option("dbtable", "table") \
    .option("user", "user") \
    .option("password", "password")
    .load() // this is missing 

pgsql_df.write.format("csv").mode("overwrite").options(sep=",", header="true").save(path=output)

or 


pgsql_df=spark.read.format("jdbc") \
    .option("driver", "org.postgresql.Driver") \
    .option("url", "jdbc:postgresql://<hostdetails>") \
    .option("dbtable", "table") \
    .option("user", "user") \
    .option("password", "password")

pgsql_df
.load() \ added here 
.write. \
.format("csv").mode("overwrite").options(sep=",", header="true").save(path=output)


推荐阅读