python - 如何将数据库引擎/连接传递到多处理池?不能腌制 _thread._local 对象
问题描述
我想task
有效地在函数上执行 sql。但我不知道怎么做!我目前正在调用一个哑函数,task
它创建一个连接with snowflake.connector.connect(some_creds_here) as con:
,为每个查询创建一个连接。
我尝试将引擎对象发送到,task
但它具有相同的响应。我从一个用于调试的大模块中创建了这个基本结构。
def get_connection():
engine = create_engine(
'snowflake://{user}:{password}@{account}/'.format(
user="user",
password="password",
account="us-east-1",
)
)
try:
connection = engine.connect()
return connection, engine
finally:
pass
def task(con):
con = con.connect()
print(con.execute('select current_version()').fetchone())
def main():
connection, engine = get_connection()
iterable = [(engine) for x in range(10)]
with Pool() as p:
for _ in p.imap_unordered(task, iterable):
...
con.close()
engine.dispose()
if __name__ == "__main__":
main()
这是错误响应:
Traceback (most recent call last):
File "service\ECR\file_conversion_worker\mock.py", line 26, in main
for _ in p.imap_unordered(task, iterable):
File "\AppData\Continuum\lib\multiprocessing\pool.py", line 748, in next
raise value
File "\AppData\Continuum\lib\multiprocessing\pool.py", line 431, in _handle_tasks
put(task)
File "\AppData\Continuum\lib\multiprocessing\connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "\AppData\Continuum\lib\multiprocessing\reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
TypeError: can't pickle _thread._local objects
编辑:将线程池更正为多处理池
解决方案
推荐阅读
- arrays - Avro.avroexception:根据记录模式写入需要通用记录,但找到了 avro.generic.genericrecord
- api - OAuth2 Symfony Api 邮递员没有经过警卫身份验证器
- python - 来自数据框的 X 轴数据
- java - Jackson 抽象类型 - 杂项类型
- javascript - 如何使用 Abort Controller 中止从 useEffect 内部启动但由用户操作触发的 API?
- android-emulator - Android模拟器:横向物理键盘上的箭头键
- facebook - 我想知道如何订阅 webhook facebook 的活动帖子
- arrays - Julia:将分类数组转换为数值数组的完美方法是什么?
- javascript - 通知/提醒用户该元素已在使用中?
- xcode12 - Xcode 12 运行问题