python - 为什么我的 Dask Futures 卡在“待处理”状态并且永远不会完成?
问题描述
我有一些长时间运行的代码(大约 5-10 分钟的处理),我正试图作为 Dask 运行Future
。这是一系列的几个离散步骤,我可以作为一个函数运行:
result : Future = client.submit(my_function, arg1, arg2)
或者我可以分成中间步骤:
# compose the result from the same intermediate results but with Futures
intermediate1 = client.submit(my_function1, arg1)
intermediate2 = client.submit(my_function2, arg1, arg2)
intermediate3 = client.submit(my_function3, intermediate2, arg1)
result = client.submit(my_function4, intermediate3)
如果我在本地运行它(例如,result = my_function(arg1, arg2)
),它就完成了。如果我将它提交给 Dask,我会立即得到Future
回复 - 正如预期的那样 - 但这项工作永远不会完成。此外,如果我将result.key
用作跟踪作业状态的一种方式,然后将未来重构为result = Future(key)
,它的状态始终为pending
。
我想首先让它按原样运行,以便我可以将我的处理卸载到我的 Dask 工作人员而不是处理请求的 API,然后我希望能够开始跨节点拆分工作,以便我可以改进表现。但为什么我的工作就这么消失了?查看我的 Dask 调度程序 Web 界面,作业似乎都没有出现。但我知道 Dask 正在工作,因为我可以从我的 Jupyter 笔记本向它提交代码。
我client.submit
从 Flask 服务器调用,并返回密钥以便以后使用。大致:
@app.route('/submit')
def submit():
# ...
future = client.submit(my_function, arg1, arg2)
return jsonify({"key": future.key})
@app.route('/status/<key>')
def status(key):
future = Future(key)
return jsonify({"status": future.status})
当我的应用程序部署到 Kubernetes 时,我的/submit
路由会返回一个 Future 密钥,但我的 Dask 状态页面没有显示任何处理任务。如果我在本地运行 Flask,我确实会看到一个任务出现,并且我的作业的输出确实会在预期的延迟后出现;但是,当我使用从 返回的 Future 键到达自己的/status/<key>
路径时/submit
,它始终显示状态为pending。
解决方案
如果指向某个任务的所有未来都消失了,那么 Dask 可以随意忘记该任务。这允许 Dask 清理工作,而不是让所有中间结果永远存在。
如果您想保留参考,那么您需要保留期货。这告诉 Dask 你仍然关心结果。您可以通过创建字典在您的烧瓶应用程序中本地执行此操作。
futures = {}
@app.route('/submit')
def submit():
# ...
future = client.submit(my_function, arg1, arg2)
futures[future.key] = future
return jsonify({"key": future.key})
@app.route('/status/<key>')
def status(key):
future = futures[key]
return jsonify({"status": future.status})
但是您还需要考虑何时可以清理和释放这些期货。通过这种方法,您将慢慢填满您的记忆。
推荐阅读
- ios - Pod 安装:忽略 ffi-1.13.1,因为它的扩展未构建。试试: gem pristine ffi --version 1.13.1
- python - 在 Flask 中为一个函数创建多个动态路由 | Python
- java - 传递流顺序的方法引用
- javascript - 需要帮助将节点内的孩子放入数组
- google-apps-script - 在 Google Script 中设置所选文本的背景颜色
- java - JUnit 5 测试类属性在每次测试之前为空
- flutter - Flutter Api 调用未处理异常
- python - UnboundLocalError:分配前引用的局部变量“嵌入”
- apache-kafka - Kafka Producer:发送消息后断开连接与保持连接打开
- vhdl - 函数内的 VHDL 计算