python - 表传输在 Airflow 中的第一次迭代后停止
问题描述
我使用以下代码通过 Airflow (MWAA) 将小表从数据库 A 传输到数据库 B:
def move_data(sql_file_name, target_tbl_name, target_schema_name):
select_stmt = ""
dir_path = os.path.dirname(os.path.realpath(__file__))
with open(dir_path +'/' + sql_file_name, 'r') as file:
select_stmt = file.read().replace('%', '%%')
src = PostgresHook(postgres_conn_id="A")
src_engine = src.get_sqlalchemy_engine().connect()
dest = PostgresHook(postgres_conn_id="B")
dest_engine = dest.get_sqlalchemy_engine().connect()
for chunk in pd.read_sql(select_stmt, src_engine, chunksize=30000):
print('rows = {0}, columns = {1}'.format(chunk.shape[0], chunk.shape[1]))
try:
chunk.to_sql(name=target_tbl_name, con=dest_engine,
schema=target_schema_name, chunksize=30000,
if_exists='replace', index=False, method='multi')
except Exception as e:
print(e)
dest_engine.execute('commit;')
dest_engine.close()
然而,代码只循环一次并且不传输任何记录,只传输目标数据库中表的模式。该表有大约 50000 条记录,但调整块大小无济于事。日志中没有错误。
该代码在 Jupyter notebook 中执行时运行良好,无需使用 Airflow Hooks。
有什么建议可能是什么问题?
解决方案
对于登陆这里的任何人:问题在于 chunksize=30000 如果 chunksize 太大,Worker 将退出并且函数将静默失败。减小块大小有帮助。
推荐阅读
- mysql - mysql:每个子查询结果的返回列
- c# - 从 c# 转换为 C++/CLI VS2015
- python - 如何从不同的函数中获取字典(键和值)?
- algorithm - 光线的聚类算法
- scala - 如何使用 spark 获取 hdfs 目录的大小
- c# - 如何在 VS 扩展中触发 Build/Rebuild/Clean
- office-js - 在 Visual Studio 中将 Office UI Fabric 反应组件添加到 Office 加载项 Web 项目
- python - 将两个排序列表合并为一个而不递归
- html - 单击 md-menu、mdDialog 按钮时,顶部导航向上移动
- django - 在 django 的 CreateView 中创建新的父模型