首页 > 解决方案 > 如何使用dask bag和delay来加入2个映射功能?

问题描述

我有 2 个功能:find_components 和 processing_partition_component

import random
import dask.bag as db

def find_components(partition):
  # it will return a list of components
  return [x for x in range(1, random.randint(1,10))]

def processing_partition_component(part_comp):
  print("processing %s" % part_comp)

partitions=['2','3','4']

我想在分区上计算 find_components(),然后获取每个分区的输出以生成用于 processing_partition_component() 的任务。并且计算不应等待所有 find_coponents() 完成。换句话说,应该在 processing_partition 之一完成后立即调用 processing_partition_component()。我已经尝试过了,但这不是我想要的:

db.from_sequence(partitions, partition_size=1).map(find_components).map(processing_partition_component).compute()
# Output:
processing [1, 2, 3, 4, 5]
processing [1, 2]
processing [1, 2, 3, 4, 5, 6, 7, 8, 9]

您可以看到 processing_partition_component() 获取 find_components() 的整个输出,例如: [1, 2, 3, 4, 5] 作为输入。我想要的是任务应该在 find_components() 之后扇出,并且每个 processing_partition_component() 应该只需要 1 个元素,如 1、2、3、4 或 5。预期的打印输出是

processing 1
processing 2
processing 3
....
processing 1  # from another output of find_components
...

如果这是多线程的,则打印顺序将被混淆,因此处理 1 可以彼此相邻打印 3 次

我不知道如何使用 dask.bag 和 dask.delayed 来做到这一点。我正在使用最新的 dask 和 python3

谢谢,

标签: pythonpython-3.xdask

解决方案


Dask bag 可以很好地处理发电机

def f(partition):
    for x in partition:
        yield x + 1

my_bag.map_partitions(f).map(print)

这将为每个元素添加一个,然后在移动到下一个元素之前打印它


推荐阅读