首页 > 解决方案 > 使用 Psycopg2 的失败事务是否会导致 SQL 约束停止表现为延迟?

问题描述

我有一个 Python 脚本,它使用 Psycopg2 库在 Postgres 数据库中插入数据。这些表是使用 Django 迁移创建的。虽然数据库由 Django 使用,但它是数据分析系统的一部分,也将使用 Psycopg2 访问和操作。一切都在 Ubuntu 上运行。数据库的简化版本如下所示。

简化的数据库结构

Django 通过 POST 请求接收 zip 文件并将相应的条目添加到 Upload 表中。每个条目都有 zip 文件的位置。在每个 Upload zip 中,都有 Session zip,其中又包含包含相关数据的 CSV 文件。会话 zip 和 CSV 文件在数据库中没有引用,但其信息是使用上述脚本插入的。在我的完整系统中,有更多的表,但要提出问题,每个会话的数据表就足够了 - 每个会话 zip 应该有 2 个 CSV,一个用于数据,另一个用于会话元数据。脚本的循环如下所示。

脚本循环

所以基本上对于每个 Upload zip,它的会话都被提取并一个一个地插入到数据库中。Session 的数据和相应的 Data 将被插入到单个事务中。由于 Data 具有引用 Session 的外键,因此必须延迟这些字段。约束设置为 deferable 最初deferred 。要使用的 Session id 主键是按现有的最大 Session id 值递增计算的。

但有时,接收到的数据已损坏或不完整,并且事务提交失败,这是理所当然的。问题是,在其中一次失败之后,每次尝试新的 Session 插入时,事务都会失败并显示错误消息,指出 Session 和 Data 之间的外键约束被违反,就好像字段没有被延迟一样!

系统仍会在 Upload 中接收并插入新条目,但插入新会话的问题仍然存在。如果我销毁数据库并重新创建它,那么一切都运行良好,直到其中一个事务失败,之后由于外键违规再次无法插入更多会话。

什么可能导致这种行为?显然,由于交易失败,这些字段不再像定义的那样延迟。

我知道我的文字很长,但这是我发现表达我的问题的最佳方式。我提前感谢任何花时间阅读并可能分享他们专业知识的人。

软件版本为 Postgres 10.12;心理战 2.8.5;Django 2.2.12;Python 3.6.9;Ubuntu 18.04。

更新 - 最小的可重现示例

下面列出了完全重现我的问题的步骤。对于你们中的一些人来说,许多是不必要或太明显的,但我选择包括所有内容,以便任何人都可以遵循。如果使用不同的软件,则必须对此进行调整。我修改了我的示例系统,使其完全独立于 Django。

A - 输入你的 Ubuntu 系统

B - 安装软件(其中一些可能是不必要的)

sudo apt update
sudo apt install python3-pip python3-dev libpq-dev postgresql postgresql-contrib

C -在你的 Linux 主目录中创建一个 alt_so_reprex 目录并 cd 到它

mkdir alt_so_reprex
cd alt_so_reprex

D - 创建虚拟环境

virtualenv venv
source venv/bin/activate
pip install psycopg2

E - 创建下面列出的 5 个脚本 -在每个脚本中将 {YOUR_USERNAME} 替换为您的 Linux 用户名。授予每个人运行的权限。

chmod +x 1_user_and_db.sh 2_db_tables.py 3_insert_uploads.py 4_create_test_files.sh 5_insert_sessions.py

脚本 1:1_user_and_db.sh

#!/bin/bash

# Create user if it does not exist
if [ "$( sudo -u postgres -H -- psql -c "SELECT 1 FROM pg_roles WHERE rolname='{YOUR_USERNAME}'" )" != '1' ]
then
    sudo -u postgres -H -- psql -c "CREATE USER {YOUR_USERNAME} WITH PASSWORD 'password';";
fi

