首页 > 解决方案 > psycopg2 将行作为列插入

问题描述

我不确定我是否正确地表达了这一点,但是当从一个数据库查询数据并将其插入另一个数据库时psycopg2,脚本有时会分解数据,并将行作为列插入(即每打印一个字符每行列)如下:

     name     | id_2       | id_1        | blank        | data
--------------+------------+-------------+--------------+--------------
 A            | 1          | a           |              | {json: data}
 B            | a          | 1           |              | {json: data}
 C            | 2          | b           |              | {json: data}
 D            | b          | 2           |              | {json: data}
 E            | 3          | c           |              | {json: data}
 F            | c          | 3           |              | {json: data}

我希望表格如下所示:

     name     | id_2       | id_1        | blank        | data
--------------+------------+-------------+--------------+--------------
 ABCDEF       | 1a2b3c     | a1b2c3      |              | {json: data}

我必须生成的代码是:

import psycopg2, uuid

id_1 = str(uuid.uuid4())

conn1 = psycopg2.connect(
    host='host',
    database=db1,
    user='user',
    password='password')

conn2 = psycopg2.connect(
    host='host',
    database=db2
    user='user',
    password='password')

def generate_data(id_1, name, id_2):

    data = {
        "tag": name,
        "id_2": id_2,
        "id_1": id_1,
        "rand_int": random.randint(0, 86400),
        "rand_uni_1": str(round(random.uniform(0.0, 8.0), 2)),
        "rand_uni_2": str(round(random.uniform(0.0, 16.0), 2)),
        "digi": "",
        "point_type": random.randint(0, 1),
        "garbage_data": "garbage_data"
    }
    data_str = str(json.dumps(data).replace("'", "''"))
    add_data(name, id_2, id_1, data_str)

def add_data(name_rec, id_2_rec, id_1_rec, data_rec):
    insert_data = "INSERT into table(name, id_2, id_1, data) VALUES (%s, %s, %s, %s::jsonb) ON CONFLICT DO NOTHING;"
    try:
        cur = conn2.cursor()
        cur.executemany(insert_data, zip(name_rec, id_2_rec, id_1_rec, data_rec))
        conn2.commit()
        cur.close()
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    finally:
        if conn2 is not None:
            conn2.close()

def query_data():
    try:
        create_data_table()
        cur = conn1.cursor()
        cur.execute("SELECT name, table_name FROM existing_table;")
        data_returned = cur.fetchall()
        name = [i[0] for i in data_returned]
        old_table = [i[1] for i in data_returned]
        for x in data_returned:
            name = x[0]
            new_id_2 = x[1][5:].replace("_","-")
            generate_data(id_1_rec, name, new_id_2)
        cur.close()
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)

def create_data_table():
    with conn2.cursor() as cursor:
        cursor.execute(
            "CREATE TABLE IF NOT EXISTS table "
            "("
            "name VARCHAR(100), "
            "id_2 VARCHAR(50), "
            "id_1 VARCHAR(50), "
            "data JSONB"
            ");"
        )
        conn2.commit()
    cursor.close()

create_data_table()
query_data()

我正在使用 print 语句来验证数据是否应该插入到表中。我在隔离此代码遇到的另一个问题时发现了这一点jsonb,我将在接下来发布。我对来自 QA 的 SQL 和 Python(以及 psycopg2)相对较新,因此非常感谢任何帮助!

标签: pythonpython-3.xpostgresqlpsycopg2psql

解决方案


今天早上站起来后,我抓住了我们的一些开发人员,并解决了这个问题,如果不清楚的话,就是如上所述将数据输入数据库的方式。

这里的问题在于线路cur.executemany(insert_data, zip(name_rec, id_2_rec, id_1_rec, data_rec))- 更具体地说是executemany方法和zip()功能。由于该zip()函数返回一个 zip 对象,它是元组的迭代器,其中每个传递的迭代器中的第一项(以及所有后续项)配对在一起,并且该executemany方法准备数据库操作(查询或命令)并针对所有参数执行它在序列 seq_of_params 中找到的序列或映射,这导致数据逐个字符地输入到新表中。这已通过将行更改为修复,cur.execute(insert_data, (name_rec, id_2_rec, id_1_rec, data_rec))现在我的数据按原样显示,即

     name     | id_2       | id_1        | blank        | data
--------------+------------+-------------+--------------+--------------
 ABCDEF       | 1a2b3c     | a1b2c3      |              | {json:data}

我为凌乱的代码道歉,如果我最初没有正确地表达这个问题 - 我在大约 2.5 年前上学音乐并进入这个领域,作为一名没有任何经验的 QA,所以这对我来说仍然相对较新。


推荐阅读