首页 > 解决方案 > 表传输在 Airflow 中的第一次迭代后停止

问题描述

我使用以下代码通过 Airflow (MWAA) 将小表从数据库 A 传输到数据库 B:

def move_data(sql_file_name, target_tbl_name, target_schema_name):
    select_stmt = ""
    dir_path = os.path.dirname(os.path.realpath(__file__))
    with open(dir_path +'/' + sql_file_name, 'r') as file:
        select_stmt = file.read().replace('%', '%%')

    src = PostgresHook(postgres_conn_id="A")
    src_engine = src.get_sqlalchemy_engine().connect()
    dest = PostgresHook(postgres_conn_id="B")
    dest_engine = dest.get_sqlalchemy_engine().connect()

    for chunk in pd.read_sql(select_stmt, src_engine, chunksize=30000):
        print('rows =  {0}, columns = {1}'.format(chunk.shape[0], chunk.shape[1]))
        try:
            chunk.to_sql(name=target_tbl_name, con=dest_engine,
                    schema=target_schema_name, chunksize=30000,
                    if_exists='replace', index=False, method='multi')
        except Exception as e:
            print(e)
    dest_engine.execute('commit;')
    dest_engine.close()

然而,代码只循环一次并且不传输任何记录,只传输目标数据库中表的模式。该表有大约 50000 条记录,但调整块大小无济于事。日志中没有错误。

该代码在 Jupyter notebook 中执行时运行良好,无需使用 Airflow Hooks。

有什么建议可能是什么问题?

标签: pythonsqlsqlalchemyairflowmwaa

解决方案


对于登陆这里的任何人:问题在于 chunksize=30000 如果 chunksize 太大,Worker 将退出并且函数将静默失败。减小块大小有帮助。


推荐阅读