# Create the PgSQL database (ignore the error the first time this runs)
sudo -u postgres -H -- psql -c "DROP DATABASE test_db;";
sudo -u postgres -H -- psql -c "CREATE DATABASE test_db;";
sudo -u postgres -H -- psql -d test_db -c "ALTER ROLE {YOUR_USERNAME} SET client_encoding TO 'utf8';";
sudo -u postgres -H -- psql -d test_db -c "ALTER ROLE {YOUR_USERNAME} SET default_transaction_isolation TO 'read committed';";
sudo -u postgres -H -- psql -d test_db -c "ALTER ROLE {YOUR_USERNAME} SET timezone TO 'UTC';";
sudo -u postgres -H -- psql -d test_db -c "GRANT ALL PRIVILEGES ON DATABASE test_db TO {YOUR_USERNAME};";

# Show database
sudo -u postgres -H -- psql -d test_db -c "\l";

脚本 2:2_db_tables.py(基于 @snakecharmerb 的贡献 - 谢谢)

#!/usr/bin/env python3

import psycopg2

# TABLE CREATION

reprex_upload = """CREATE TABLE Reprex_Upload (
    id BIGSERIAL PRIMARY KEY,
    zip_file VARCHAR(128),
    processed BOOLEAN DEFAULT FALSE
) """

reprex_session = """CREATE TABLE Reprex_Session (
    id BIGSERIAL PRIMARY KEY,
    metadata VARCHAR(128),
    upload_id BIGINT REFERENCES Reprex_Upload ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED 
) """

reprex_data = """CREATE TABLE Reprex_Data (
    id BIGSERIAL PRIMARY KEY,
    data VARCHAR(128),
    session_id BIGINT REFERENCES Reprex_Session ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED
)"""

print("Creating tables...")
with psycopg2.connect(dbname='test_db', user='{YOUR_USERNAME}', host='localhost', password='password') as conn:
    cur = conn.cursor()
    cur.execute(reprex_upload)
    cur.execute(reprex_session)
    cur.execute(reprex_data)
    conn.commit()

脚本 3:3_insert_uploads.py

#!/usr/bin/env python3

import psycopg2
from psycopg2 import sql

DATABASE = 'test_db'
USER = '{YOUR_USERNAME}'
PASSWORD = 'password'

conn = None
cur = None

try:
    conn = psycopg2.connect(database=DATABASE, user=USER, password=PASSWORD)
    cur = conn.cursor()
    cur.execute(sql.SQL("INSERT INTO reprex_upload VALUES (DEFAULT, 'uploads/ok_upload.zip', DEFAULT)"))
    cur.execute(sql.SQL("INSERT INTO reprex_upload VALUES (DEFAULT, 'uploads/bad_upload.zip', DEFAULT)"))
    cur.execute(sql.SQL("INSERT INTO reprex_upload VALUES (DEFAULT, 'uploads/ok_upload.zip', DEFAULT)"))
    conn.commit()

except (Exception, psycopg2.Error) as err:
    print("Exception/Error:", err)

finally:
    # closing database conn.
    if cur:
        cur.close()
    if conn:
        conn.close()
    print("PostgreSQL conn is closed")

脚本 4:4_create_test_files.sh

#!/bin/bash

mkdir uploads
cd uploads
rm *
{ echo "metadata"; echo "Session data..."; } > 123_Session.csv
{ echo "data"; echo "Data 1..."; } > 123_Data.csv
zip 123_session.zip 123_Data.csv 123_Session.csv
zip ok_upload.zip 123_session.zip
rm 123_session.zip
zip 123_session.zip 123_Session.csv
zip bad_upload.zip 123_session.zip
rm 123*

脚本 5:5_insert_sessions.py

#!/usr/bin/env python3

import psycopg2
from psycopg2 import sql
import csv
from zipfile import ZipFile
import os
import shutil
import sys

MEDIA_ROOT_DIR = '/home/{YOUR_USERNAME}/alt_so_reprex/'
EXTRACTED_UPLOADS_DIR = '/home/{YOUR_USERNAME}/alt_so_reprex/extracted_uploads/'
EXTRACTED_SESSIONS_DIR = '/home/{YOUR_USERNAME}/alt_so_reprex/extracted_sessions/'
DATABASE = 'test_db'
USER = '{YOUR_USERNAME}'
PASSWORD = 'password'


