Oracle Partitioned Table to Datalake

Problem Description

Every month I want to automatically archive the one-month interval partition of an Oracle table into a datalake. After the data has been inserted into the datalake, I will drop the partition.

This code loads the Oracle table into the Parquet datalake:

from pyspark.sql import SparkSession
from datetime import date

db_host = "IP"
db_port = "PORT"
db_service = "SERVICE_NAME"
schema_name = "schema"
table_name = "T_name"
#today = date.today()
#dir_name = today.strftime("%Y-%m-%d")
db_user = "oracle_user"
db_pass = "oracle_password"

spark = SparkSession.builder \
        .appName("Load " + schema_name + " " + table_name + " from Oracle into Parquet and creating Table") \
        .getOrCreate()



# Wrap the table in a subquery; the trailing alias (no AS keyword for Oracle)
# is required by the Spark JDBC source
query = '(SELECT * FROM ' + schema_name + '.' + table_name + ' c) ' + table_name

# Note: numPartitions only parallelizes the JDBC read when partitionColumn,
# lowerBound and upperBound are also set; without them Spark reads on one connection
df = spark.read \
    .format("jdbc") \
    .option("url","jdbc:oracle:thin:@" + db_host + ":" + db_port + "/" + db_service) \
    .option("dbtable",query) \
    .option("user",db_user) \
    .option("password",db_pass) \
    .option("driver","oracle.jdbc.OracleDriver") \
    .option("encoding","UTF-8") \
    .option("fetchSize", 10000) \
    .option("numPartitions",40) \
    .load()

# Convert it to Spark SQL table and save it as parquet format

df.write \
    .format("parquet") \
    .option("path","/archive/" + schema_name + "_" + table_name + ".parquet") \
    .mode("overwrite") \
    .saveAsTable(table_name)
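Rather than pulling the whole table each month, the `dbtable` subquery can target just the partition being archived, using Oracle's `PARTITION` clause. A minimal sketch of building that query string; the helper name `partition_query` and the example partition `P202005` are illustrative, matching the renaming convention mentioned later in the question:

```python
def partition_query(schema_name: str, table_name: str, partition_name: str) -> str:
    """Build a JDBC dbtable subquery that reads a single Oracle partition.

    Oracle's PARTITION clause lets Spark fetch only the rows being
    archived; the trailing alias is required by the Spark JDBC source.
    """
    return (
        "(SELECT * FROM {schema}.{table} PARTITION ({part}) c) {table}"
        .format(schema=schema_name, table=table_name, part=partition_name)
    )

# The result would be passed to .option("dbtable", ...)
q = partition_query("SMSALERT", "SMS_ALERTS_HISTORY", "P202005")
print(q)
# → (SELECT * FROM SMSALERT.SMS_ALERTS_HISTORY PARTITION (P202005) c) SMS_ALERTS_HISTORY
```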

I am using CA Workload Automation AE and created this job:

#!/bin/bash


. /ca/.profile

pgm=$(basename "$0")

source app3

sqlplus /"Pass"@S_name @/ca/autosys_script/archive_sms_alerts_history.sql > ${sysout}/${pgm}.log 2>&1


echo "************************************************"
echo "log file dir: " ${sysout}/${pgm}.log

cat ${sysout}/${pgm}.log

if grep -q "ORA-" ${sysout}/${pgm}.log; then
  echo "************************************************"
  echo " ERROR 06 : SMS_ALERTS_HISTORY table archive job failed   "
  echo "            Please call database administrator. "
  echo "************************************************"
  echo " "
  exit 6
else
  echo "*******SMS ALERTS HISTORY ARCHIVE SUCCESSFULLY COMPLETED********"
fi
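The grep check above only catches "ORA-" messages written to the log file; adding `WHENEVER SQLERROR EXIT SQL.SQLCODE` at the top of the `.sql` script (a standard SQL*Plus directive) would also surface failures through sqlplus's own exit code. The log-check pattern itself can be exercised standalone; a minimal sketch with a simulated log file (the mktemp path and error message are illustrative):

```shell
#!/bin/bash
# Simulate the ORA- log check from the CA job without needing sqlplus.

log=$(mktemp)
printf 'ORA-01950: no privileges on tablespace\n' > "$log"   # simulated failure

if grep -q "ORA-" "$log"; then
  echo "archive job failed"
  status=6        # same exit code the CA job uses on failure
else
  echo "archive completed"
  status=0
fi
rm -f "$log"
echo "exit status would be: $status"
```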

Here is the SQL:

DECLARE
    v_partname   VARCHAR2 (50);
    v_cnt        INT;
BEGIN
    SELECT COUNT (*)
      INTO v_cnt
      FROM dba_tab_partitions
     WHERE table_owner = 'SMSALERT' AND table_name = 'SMS_ALERTS_HISTORY';
    
    
    -- Rename the partition: system-generated SYS_P### partitions will be
    -- renamed to P202005 etc. before archiving.
    IF v_cnt > 3
    THEN
        
        SELECT partition_name
          INTO v_partname
          FROM dba_tab_partitions
         WHERE     table_owner = 'SMSALERT'
               AND table_name = 'SMS_ALERTS_HISTORY'
               AND partition_position IN
                       (SELECT MIN (partition_position)
                          FROM dba_tab_partitions
                         WHERE table_owner = 'SMSALERT'
                           AND table_name = 'SMS_ALERTS_HISTORY');
        -- Check the partition name here (e.g. v_partname = 'P202005')
        -- before inserting and dropping.
       
        EXECUTE IMMEDIATE
               'insert /*+ PARALLEL(10) */ into SMSALERT.ARC_SMS_ALERTS_HISTORY select * from SMSALERT.SMS_ALERTS_HISTORY partition('
            || v_partname
            || ')';

        EXECUTE IMMEDIATE
               'alter table SMSALERT.SMS_ALERTS_HISTORY drop partition '
            || v_partname
            || ' UPDATE INDEXES';

        EXECUTE IMMEDIATE 'ALTER TABLE SMSALERT.SMS_ALERTS_HISTORY SET INTERVAL ( NUMTOYMINTERVAL(1, ''MONTH''))';            
        COMMIT;
    ELSE
        dbms_output.put_line('PARTITION COUNT IS 3 OR LESS - SKIPPED');
    END IF;
END;
/

EXIT;

To finish, four CA jobs:

  1. Check the partitions in Oracle: proceed only if the partition count > 3.
  2. Find the oldest partition and run the Python job.
  3. Verify in the datalake that the data inserted from Oracle is complete ---- I need this <<<<<
  4. Drop the oldest partition in Oracle. Thanks ^_^

Tags: python, sql, linux, oracle, apache-spark

Solution
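For step 3 (verifying the load), one common approach is to compare the row count of the Oracle partition with the row count of the freshly written Parquet table, and fail the CA job on a mismatch so the drop-partition job never runs. A minimal sketch under those assumptions; `verify_load` is a hypothetical helper, and the Spark calls are shown only as comments since they depend on the session and credentials from the question:

```python
import sys


def verify_load(oracle_count: int, parquet_count: int) -> bool:
    """Return True only when every Oracle row reached the datalake."""
    return oracle_count == parquet_count and oracle_count > 0


# With the SparkSession from the question, the counts could be taken as:
#   oracle_count  = spark.read.format("jdbc")...load().count()   # partition query
#   parquet_count = spark.read.parquet(
#       "/archive/" + schema_name + "_" + table_name + ".parquet").count()

if __name__ == "__main__":
    oracle_count, parquet_count = 1000, 1000   # illustrative values
    if not verify_load(oracle_count, parquet_count):
        # CA Workload Automation AE treats a nonzero exit as failure,
        # which blocks job 4 (drop partition) from running.
        sys.exit(7)
    print("counts match: %d rows archived" % oracle_count)
```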
