首页 > 解决方案 > 使用 SQLAlchemy 在 PostgreSQL 中创建函数和触发器

问题描述

我使用 SQLAlchemy Engine 创建了一些函数和触发器,但我不想混合 Python 和 SQL,所以我为我的 SQL 语句创建了一个单独的文件,我读取了内容并将其传递给engine.execute(). 它不会抛出任何错误,但是函数不是在数据库中创建的,但是如果我通过 pgAdmin 运行相同的 SQL 文件,一切正常。

我的 SQL 文件:

DO $$
BEGIN
  IF NOT EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'plpython3u') THEN
    CREATE EXTENSION plpython3u;
  END IF;
END;
$$;

DO $$
BEGIN
  IF NOT EXISTS (SELECT 1 FROM pg_proc WHERE proname = 'my_func') THEN
    CREATE FUNCTION public.my_func() RETURNS TRIGGER LANGUAGE 'plpython3u' NOT LEAKPROOF AS $BODY$
    -- definition
    $BODY$;

    GRANT EXECUTE ON FUNCTION my_func() TO public;
  END IF;
END;
$$;

DO $$
BEGIN
  IF NOT EXISTS (SELECT 1 FROM pg_proc WHERE proname = 'my_func2') THEN
    CREATE FUNCTION public.my_func2() RETURNS TRIGGER LANGUAGE 'plpython3u' NOT LEAKPROOF AS $BODY$
    -- definition
    $BODY$;

    GRANT EXECUTE ON FUNCTION my_func2() TO public;
  END IF;
END;
$$;

我按如下方式运行:

def execute_sql_file(engine, path):
    try:
        with open(path) as file:
            engine.execute(file.read())
    except ProgrammingError:
        raise MyCustomError
    except FileNotFoundError:
        raise MyCustomError

如果我在没有超级用户权限的情况下运行它,它会ProgrammingError按预期抛出 。在我的理解END;中提交了事务,所以这个代码真的运行了,这些函数应该可供公众使用,但是它们甚至没有被创建。欢迎任何想法,谢谢!

标签: pythonpostgresqlsqlalchemy

解决方案


我相信您可能混合了BEGINSQL 命令(Postgresql 扩展)和PL/pgSQL 块。SQL 命令DO执行一个匿名代码块,就好像它是一个没有参数并返回的匿名函数void。换句话说在

DO $$
BEGIN
    ...
END;
$$;

BEGIN/对END;表示代码块,而不是事务。值得注意的是,从 Postgresql 版本 11 开始,可以在一个DO中管理事务,因为它不是在事务块中执行的,但其命令是COMMITand ROLLBACK,而不是关键字END

那么问题是您的更改没有提交,尽管您的命令显然已执行 - 如错误所证明的那样,如果没有以合适的权限运行。此问题是由SQLAlchemy 自动提交功能的工作方式引起的。简而言之,它会检查您的语句/命令并尝试确定它是数据更改操作还是 DDL 语句。这适用于INSERT, DELETE,等基本操作UPDATE,但并不完美。事实上,它不可能总是正确地确定语句是否更改数据;例如SELECT my_mutating_procedure()是这样的声明。因此,如果进行更复杂的操作,它需要一些帮助。text()一种方法是通过将 SQL 字符串包装在一个构造中来指示自动提交机制它应该提交,并且使用execution_options()

engine.execute(text("SELECT my_mutating_procedure()").
               execution_options(autocommit=True))

也可以使用以下DDL结构显式指示 SQLAlchemy 该命令是文字 DDL 语句:

from sqlalchemy.schema import DDL

def execute_sql_file(engine, path):
    try:
        with open(path) as file:
            stmt = file.read()

        # Not strictly DDL, but a series of DO commands that execute DDL
        ddl_stmt = DDL(stmt)
        engine.execute(ddl_stmt)

    except ProgrammingError:
        raise MyCustomError

    except FileNotFoundError:
        raise MyCustomError

至于为什么它与 pgAdmin 一起工作,如果没有引发错误,它可能默认提交。


推荐阅读