python - 将数据从 MySQL 数据库移植到 PostgreSQL
问题描述
我需要在我们在 PostgreSQL 上运行的数据仓库中创建一个临时表,并从我们使用 MySQL 数据库的 Magento 网站导入数据,我正在尝试使用 Python。
我为导入目的创建了以下查询,
1.请您检查并确认这是否正常?或者有什么替代方法可以做到吗?
2.另外我想知道我们如何在移植时管理 DATATYPE 不匹配问题?
3.我们可以用Multicorn等外国数据说唱歌手(FDW)做些什么吗?我们该怎么做?我只需要来自源的几列(源有50多列,我只需要15列)转移到目的地,那么FDW会起作用吗?
如果有人可以发布示例或编辑以下代码,那将有很大帮助。
import psycopg2
import os
import time
import MySQLdb
import sys
import mysql.connector
from pprint import pprint
from datetime import datetime
from psycopg2 import sql
#from utils.utils import get_global_config
def psql_command(msql, psql, msql_command, psql_command):
msql.execute(msql_command)
for row in cur_msql:
try:
psql.execute(command, row)
except psycopg2.Error as e:
print "Cannot execute the query!!", e.pgerror
sys.exit("Some problem occured with the query!!!")
def dB_Fetch():
try:
cnx_msql = mysql.connector.connect( host=host_mysql,
user=user_mysql,passwd=pswd_mysql, db=dbna_mysql )
except mysql.connector.Error as e:
print "MYSQL: Unable to connect!", e.msg
sys.exit(1)
# Postgresql connection
try:
cnx_psql = psycopg2.connect(conn_string_psql)
except psycopg2.Error as e:
print('PSQL: Unable to connect!\n{0}').format(e)
sys.exit(1)
# Cursors initializations
cur_msql = cnx_msql.cursor(dictionary=True)
cur_psql = cnx_psql.cursor()
try:
SQL_create_Staging_schema="""CREATE SCHEMA IF NOT EXISTS staging
AUTHORIZATION postgres;"""
SQL_create_sales_flat_quote="""CREATE TABLE IF NOT EXISTS
staging.sales_flat_quote
(
entity_id BIGINT
, store_id BIGINT
, customer_email TEXT
, customer_firstname TEXT
, customer_middlename TEXT
, customer_lastname TEXT
, customer_is_guest BIGINT
, customer_group_id BIGINT
, created_at TIMESTAMP WITHOUT TIME ZONE
, updated_at TIMESTAMP WITHOUT TIME ZONE
, is_active BIGINT
, items_count BIGINT
, items_qty BIGINT
, base_currency_code TEXT
, grand_total NUMERIC(12,4)
, base_to_global_rate NUMERIC(12,4)
, base_subtotal NUMERIC(12,4)
, base_subtotal_with_discount NUMERIC(12,4)
);"""
SQL_create_sales_flat_quote_item="""CREATE TABLE IF NOT EXISTS
staging.sales_flat_quote_item
( store_id INTEGER
, row_total NUMERIC
, updated_at TIMESTAMP WITHOUT TIME ZONE
, qty NUMERIC
, sku CHARACTER VARYING
, free_shipping INTEGER
, quote_id INTEGER
, price NUMERIC
, no_discount INTEGER
, item_id INTEGER
, product_type CHARACTER VARYING
, base_tax_amount NUMERIC
, product_id INTEGER
, name CHARACTER VARYING
, created_at TIMESTAMP WITHOUT TIME ZONE
);"""
print("Creating Schema")
cur_psql.execute(SQL_create_Staging_schema)
print("schema succesfully created")
print("Creating staging.sales_flat_quote table")
cur_psql.execute(SQL_create_sales_flat_quote)
print("staging.sales_flat_quote table succesfully created")
print("Creating staging.sales_flat_quote_item table")
cur_psql.execute(SQL_create_sales_flat_quote_item)
print("staging.sales_flat_quote_item table succesfully created")
cur_psql.commit();
print("Fetching data from source server")
commands = [
(
"SELECT customer_id,entity_id,store_id,created_at,updated_at
,items_count,base_row_total,row_total,base_discount_amount
,base_subtotal_with_discount,base_to_global_rate
,is_active from sales_flat_quote
where is_active=1;",
"INSERT INTO staging.sales_flat_quote
(customer_id,entity_id,store_id,created_at,updated_at
,items_count,base_row_total,row_total,base_discount_amount
,base_subtotal_with_discount,base_to_global_rate,is_active) \
VALUES (%(customer_id)s, %(entity_id)s
, %(store_id)s, %(created_at)s, %(updated_at)s
, %(items_count)s, %(base_row_total)s
, %(row_total)s, %(base_discount_amount)s
, %(base_subtotal_with_discount)s
, %(base_to_global_rate)s, %(is_active)s)"
),
(
"SELECT store_id,row_total,updated_at,qty,sku
,free_shipping,quote_id,price,no_discount,item_id,product_type
,base_tax_amount,product_id,name,created_at from
sales_flat_quote_item",
"INSERT INTO staging.sales_flat_quote_item
(store_id,row_total,updated_at,qty,sku,free_shipping
,quote_id,price,no_discount,item_id,product_type
,base_tax_amount,product_id,name,created_at)
VALUES (%(store_id)s, %(row_total)s, %(updated_at)s
, %(qty)s, %(sku)s, %(free_shipping)s, %(quote_id)s
, %(price)s, %(no_discount)s, %(item_id)s
, %(product_type)s, %(base_tax_amount)s, %(product_id)s
, %(name)s, % . (created_at)s)"
)
]
for msql_command, psql_command in commands:
psql_command(cur_msql, cur_psql, msql_command, psql_command)
except (Exception, psycopg2.Error) as error:
print ("Error while fetching data from PostgreSQL", error)
finally:
## Closing cursors
cur_msql.close()
cur_psql.close()
## Committing
cnx_psql.commit()
## Closing database connections
cnx_msql.close()
cnx_psql.close()
if __name__ == '__main__':
dB_Fetch()
解决方案
完成了他来自 jaisus 的评论。
感谢您的评论,我认为最简单的方法确实是使用 FDW 来完成这项特定任务。您可以在 github (github.com/EnterpriseDB/mysql_fdw) 上找到 mysql_fdw。你具体需要什么,你在这件事上有什么困难?– Jaisus 5 月 9 日 12:26
推荐阅读
- java - 在数字为 6 之后获取的数字,但我想要所有数字
- python - pytest 模拟我自己在测试函数中导入的模块
- javascript - 条纹订阅 JS 问题
- r - 使用 DPLYR full_join 加入 3 个大数据帧时,如何修复错误:std::bad_alloc 消息?
- swift - 可选仅在条件编译块中设置,编译器是否从发布版本中完全删除语句?
- php - WooCommerce 订单列的重新排序
- ruby-on-rails - Ruby on Rails 测试失败或成功取决于测试套件的调用方式
- php - 翻译 functions.php 中的字符串 Loco Translate
- python - 跨两个不同列的平均值
- spring-boot - Spring REST 安全性:如何实现“两步”认证