首页 > 解决方案 > 将数据从 MySQL 数据库移植到 PostgreSQL

问题描述

我需要在我们在 PostgreSQL 上运行的数据仓库中创建一个临时表,并从我们使用 MySQL 数据库的 Magento 网站导入数据,我正在尝试使用 Python。

我为导入目的创建了以下查询,

1.请您检查并确认这是否正常?或者有什么替代方法可以做到吗?

2.另外我想知道我们如何在移植时管理 DATATYPE 不匹配问题?

3.我们可以用Multicorn等外国数据说唱歌手(FDW)做些什么吗?我们该怎么做?我只需要来自源的几列(源有50多列,我只需要15列)转移到目的地,那么FDW会起作用吗?

如果有人可以发布示例或编辑以下代码,那将有很大帮助。

import psycopg2
import os
import time
import MySQLdb
import sys
import mysql.connector
from pprint import pprint
from datetime import datetime
from psycopg2 import sql
#from utils.utils import get_global_config


def psql_command(msql, psql, msql_command, psql_command):

    msql.execute(msql_command)

    for row in cur_msql:
        try:
            psql.execute(command, row)
        except psycopg2.Error as e:
            print "Cannot execute the query!!", e.pgerror
            sys.exit("Some problem occured with the query!!!")


def dB_Fetch():

try:
  cnx_msql = mysql.connector.connect( host=host_mysql, 
  user=user_mysql,passwd=pswd_mysql, db=dbna_mysql )

except mysql.connector.Error as e:
  print "MYSQL: Unable to connect!", e.msg
  sys.exit(1)

# Postgresql connection
try:
  cnx_psql = psycopg2.connect(conn_string_psql)
except psycopg2.Error as e:
  print('PSQL: Unable to connect!\n{0}').format(e)
  sys.exit(1)

# Cursors initializations
cur_msql = cnx_msql.cursor(dictionary=True)
cur_psql = cnx_psql.cursor()

try:
  SQL_create_Staging_schema="""CREATE SCHEMA IF NOT EXISTS staging 
  AUTHORIZATION postgres;"""

  SQL_create_sales_flat_quote="""CREATE TABLE IF NOT EXISTS 
  staging.sales_flat_quote
        (
              entity_id           BIGINT
    , store_id        BIGINT
    , customer_email      TEXT
    , customer_firstname  TEXT
    , customer_middlename TEXT
    , customer_lastname   TEXT
    , customer_is_guest   BIGINT
    , customer_group_id   BIGINT
    , created_at          TIMESTAMP WITHOUT TIME ZONE
    , updated_at          TIMESTAMP WITHOUT TIME ZONE
    , is_active             BIGINT
    , items_count           BIGINT
    , items_qty             BIGINT
    , base_currency_code            TEXT
    , grand_total           NUMERIC(12,4)
    , base_to_global_rate           NUMERIC(12,4)
    , base_subtotal         NUMERIC(12,4)
    , base_subtotal_with_discount   NUMERIC(12,4)
    );"""

SQL_create_sales_flat_quote_item="""CREATE TABLE IF NOT EXISTS 
staging.sales_flat_quote_item
        (     store_id        INTEGER
    , row_total       NUMERIC
    , updated_at      TIMESTAMP WITHOUT TIME ZONE
    , qty             NUMERIC
    , sku             CHARACTER VARYING
    , free_shipping   INTEGER
    , quote_id        INTEGER
    , price       NUMERIC
    , no_discount     INTEGER
    , item_id     INTEGER
    , product_type    CHARACTER VARYING
    , base_tax_amount NUMERIC
    , product_id      INTEGER
    , name        CHARACTER VARYING
    , created_at      TIMESTAMP WITHOUT TIME ZONE
    );"""

print("Creating  Schema")   
cur_psql.execute(SQL_create_Staging_schema)
print("schema  succesfully created")

print("Creating staging.sales_flat_quote table")
cur_psql.execute(SQL_create_sales_flat_quote)
print("staging.sales_flat_quote table  succesfully created")

print("Creating staging.sales_flat_quote_item table")
cur_psql.execute(SQL_create_sales_flat_quote_item)
print("staging.sales_flat_quote_item table  succesfully created")

cur_psql.commit();
print("Fetching data from source server")




    commands = [
(
"SELECT customer_id,entity_id,store_id,created_at,updated_at
,items_count,base_row_total,row_total,base_discount_amount
,base_subtotal_with_discount,base_to_global_rate
,is_active from sales_flat_quote 
where is_active=1;",

 "INSERT INTO staging.sales_flat_quote 
 (customer_id,entity_id,store_id,created_at,updated_at
,items_count,base_row_total,row_total,base_discount_amount
,base_subtotal_with_discount,base_to_global_rate,is_active) \
 VALUES (%(customer_id)s, %(entity_id)s
, %(store_id)s, %(created_at)s, %(updated_at)s
, %(items_count)s, %(base_row_total)s
, %(row_total)s, %(base_discount_amount)s
, %(base_subtotal_with_discount)s
, %(base_to_global_rate)s, %(is_active)s)"

),

(
 "SELECT store_id,row_total,updated_at,qty,sku
,free_shipping,quote_id,price,no_discount,item_id,product_type
,base_tax_amount,product_id,name,created_at from 
 sales_flat_quote_item",

 "INSERT INTO staging.sales_flat_quote_item 
 (store_id,row_total,updated_at,qty,sku,free_shipping
 ,quote_id,price,no_discount,item_id,product_type
 ,base_tax_amount,product_id,name,created_at) 
VALUES (%(store_id)s, %(row_total)s, %(updated_at)s
, %(qty)s, %(sku)s, %(free_shipping)s, %(quote_id)s
, %(price)s, %(no_discount)s, %(item_id)s
, %(product_type)s, %(base_tax_amount)s, %(product_id)s
, %(name)s, % . (created_at)s)"
)

                ]

for msql_command, psql_command in commands:
    psql_command(cur_msql, cur_psql, msql_command, psql_command)

 except (Exception, psycopg2.Error) as error:
     print ("Error while fetching data from PostgreSQL", error)
 finally:
     ## Closing cursors
     cur_msql.close()
     cur_psql.close()
     ## Committing
     cnx_psql.commit()
     ## Closing database connections
     cnx_msql.close()
     cnx_psql.close()


if __name__ == '__main__':
   dB_Fetch()

标签: pythonmysqlpython-3.xpostgresqlmagento

解决方案


完成了他来自 jaisus 的评论。

感谢您的评论,我认为最简单的方法确实是使用 FDW 来完成这项特定任务。您可以在 github (github.com/EnterpriseDB/mysql_fdw) 上找到 mysql_fdw。你具体需要什么,你在这件事上有什么困难?– Jaisus 5 月 9 日 12:26


推荐阅读