python - Python Prefect 上的 MySQL 连接
问题描述
我正在尝试创建一个接收 PyMySQL 连接实例作为输入的 Prefect 任务,例如:
@task
def connect_db():
connection = pymysql.connect(user=user,
password=password,
host=host,
port=port,
db=db,
connect_timeout=5,
cursorclass=pymysql.cursors.DictCursor,
local_infile=True)
return connection
@task
def query_db(connection) -> Any:
query = 'SELECT * FROM myschema.mytable;'
with connection.cursor() as cur:
cur.execute(query)
rows = cur.fetchall()
return rows
@task
def get_df(rows) -> Any:
return pd.DataFrame(rows, dtype=str)
@task
def save_csv(df):
path = 'mypath'
df.to_csv(path, sep=';', index=False)
with Flow(FLOW_NAME) as f:
con = connect_db()
rows = query_db(con)
df = get_df(rows)
save_csv(df)
但是,当我尝试注册结果流时,它会引发“TypeError: cannot pickle 'socket' object”。通过 Prefect 的文档,我发现了内置的 MySQL 任务(https://docs.prefect.io/api/latest/tasks/mysql.html#mysqlexecute),但每次调用时它们都会打开和关闭连接. 有没有办法将以前打开的连接传递给 Prefect Task(或实现连接管理器之类的东西)?
解决方案
我试图复制你的例子,但它注册得很好。出现此类错误的最常见方式是,如果您在流程使用的全局命名空间中有客户端。Prefect 将在注册时尝试对其进行序列化。例如,如果您尝试注册以下代码片段,则会出错:
import pymysql
connection = pymysql.connect(user=user,
password=password,
host=host,
port=port,
db=db,
connect_timeout=5,
cursorclass=pymysql.cursors.DictCursor,
local_infile=True)
@task
def query_db(connection) -> Any:
query = 'SELECT * FROM myschema.mytable;'
with connection.cursor() as cur:
cur.execute(query)
rows = cur.fetchall()
return rows
with Flow(FLOW_NAME) as f:
rows = query_db(connection)
此错误是因为connection
变量与流对象一起序列化。您可以通过将 Flow 存储为脚本来解决此问题。有关更多信息,请参阅此链接:
https://docs.prefect.io/core/idioms/script-based.html#using-script-based-flow-storage
这将避免 Flow 对象的序列化并在运行时创建该连接。
如果在运行时发生这种情况
如果您在运行时遇到此错误,您可以看到这有两个可能的原因。第一个是 Dask 序列化它,第二个来自 Prefect 检查点。
Dask 用于cloudpickle
通过网络将数据发送给工作人员。因此,如果您将 Prefect 与 DaskExecutor 一起使用,它将cloudpickle
用于发送任务以供执行。因此,任务输入和输出需要可序列化。在这种情况下,您应该实例化客户端并在任务中执行查询(就像您在当前 MySQL 任务实现中看到的那样)
如果您使用 LocalExecutor,则默认情况下会序列化任务输出,因为默认情况下会启用检查点。checkpoint=False
您可以在定义任务时通过做来切换。
如果您需要进一步的帮助,请随时在 prefect.io/slack 加入 Prefect Slack 频道。
推荐阅读
- c++ - Dependency Walker:处理时检测到错误
- python - 为 passlib.hash.scrypt 使用自定义 Base_64 字典
- javascript - React Native Redux:比较 prevProps 和 this.props 的最佳方法是什么?
- algorithm - 如何在模拟退火中选择一个状态的邻居?
- security - JBoss 上的 Picketlink 将 LogoutRequest 发送到不正确的端点
- mongodb - 查询行为与 MongoDB 中的解释计划不同
- pdf - PDF 间接对象是否存在仅引用另一个对象的语法?
- docker - docker schema registry 和 kafka rest 无法启动
- php - 允许用户角色访问特定插件 WORDPRESS
- java - 'System.out.println()' 调用的可抛出参数 'ex'