python - 在 pyspark 中每隔 1 小时从 postgres DB 读取数据
问题描述
我想以 1 小时的时间间隔从 postgres db 读取数据,我希望该进程每隔一小时运行一次。我怎样才能做到这一点?我附上了我的代码片段。我无法将 readstream 用于 jdbc 选项。
df = spark.read \
.format("jdbc") \
.option("url", URL) \
.option("dbtable", "tagpool_with_tag_raw") \
.option("user", "tsdbadmin") \
.option("password", "cgqu5qss2zy3i1") \
.option("driver", "org.postgresql.Driver") \
.load()
# Getting the current date and time
dt = datetime.datetime.now(timezone.utc)
utc_time = dt.replace(tzinfo=timezone.utc)
utc_timestamp = utc_time.timestamp()
epoch = round(utc_timestamp / 60) * 60
# epoch = epoch+3600
print("epoch ", epoch)
df.createOrReplaceTempView("tagpool_with_tag_raw")
x = spark.sql("""select * from tagpool_with_tag_raw""")
x.show()
query = spark.sql("select * from tagpool_with_tag_raw WHERE input_time = " + str(epoch)) # .format()
# query = spark.sql("select CAST(input_time AS bigint), CAST(orig_time AS bigint) , from tagpool_with_tag_raw WHERE input_time = "+ epoch) #.format()
query.show()
# df.selectExpr(("SELECT * FROM public.tagpool_raw WHERE input_time<= %s".format(epoch)))
df.printSchema()
query.write \
.format("jdbc") \
.option("url", URL) \
.option("dbtable", "tagpool_tag_raw") \
.option("user", USER) \
.option("password", PW) \
.option("driver", DRIVER).save()
解决方案
Readstream 不适用于 jdbc ,因为 jdbc 是批处理操作,您必须像您所做的那样创建一个流程,并使用 AutoSys 或 oozie 之类的调度程序或您的企业每小时运行的任何东西。
推荐阅读
- batch-file - 加密winrar命令行的密码部分
- node.js - 节点生成子进程在 Windows 中不起作用
- subtraction - 求解和证明两个元素相减并取模的 DFA
- javascript - 在 javascript html css 中放置模态是否与在电子 js 中相同?
- python-3.x - 在 Tkinter Python3 的列表中使用多个 StringVar() 变量
- python - sklearn Logistic Regression - 使用来自外部测试数据的自定义输入进行预测
- javascript - 可能无法导出受污染的“OffscreenCanvas”
- excel - Excel中TextFrame中的行距
- wordpress - 如何删除wordpress中的评论栏
- javascript - 进度条在达到 100% 后未完全占据