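python - Oracle partitioned table to Datalake
Problem description
Every month I want to automatically archive the oldest partition of an Oracle table that is interval-partitioned by month into a datalake. Once the data has been inserted into the datalake, I will drop that partition.
This code loads the Oracle table into a Parquet datalake: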
from pyspark.sql import SparkSession
from datetime import date
db_host = "IP"
db_port = "PORT"
db_service = "SERVICE_NAME"
schema_name = "schema"
table_name = "T_name"
#today = date.today()
#dir_name = today.strftime("%Y-%m-%d")
db_user = "oracle_user"
db_pass = "oracle_password"
spark = SparkSession.builder \
.appName("Load " + schema_name + " " + table_name + " from Oracle into Parquet and creating Table") \
.getOrCreate()
# Read the Oracle table (wrapped in a subquery) into a DataFrame over JDBC
query = '(SELECT * FROM ' + schema_name + '.' + table_name + ' c) '+ table_name + ''
df = spark.read \
.format("jdbc") \
.option("url","jdbc:oracle:thin:@" + db_host + ":" + db_port + "/" + db_service) \
.option("dbtable",query) \
.option("user",db_user) \
.option("password",db_pass) \
.option("driver","oracle.jdbc.OracleDriver") \
.option("encoding","UTF-8") \
.option("fetchSize", 10000) \
.option("numPartitions",40) \
.load()
# Convert it to Spark SQL table and save it as parquet format
df.write \
.format("parquet") \
.option("path","/archive/" + schema_name + "_" + table_name + ".parquet") \
.mode("overwrite") \
.saveAsTable(table_name)
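The job above reads the whole table. If only the oldest partition should be archived, one option is Oracle's partition-extended table name (`table PARTITION (name)`) inside the `dbtable` subquery, plus an output path that includes the partition name so each month lands in its own folder. A minimal sketch under those assumptions; the helper names `build_jdbc_read_options` and `archive_path` and the `/archive/<schema>_<table>/<partition>.parquet` layout are hypothetical. It also reflects how Spark's JDBC source splits reads: the query is parallelised only when `partitionColumn`, `lowerBound` and `upperBound` are set together with `numPartitions`; with `numPartitions` alone the read runs as a single query.

```python
from typing import Dict, Optional


def build_jdbc_read_options(
    host: str,
    port: str,
    service: str,
    schema: str,
    table: str,
    partition: str,
    partition_column: Optional[str] = None,
    lower_bound: Optional[int] = None,
    upper_bound: Optional[int] = None,
    num_partitions: int = 40,
) -> Dict[str, str]:
    """Build spark.read.format("jdbc") options that read one Oracle partition.

    Hypothetical helper: pass the result as .options(**opts).
    """
    # Partition-extended name: Oracle returns only rows stored in `partition`.
    # The subquery needs an alias; Oracle table aliases take no AS keyword.
    query = f"(SELECT * FROM {schema}.{table} PARTITION ({partition})) {table}"
    opts = {
        "url": f"jdbc:oracle:thin:@{host}:{port}/{service}",
        "dbtable": query,
        "driver": "oracle.jdbc.OracleDriver",
        "fetchsize": "10000",
    }
    # Spark splits the read into parallel queries only when a partition
    # column and both bounds are supplied together with numPartitions.
    if partition_column is not None:
        if lower_bound is None or upper_bound is None:
            raise ValueError("lower_bound and upper_bound are required with partition_column")
        opts.update({
            "partitionColumn": partition_column,
            "lowerBound": str(lower_bound),
            "upperBound": str(upper_bound),
            "numPartitions": str(num_partitions),
        })
    return opts


def archive_path(schema: str, table: str, partition: str) -> str:
    """One Parquet folder per archived partition (hypothetical layout)."""
    return f"/archive/{schema}_{table}/{partition}.parquet"
```

Usage could look like `spark.read.format("jdbc").options(**opts).option("user", db_user).option("password", db_pass).load()`, followed by `df.write.mode("overwrite").parquet(archive_path(schema_name, table_name, "P202005"))`. Writing each month to its own path means an overwrite of one month cannot wipe earlier archives.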
I'm using CA Workload Automation AE (AutoSys) and created this job:
#!/bin/bash
. /ca/.profile
pgm=`basename $0`
source app3
sqlplus /"Pass"@S_name @/ca/autosys_script/archive_sms_alerts_history.sql > ${sysout}/${pgm}.log 2>&1
echo "************************************************"
echo "log file dir: " ${sysout}/${pgm}.log
cat ${sysout}/${pgm}.log
if grep -q "ORA-" ${sysout}/${pgm}.log; then
echo "************************************************"
echo " ERROR 06 : SMS_ALERTS_HISTORY table archive job failed "
echo " Please call database administrator. "
echo "************************************************"
echo " "
exit 6
else
echo "*******SMS ALERTS HISTORY ARCHIVE SUCCESSFULLY COMPLETED********"
fi
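The job decides success by grepping the SQL*Plus log for `ORA-`. If the same check is wanted from Python (for example inside the archive job itself), a minimal sketch could look like this. The function names are hypothetical; the extra `SP2-` prefix (SQL*Plus's own client-side errors, which the `grep` above does not catch) and the requirement of a numeric code after the prefix are additions, so it is slightly stricter than the shell test. Exit code 6 mirrors the script.

```python
import re
from typing import List

# ORA-nnnnn: Oracle server errors; SP2-nnnn: SQL*Plus client errors.
ERROR_PATTERN = re.compile(r"\b(?:ORA|SP2)-\d{4,5}\b")


def find_errors(log_text: str) -> List[str]:
    """Return every log line that contains an Oracle or SQL*Plus error code."""
    return [line for line in log_text.splitlines() if ERROR_PATTERN.search(line)]


def exit_code_for_log(log_text: str) -> int:
    """Mirror the shell job: 6 if any error line is found, 0 otherwise."""
    return 6 if find_errors(log_text) else 0
```

A caller would read the log with `pathlib.Path(log_file).read_text()`, print `find_errors(...)` for the operator, and pass `exit_code_for_log(...)` to `sys.exit` so AutoSys sees the same failure code as before.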
This is the SQL:
DECLARE
v_partname VARCHAR2 (50);
v_cnt INT;
BEGIN
SELECT COUNT (*)
INTO v_cnt
FROM dba_tab_partitions
WHERE table_owner = 'SMSALERT' AND table_name = 'SMS_ALERTS_HISTORY';
-- TODO: rename the system-generated partitions (SYS_P###) to readable names such as P202005
IF v_cnt > 3
THEN
SELECT partition_name
INTO v_partname
FROM dba_tab_partitions
WHERE table_owner = 'SMSALERT'
AND table_name = 'SMS_ALERTS_HISTORY'
AND partition_position IN
(SELECT MIN (partition_position)
FROM dba_tab_partitions
WHERE table_owner = 'SMSALERT'
AND table_name = 'SMS_ALERTS_HISTORY');
-- TODO: verify that v_partname is the expected partition (e.g. P202005)
EXECUTE IMMEDIATE
'insert /*+ PARALLEL(10) */ into SMSALERT.ARC_SMS_ALERTS_HISTORY select * from SMSALERT.SMS_ALERTS_HISTORY partition('
|| v_partname
|| ')';
EXECUTE IMMEDIATE
'alter table SMSALERT.SMS_ALERTS_HISTORY drop partition '
|| v_partname
|| ' UPDATE INDEXES';
EXECUTE IMMEDIATE 'ALTER TABLE SMSALERT.SMS_ALERTS_HISTORY SET INTERVAL ( NUMTOYMINTERVAL(1, ''MONTH''))';
COMMIT;
ELSE
dbms_output.put_line('3 OR FEWER PARTITIONS - SKIPPED');
END IF;
END;
/
EXIT;
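For the rename noted in the PL/SQL (SYS_P### to P202005), one option is to derive the name from each partition's upper bound: with a one-month interval whose boundaries fall on the first of the month, a partition whose `HIGH_VALUE` is 1 June 2020 holds May 2020 rows and would become `P202005`. A minimal sketch, assuming the partition list (name, position, upper-bound date) has already been fetched from `DBA_TAB_PARTITIONS` and the `HIGH_VALUE` LONG column (a `TO_DATE(...)` expression) parsed into a `date`. The `Partition` type and the helper names are hypothetical. It applies the same "more than 3 partitions" rule as the PL/SQL and builds an `ALTER TABLE ... RENAME PARTITION` statement.

```python
from datetime import date, timedelta
from typing import List, NamedTuple, Optional, Tuple


class Partition(NamedTuple):
    name: str         # e.g. SYS_P1234 (system-generated) or P202005
    position: int     # PARTITION_POSITION from DBA_TAB_PARTITIONS
    high_value: date  # exclusive upper bound, already parsed from HIGH_VALUE


def monthly_name(high_value: date) -> str:
    """Name a monthly partition after the month it holds (bound is exclusive)."""
    last_day = high_value - timedelta(days=1)
    return f"P{last_day.year:04d}{last_day.month:02d}"


def oldest_to_archive(parts: List[Partition], keep_min: int = 3) -> Optional[Partition]:
    """Same rule as the PL/SQL: act only when there are more than keep_min partitions."""
    if len(parts) <= keep_min:
        return None
    return min(parts, key=lambda p: p.position)


def rename_statement(owner: str, table: str, part: Partition) -> Optional[Tuple[str, str]]:
    """Return (new_name, DDL), or None if the partition already has that name."""
    new_name = monthly_name(part.high_value)
    if part.name == new_name:
        return None
    return new_name, f"ALTER TABLE {owner}.{table} RENAME PARTITION {part.name} TO {new_name}"
```

Once partitions carry month names, the "verify v_partname" step reduces to comparing the selected name with `monthly_name(...)` for the month the job expects to archive.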
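The whole process consists of 4 CA jobs:
- Check the partitions in Oracle; continue only if the partition count is > 3
- Find the oldest partition and run the Python job
- Check in the datalake the data inserted from Oracle ---- this is the part I need <<<<<
- Drop the oldest partition in Oracle
Thanks ^_^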
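For step 3 (checking the datalake before the partition is dropped), a common approach is to compare the Oracle partition's row count with the Parquet row count, optionally adding an order-independent fingerprint of the rows. A minimal sketch, assuming the counts come from something like `SELECT COUNT(*) FROM SMSALERT.SMS_ALERTS_HISTORY PARTITION (P202005)` and `spark.read.parquet(path).count()`, and that when fingerprints are used both sides produce rows as tuples of the same Python types (an Oracle `NUMBER` arriving as `Decimal` on one side and `int` or `float` on the other would change the fingerprint). `row_fingerprint` and `verify_archive` are hypothetical names.

```python
import hashlib
from typing import Iterable, Optional, Tuple

_MOD = 1 << 128


def row_fingerprint(rows: Iterable[Tuple]) -> Tuple[int, int]:
    """Return (row_count, order-independent checksum) for an iterable of rows.

    Each row is hashed with MD5 over its repr(); the digests are summed modulo
    2**128, so the result ignores row order but changes when a row is
    missing, duplicated or altered.
    """
    count = 0
    total = 0
    for row in rows:
        digest = hashlib.md5(repr(tuple(row)).encode("utf-8")).digest()
        total = (total + int.from_bytes(digest, "big")) % _MOD
        count += 1
    return count, total


def verify_archive(source_count: int, archived_count: int,
                   source_sum: Optional[int] = None,
                   archived_sum: Optional[int] = None) -> bool:
    """True only if the counts match and, when both are given, the checksums match."""
    if source_count != archived_count:
        return False
    if source_sum is not None and archived_sum is not None:
        return source_sum == archived_sum
    return True
```

Run as its own CA job between the Python load and the drop step, this check could exit non-zero whenever `verify_archive` returns `False`, so the drop job never starts on an incomplete archive. The count comparison alone is cheap; the fingerprint pulls every row through Python, so it may only be practical for smaller partitions.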