首页 > 解决方案 > 在 pyspark 中每隔 1 小时从 postgres DB 读取数据

问题描述

我想以 1 小时的时间间隔从 postgres db 读取数据,我希望该进程每隔一小时运行一次。我怎样才能做到这一点?我附上了我的代码片段。我无法将 readstream 用于 jdbc 选项。

df = spark.read \
.format("jdbc") \
.option("url", URL) \
.option("dbtable", "tagpool_with_tag_raw") \
.option("user", "tsdbadmin") \
.option("password", "cgqu5qss2zy3i1") \
.option("driver", "org.postgresql.Driver") \
.load()

# Getting the current date and time
dt = datetime.datetime.now(timezone.utc)
utc_time = dt.replace(tzinfo=timezone.utc)
utc_timestamp = utc_time.timestamp()
epoch = round(utc_timestamp / 60) * 60
# epoch = epoch+3600
print("epoch ", epoch)

df.createOrReplaceTempView("tagpool_with_tag_raw")
x = spark.sql("""select *  from tagpool_with_tag_raw""")
x.show()
query = spark.sql("select *  from tagpool_with_tag_raw WHERE input_time = " + str(epoch))  # .format()

    # query = spark.sql("select CAST(input_time AS bigint), CAST(orig_time AS bigint) ,  from tagpool_with_tag_raw WHERE input_time = "+ epoch) #.format()
query.show()
# df.selectExpr(("SELECT * FROM public.tagpool_raw WHERE input_time<= %s".format(epoch)))
df.printSchema()

query.write \
    .format("jdbc") \
    .option("url", URL) \
    .option("dbtable", "tagpool_tag_raw") \
    .option("user", USER) \
    .option("password", PW) \
    .option("driver", DRIVER).save()

标签: pythonpostgresqlapache-sparkpysparkbatch-processing

解决方案


Readstream 不适用于 jdbc ,因为 jdbc 是批处理操作,您必须像您所做的那样创建一个流程,并使用 AutoSys 或 oozie 之类的调度程序或您的企业每小时运行的任何东西。


推荐阅读