首页 > 解决方案 > 没有中间累加器的异步get()?

问题描述

我有以下循环。我简化了代码。inner() 在类似的循环中解析同一个文件,当然没有 .remote() 调用

def outer(self,file):
       rv = []
       with open(file,'r') as f :
            acc1, acc2 = [],[]
            for i,line in  enumerate(f) :
                 if i % 10 == 0 : print(f'> {i}  ', end="\n")
                 if i > 25 : break
                 outeri,txt = line.split(':')
                 abc = ClassX.inner.remote(txt, file, int(outeri))
                 acc2.append(abc) #lst of obj-refs
                 acc1.append(int(outeri)) 

            rv = [z for z in zip(acc1, ray.get([a for a in acc2])) ]
       return rv

我想将数据异步收集到 rv 中,我在这里执行此操作,但没有中介“acc2”。

我有两个问题:

  1. 相反/除了收集数据之外,我想异步执行一些 SQL 代码,但随着结果的出现。

  2. print() 进度不是逐步打印的,而是在最后一次打印。我必须把它移到“inner()”中

试图理解并行迭代器,但似乎很难/不可行如何在 readline 到 .remote() 调用之后压缩步骤

标签: pythonasynchronousray

解决方案


编辑:根据澄清改变答案。

为了按照它们到达的顺序处理对象引用,您需要使用ray.wait它们返回时获取对象引用,然后只调用ray.get准备好的对象引用。

def outer(file):
  outer_is = {}
  unfinished_refs = []
  with open(file, "r"):
    for i, line in enumerate(f):
      outeri, txt = line.split(":")
      ref = ClassX.inner.remote(txt, file, int(outeri))
      outer_is[ref] = int(outeri)
      unfinished_refs.append(ref)

  # This part will get and process tasks as they finish
  while len(unfinished_refs) > 0:
    finished, unfinished_refs = ray.wait(unfinished_refs)
    outeri = outer_is[finished[0]]
    result = ray.get(finished[0])
    ### Process the result here ###

推荐阅读