def insert_csv(filepath, message, table, num_args, foreign_key):
    with open(filepath, 'r') as f:
        reader = csv.reader(f)
        next(reader)  # Skip the header row
        count = 0

        print(message)

        arguments_format = sql.SQL(', ').join(sql.Placeholder() * (num_args - 1))
        print('The arguments format is:', arguments_format.as_string(connection))

        for row in reader:
            row.append(foreign_key)
            cursor.execute(
                sql.SQL('INSERT INTO {} VALUES (DEFAULT, {})').format(sql.Identifier(table), arguments_format), row)
            count += 1

        print(count, 'record(s) will be inserted into %s table' % table)


def get_unprocessed_uploaded_zips():

    conn = None
    cur = None

    try:
        conn = psycopg2.connect(database=DATABASE, user=USER, password=PASSWORD)
        cur = conn.cursor()
        query = "SELECT * FROM reprex_upload WHERE processed=FALSE"
        cur.execute(query)
        res = cur.fetchall()

        # return true and res
        return True, res

    except (Exception, psycopg2.Error) as err:
        # return false and err message
        print("Exception/Error:", err)
        return False, None

    finally:
        # closing database conn.
        if cur:
            cur.close()
        if conn:
            conn.close()
        print("PostgreSQL conn is closed")


# COALESCE is used for the first insertion ever, where a NULL would be returned
def get_last_session_id():

    conn = None
    cur = None

    try:
        conn = psycopg2.connect(database=DATABASE, user=USER, password=PASSWORD)
        cur = conn.cursor()
        query = "SELECT COALESCE(MAX(id), 0) FROM reprex_session"
        cur.execute(query)
        result = cur.fetchone()

        # return true and results
        return True, result[0]

    except (Exception, psycopg2.Error) as err:
        # return false and err message
        print("Exception/Error:", err)
        return False, None

    finally:
        # closing database conn.
        if cur:
            cur.close()
        if conn:
            conn.close()
        print("PostgreSQL conn is closed")


# get all entries in Upload witch are unprocessed
query_success, results = get_unprocessed_uploaded_zips()

if query_success is False:
    sys.exit()

uploaded_zips = 0

for unprocessed_upload in results:

    uploaded_zips += 1
    print('\n\t' + '### UNPROCESSED UPLOAD ' + str(uploaded_zips) + ' ###\n')

    # The id field is the first one
    upload_zip_id = unprocessed_upload[0]

    # The zip_file field is the second one
    upload_zip_path = unprocessed_upload[1]
    print(upload_zip_path)

    # The filename will be the second part of the filepath
    upload_zip_name = upload_zip_path.split('/')[1]
    print(upload_zip_name)
    print(upload_zip_path)

    # The full filepath
    upload_zip_full_path = MEDIA_ROOT_DIR + upload_zip_path
    print(upload_zip_full_path)

    if upload_zip_full_path.endswith('.zip'):

        print('There is a new upload zip file: ' + upload_zip_full_path)

        # the folder name will be the file name minus the .zip extension
        upload_zip_folder_name = upload_zip_name.split('.')[0]
        upload_zip_folder_path = EXTRACTED_UPLOADS_DIR + upload_zip_folder_name

        # Create a ZipFile Object and load the received zip file in it
        with ZipFile(upload_zip_full_path, 'r') as zipObj:
            # Extract all the contents of zip file to the referred directory
            zipObj.extractall(upload_zip_folder_path)

        inserted_sessions = 0
        # Iterate over all session files inserting data in database
        for session_zip in os.scandir(upload_zip_folder_path):
            inserted_sessions += 1
            print('\n\t\t' + '### INSERTING SESSION ' + str(inserted_sessions) + ' ###\n')
            if session_zip.path.endswith('.zip') and session_zip.is_file():
                print('There is a new session zip file: ' + session_zip.name + '\n' + 'Located in: ' + session_zip.path)

                # the folder name will be the file name minus the .zip extension
                session_zip_folder_name = session_zip.name.split('.')[0]
                session_zip_folder_path = EXTRACTED_SESSIONS_DIR + session_zip_folder_name

                # Create a ZipFile Object and load the received zip file in it
                with ZipFile(session_zip, 'r') as zipObj:
                    # Extract all the contents of zip file to the referred directory
                    zipObj.extractall(session_zip_folder_path)

                session_file_path = session_zip_folder_path + '/' + \
                    session_zip_folder_name.replace('session', 'Session.csv')
                data_file_path = session_zip_folder_path + '/' + \
                    session_zip_folder_name.replace('session', 'Data.csv')

                # get the latest session id and increase it by one
                query_success, last_session_id = get_last_session_id()

                if query_success is False:
                    sys.exit()

                session_id = last_session_id + 1

                print('The session ID will be: ', session_id)

                connection = None
                cursor = None

                try:
                    # open a new database connection
                    connection = psycopg2.connect(database=DATABASE, user=USER, password=PASSWORD)
                    cursor = connection.cursor()

                    # First insert the Session file -> Link entry to Upload entry (upload_zip_id)
                    insert_csv(session_file_path, 'PROCESSING SESSION!\n', 'reprex_session', 3,
                               upload_zip_id)

                    # Then insert the Data file -> Link entry to Session entry (session_id)
                    insert_csv(data_file_path, 'PROCESSING DATA!\n', 'reprex_data', 3, session_id)

                    # modify the Upload entry to processed
                    update = "UPDATE reprex_upload SET processed=TRUE WHERE id=%s"
                    cursor.execute(update, (upload_zip_id,))

                    # make all changes or none
                    connection.commit()
                except (Exception, psycopg2.Error) as error:
                    # print error message
                    if connection:
                        print('ERROR:', error)
                finally:
                    # closing database connection.
                    if cursor:
                        cursor.close()
                    if connection:
                        connection.close()
                    print("PostgreSQL connection is closed")

                # Remove folder with extracted content - this erases the csv files
                try:
                    shutil.rmtree(session_zip_folder_path)
                except OSError as e:
                    print("Error: %s" % e.strerror)

        # Remove folder with extracted content - this erases the session zips
        try:
            shutil.rmtree(upload_zip_folder_path)
        except OSError as e:
            print("Error: %s " % e.strerror)

