首页 > 解决方案 > PySpark:从 Oracle 表中选择一个值,然后添加到它

问题描述

我使用 PySpark 将行从 Oracle 加载到 AWS。我一次抓取 10000 行,然后存储加载的最大 seq_id 并将其用于下一个范围。

我正在尝试在 PySpark 中执行此操作,但我无法弄清楚。任何人都可以帮助或指出一个有用的培训资源吗?我尝试将输出转换为 Int。我尝试了 select.collect[0][0] 但也遇到了错误。我对 PySpark 非常陌生,因此非常感谢任何帮助。

from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")

def oracle_read(user,pwd,hostname,port,service_name,table_name):
    url = 'jdbc:oracle:thin:'+user+'/'+pwd+'@//'+hostname+':'+port+'/'+service_name
    result = spark.read \
    .format("jdbc") \
    .option("url", url) \
    .option("dbtable",table_name) \
    .option("user", user) \
    .option("password", pwd) \
    .option("driver", "oracle.jdbc.driver.OracleDriver") \
    .load() 
    result = result.toDF(* [c.lower() for c in result.columns])
    return result
    
max_seq_qry = """(SELECT max_val FROM data_owner.tbl_max_seq_load WHERE table_name = 'TBL_A')"""
max_seq = oracle_read(oracle_user,oracle_pass,oracle_host,oracle_port,oracle_service,max_seq_qry)
min_seq = max_seq + 1
max_seq = max_seq + 10000

我收到以下错误:

TypeError: unsupported operand type(s) for +: 'DataFrame' and 'int'
NameError: name 'IntegerType' is not defined
TypeError: 'instancemethod' object has no attribute '__getitem__'

标签: pythonoraclepysparkapache-spark-sqlamazon-emr

解决方案


您的函数oracle_read返回一个数据帧(结果)并且您试图增加它(向它添加一个),这是不可能的,因此是错误的。

在您的情况下,您只从数据库中获取一列“max_val”,并且您可能是第一个匹配项,因此您可以选择此列并将第一个值作为max_seq['max_val'].values[0]

所以你可以将你的代码重写为

max_seq = oracle_read(oracle_user,oracle_pass,oracle_host,oracle_port,oracle_service,max_seq_qry)
max_seq = int(max_seq['max_val'].values[0]) + 1

推荐阅读