python - PySpark:从 Oracle 表中选择一个值,然后添加到它
问题描述
我使用 PySpark 将行从 Oracle 加载到 AWS。我一次抓取 10000 行,然后存储加载的最大 seq_id 并将其用于下一个范围。
我正在尝试在 PySpark 中执行此操作,但我无法弄清楚。任何人都可以帮助或指出一个有用的培训资源吗?我尝试将输出转换为 Int。我尝试了 select.collect[0][0] 但也遇到了错误。我对 PySpark 非常陌生,因此非常感谢任何帮助。
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
def oracle_read(user,pwd,hostname,port,service_name,table_name):
url = 'jdbc:oracle:thin:'+user+'/'+pwd+'@//'+hostname+':'+port+'/'+service_name
result = spark.read \
.format("jdbc") \
.option("url", url) \
.option("dbtable",table_name) \
.option("user", user) \
.option("password", pwd) \
.option("driver", "oracle.jdbc.driver.OracleDriver") \
.load()
result = result.toDF(* [c.lower() for c in result.columns])
return result
max_seq_qry = """(SELECT max_val FROM data_owner.tbl_max_seq_load WHERE table_name = 'TBL_A')"""
max_seq = oracle_read(oracle_user,oracle_pass,oracle_host,oracle_port,oracle_service,max_seq_qry)
min_seq = max_seq + 1
max_seq = max_seq + 10000
我收到以下错误:
TypeError: unsupported operand type(s) for +: 'DataFrame' and 'int'
NameError: name 'IntegerType' is not defined
TypeError: 'instancemethod' object has no attribute '__getitem__'
解决方案
您的函数oracle_read
返回一个数据帧(结果)并且您试图增加它(向它添加一个),这是不可能的,因此是错误的。
在您的情况下,您只从数据库中获取一列“max_val”,并且您可能是第一个匹配项,因此您可以选择此列并将第一个值作为max_seq['max_val'].values[0]
所以你可以将你的代码重写为
max_seq = oracle_read(oracle_user,oracle_pass,oracle_host,oracle_port,oracle_service,max_seq_qry)
max_seq = int(max_seq['max_val'].values[0]) + 1
推荐阅读
- java - Java中的闭包
- r - 如何减去数据框中的字符串数
- reactjs - Uncaught TypeError: Object(...) is not a function with React, Formik and Webpack
- .net - FSharp:为什么 `select` 子句不能放在查询表达式中的 `where` 之前?
- java - 如何忽略来自 Kafka 主题的未提交消息
- flutter - 切换版本快速颤动
- python - 变量值不更新
- django - 出现错误:未找到“学院”的反向。'college' 不是有效的视图函数或模式名称
- java - 从 AndroidStudio 中的 firebase 检索数据
- javascript - 当我插入两个 Y 轴时,Apexchart 会留下包装器