首页 > 解决方案 > Dask - 是否可以在每个具有自定义功能的工作人员中使用所有线程?

问题描述

就我而言,我在 S3 中有几个文件和一个自定义函数,可以读取每个文件并使用所有线程对其进行处理。为了简化示例,我只生成一个数据框df,并假设我的函数是tsfresh.extract_features使用多处理的。

生成数据

import pandas as pd
from tsfresh import extract_features
from tsfresh.examples.robot_execution_failures import download_robot_execution_failures, \
load_robot_execution_failures
download_robot_execution_failures()
ts, y = load_robot_execution_failures()
df = []
for i in range(5):
    tts = ts.copy()
    tts["id"] += 88 * i
    df.append(tts)
    
df = pd.concat(df, ignore_index=True)

功能

def fun(df, n_jobs):
    extracted_features = extract_features(df,
                                      column_id="id",
                                      column_sort="time",
                                      n_jobs=n_jobs)

import dask
from dask.distributed import Client, progress
from dask import compute, delayed
from dask_cloudprovider import FargateCluster

my_vpc = # your vpc
my_subnets = # your subnets

cpu = 2 
ram = 4
cluster = FargateCluster(n_workers=1,
                         image='rpanai/feats-worker:2020-08-24',
                         vpc=my_vpc,
                         subnets=my_subnets,
                         worker_cpu=int(cpu * 1024),
                         worker_mem=int(ram * 1024),
                         cloudwatch_logs_group="my_log_group",
                         task_role_policies=['arn:aws:iam::aws:policy/AmazonS3FullAccess'],
                         scheduler_timeout='20 minutes'
                        )


cluster.adapt(minimum=1,
              maximum=4)
client = Client(cluster)
client

使用所有工作线程 (FAIL)

to_process = [delayed(fun)(df, cpu) for i in range(10)]
out = compute(to_process)
AssertionError: daemonic processes are not allowed to have children

仅使用一个线程(确定)

在这种情况下,它工作正常,但我在浪费资源。

to_process = [delayed(fun)(df, 0) for i in range(10)]
out = compute(to_process)

问题

我知道对于这个特定的功能,我最终可以使用多线程和其他一些技巧编写一个自定义分发器,但我想分发一个工作,让每个工作人员都可以利用所有资源而不必太担心。

更新

该函数只是一个示例,实际上它在实际特征提取之前和保存到S3.

def fun(filename, bucket_name, filename_out, n_jobs):
    #
    df pd.read_parquet(f"s3://{bucket_name}/{filename}")
    # do some cleaning
    extracted_features = extract_features(df,
                                      column_id="id",
                                      column_sort="time",
                                      n_jobs=n_jobs)
   extract_features.to_parquet(f"s3://{bucket_name}/{filename_out}")

标签: pythondaskpython-multithreading

解决方案


我可以帮助回答您的具体问题tsfresh,但如果tsfresh只是一个简单的玩具示例,可能不是您想要的。

对于tsfresh,您通常不会将多处理tsfresh和 dask 混合在一起,而是让 dask 完成所有处理。这意味着,您从单个开始dask.DataFrame(在您的测试用例中,您可以将 pandas 数据帧转换为 dask 数据帧 - 对于您的读取用例,您可以直接从 docu 读取),然后在 dask 数据帧中分配特征提取S3 特征提取的好处是,它在每个时间序列上独立工作。因此我们可以为每个时间序列生成一个作业)。

当前版本的tsfresh(0.16.0) 有一个小的辅助函数可以为您执行此操作:请参见此处。在下一个版本中,甚至可以extract_features直接在 dask 数据帧上运行。

我不确定这是否有助于解决您更普遍的问题。在我看来,你(在大多数情况下)不想混合 dask 的分布函数和“本地”多核计算,而只是让 dask 处理所有事情。因为如果您在一个 dask 集群上,您甚至可能不知道每台机器上有多少个内核(或者每个作业可能只有一个内核)。

这意味着如果您的工作可以分发 N 次并且每个人将启动 M 个子工作,那么您只需将“N x M”工作交给 dask 并让它找出其余的(包括数据局部性)。


推荐阅读