F - 按顺序运行 5 个脚本。您将验证“错误上传”是否会导致由于外键违规而无法插入第二个“良好上传”。可以使用脚本 3 插入更多上传,但不能插入更多会话。如果您手动删除“错误上传”,您将验证您仍然无法插入更多会话,因为外键违规。但是,如果您从脚本 1 开始重新创建数据库,您可以再次插入“好会话”。如果您从上传目录中删除“错误上传”,您可以插入任意数量的会话。但是在一个错误之后,总是有外键违规,好像约束没有被延迟一样。

我更改了这个问题的原标题,因为我现在发现这个问题无论如何都不是由 Django 引起的。我还将数据库模型更改为比我最初提出的更简单的模型,并更改了原始文本以反映这一点。我还删除了 Django 标签。

通过检查 ZIP 中是否存在正确的 CSV 可以轻松避免此示例中的特定错误,但在我的实际系统中,可能会发生其他错误。我需要的是解决约束中行为的明显变化的解决方案。

我知道我非常冗长,我感谢您的耐心和帮助。

标签: pythonpostgresqlubuntupsycopg2

解决方案


在同事的帮助下,我找到了解决问题的方法。

带有“bad_upload”的失败事务在内部增加了序列的 nextval。因此,在一个失败的事务之后,用于下一个会话的 id 将不是当前最大 id + 1,而是当前最大 + 2。

为了防止此类问题,获取下一个 Session id 的正确方法是执行 Session 插入:

cursor.execute(sql.SQL('INSERT INTO {} VALUES (DEFAULT, {}) RETURNING id').format(sql.Identifier(table), arguments_format), row)

然后获取要用于的 id:

session_id = cursor.fetchone()

然后在插入数据表时使用该会话 ID。

根据Postgres 文档:“为了避免阻塞从同一序列中获取数字的并发事务,nextval 操作永远不会回滚;也就是说,一旦获取了一个值,它就被认为已使用并且不会再次返回。这是即使周围的事务稍后中止,或者调用查询最终没有使用该值,也是如此。"

所以这最终成为 Postgres 的问题,甚至不是 Psycopg。非常感谢所有帮助解决这个问题的人。


推荐阅读