python - 如何使用dask bag和delay来加入2个映射功能?
问题描述
我有 2 个功能:find_components 和 processing_partition_component
import random
import dask.bag as db
def find_components(partition):
# it will return a list of components
return [x for x in range(1, random.randint(1,10))]
def processing_partition_component(part_comp):
print("processing %s" % part_comp)
partitions=['2','3','4']
我想在分区上计算 find_components(),然后获取每个分区的输出以生成用于 processing_partition_component() 的任务。并且计算不应等待所有 find_coponents() 完成。换句话说,应该在 processing_partition 之一完成后立即调用 processing_partition_component()。我已经尝试过了,但这不是我想要的:
db.from_sequence(partitions, partition_size=1).map(find_components).map(processing_partition_component).compute()
# Output:
processing [1, 2, 3, 4, 5]
processing [1, 2]
processing [1, 2, 3, 4, 5, 6, 7, 8, 9]
您可以看到 processing_partition_component() 获取 find_components() 的整个输出,例如: [1, 2, 3, 4, 5] 作为输入。我想要的是任务应该在 find_components() 之后扇出,并且每个 processing_partition_component() 应该只需要 1 个元素,如 1、2、3、4 或 5。预期的打印输出是
processing 1
processing 2
processing 3
....
processing 1 # from another output of find_components
...
如果这是多线程的,则打印顺序将被混淆,因此处理 1 可以彼此相邻打印 3 次
我不知道如何使用 dask.bag 和 dask.delayed 来做到这一点。我正在使用最新的 dask 和 python3
谢谢,
解决方案
Dask bag 可以很好地处理发电机
def f(partition):
for x in partition:
yield x + 1
my_bag.map_partitions(f).map(print)
这将为每个元素添加一个,然后在移动到下一个元素之前打印它
推荐阅读
- java - 确定构造函数中的哪个参数用于设置类中的特定字段?
- avplayerview - 自定义 AVPlayerVIew:你知道默认使用哪个初始化器吗?
- r - 从字符串中删除初始(匹配)模式
- php - Laravel Eloquent - 忽略子句的地方
- ios - 发送推送以唤醒带有警报横幅(和声音)的应用程序的正确方法
- python - 当我明确定义它时,为什么我的程序返回“NoneType”对象不可下标错误?
- jquery - 具有动态值的 Jquery 选择器
- c# - 无法导航到 .Net Core 3.1 应用程序中的页面
- c++ - 模板类的非模板友元是否被实例化?
- php - 未定义索引 - 奇怪的行为