python - 没有中间累加器的异步get()?
问题描述
我有以下循环。我简化了代码。inner() 在类似的循环中解析同一个文件,当然没有 .remote() 调用
def outer(self,file):
rv = []
with open(file,'r') as f :
acc1, acc2 = [],[]
for i,line in enumerate(f) :
if i % 10 == 0 : print(f'> {i} ', end="\n")
if i > 25 : break
outeri,txt = line.split(':')
abc = ClassX.inner.remote(txt, file, int(outeri))
acc2.append(abc) #lst of obj-refs
acc1.append(int(outeri))
rv = [z for z in zip(acc1, ray.get([a for a in acc2])) ]
return rv
我想将数据异步收集到 rv 中,我在这里执行此操作,但没有中介“acc2”。
我有两个问题:
相反/除了收集数据之外,我想异步执行一些 SQL 代码,但随着结果的出现。
print() 进度不是逐步打印的,而是在最后一次打印。我必须把它移到“inner()”中
试图理解并行迭代器,但似乎很难/不可行如何在 readline 到 .remote() 调用之后压缩步骤
解决方案
编辑:根据澄清改变答案。
为了按照它们到达的顺序处理对象引用,您需要使用ray.wait
它们返回时获取对象引用,然后只调用ray.get
准备好的对象引用。
def outer(file):
outer_is = {}
unfinished_refs = []
with open(file, "r"):
for i, line in enumerate(f):
outeri, txt = line.split(":")
ref = ClassX.inner.remote(txt, file, int(outeri))
outer_is[ref] = int(outeri)
unfinished_refs.append(ref)
# This part will get and process tasks as they finish
while len(unfinished_refs) > 0:
finished, unfinished_refs = ray.wait(unfinished_refs)
outeri = outer_is[finished[0]]
result = ray.get(finished[0])
### Process the result here ###
推荐阅读
- angular - 从 ui-Bootstrap 迁移到 ng-Boostrap
- c# - 是否可以使用 EntityFramework Core 在 C# 中以某种方式生成 SQL 插入语句(作为纯字符串)?
- reactjs - reactjs粘性导航栏使用window.scrollTo滚动到div不起作用
- salesforce - 查找两个日期/时间之间的营业时间数(上午 8 点 - 下午 5 点)
- reactjs - 重定向后 React 被重新渲染并发送回缓存的登录 IE11
- git-bash - Git-Bash:如何更改选项中的字体之外的字体?
- java - 为什么我的列表视图没有出现在屏幕页面上
- android - 有互联网连接时调用 Web 服务的最佳方式
- jdbc - Google Script 中的 jdbc 问题 - “很抱歉,发生了服务器错误。”
- django-cms - Aldryn_newsblog 自定义发布日期格式