python - Pyspark:从表中读取数据并写入文件
问题描述
我正在使用 HDInsight spark 集群来运行我的 Pyspark 代码。我正在尝试从 postgres 表中读取数据并写入如下文件。pgsql_df 返回 DataFrameReader 而不是 DataFrame。所以我无法将 DataFrame 写入文件。为什么“spark.read”返回 DataFrameReader。我在这里想念什么?
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark import SQLContext
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as dbpull
from datetime import datetime
from pyspark.sql.types import Row
from pyspark.sql import DataFrame
from pyspark.sql import DataFrameReader
from pyspark.sql import DataFrameWriter
import random
import string
from pyspark.sql.functions import *
import sys
spark=SparkSession.builder.master("local").appName("db pull").getOrCreate()
pgsql_df=spark.read.format("jdbc") \
.option("driver", "org.postgresql.Driver") \
.option("url", "jdbc:postgresql://<hostdetails>") \
.option("dbtable", "table") \
.option("user", "user") \
.option("password", "password")```
>>>pgsql_df
<pyspark.sql.readwriter.DataFrameReader object at 0x7fb43ce1f890>
pgsql_df.write.format("csv").mode("overwrite").options(sep=",", header="true").save(path=output)
**Error:**
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
AttributeError: 'DataFrameReader' object has no attribute 'write'
解决方案
请检查以下代码。您缺少在 DataFrameReader 对象上调用 load() 。
pgsql_df=spark.read.format("jdbc") \
.option("driver", "org.postgresql.Driver") \
.option("url", "jdbc:postgresql://<hostdetails>") \
.option("dbtable", "table") \
.option("user", "user") \
.option("password", "password")
.load() // this is missing
pgsql_df.write.format("csv").mode("overwrite").options(sep=",", header="true").save(path=output)
or
pgsql_df=spark.read.format("jdbc") \
.option("driver", "org.postgresql.Driver") \
.option("url", "jdbc:postgresql://<hostdetails>") \
.option("dbtable", "table") \
.option("user", "user") \
.option("password", "password")
pgsql_df
.load() \ added here
.write. \
.format("csv").mode("overwrite").options(sep=",", header="true").save(path=output)
推荐阅读
- html - Angular 6 中的自定义组件:布尔属性的值
- java - 如何在scrollView.setOnScrollChangeListener中获取scrollY的最大值
- python - 从 HTTP 响应到 csv
- c# - 如何将 Matrix 转换为 Mat 类型?
- python - 向独立进程发送信号/事件
- snakemake - 使用 snakemake 打印简化的 DAG 图
- python - 从python中的多个文件夹加载多个图像
- python-3.x - Python 3 上下文管理器模拟单元测试
- rxjs - RxJS 6 /点击运算符何时发出值
- node.js - 使用打字稿配置本机反应时出错