python - Dask - 是否可以在每个具有自定义功能的工作人员中使用所有线程?
问题描述
就我而言,我在 S3 中有几个文件和一个自定义函数,可以读取每个文件并使用所有线程对其进行处理。为了简化示例,我只生成一个数据框df
,并假设我的函数是tsfresh.extract_features
使用多处理的。
生成数据
import pandas as pd
from tsfresh import extract_features
from tsfresh.examples.robot_execution_failures import download_robot_execution_failures, \
load_robot_execution_failures
download_robot_execution_failures()
ts, y = load_robot_execution_failures()
df = []
for i in range(5):
tts = ts.copy()
tts["id"] += 88 * i
df.append(tts)
df = pd.concat(df, ignore_index=True)
功能
def fun(df, n_jobs):
extracted_features = extract_features(df,
column_id="id",
column_sort="time",
n_jobs=n_jobs)
簇
import dask
from dask.distributed import Client, progress
from dask import compute, delayed
from dask_cloudprovider import FargateCluster
my_vpc = # your vpc
my_subnets = # your subnets
cpu = 2
ram = 4
cluster = FargateCluster(n_workers=1,
image='rpanai/feats-worker:2020-08-24',
vpc=my_vpc,
subnets=my_subnets,
worker_cpu=int(cpu * 1024),
worker_mem=int(ram * 1024),
cloudwatch_logs_group="my_log_group",
task_role_policies=['arn:aws:iam::aws:policy/AmazonS3FullAccess'],
scheduler_timeout='20 minutes'
)
cluster.adapt(minimum=1,
maximum=4)
client = Client(cluster)
client
使用所有工作线程 (FAIL)
to_process = [delayed(fun)(df, cpu) for i in range(10)]
out = compute(to_process)
AssertionError: daemonic processes are not allowed to have children
仅使用一个线程(确定)
在这种情况下,它工作正常,但我在浪费资源。
to_process = [delayed(fun)(df, 0) for i in range(10)]
out = compute(to_process)
问题
我知道对于这个特定的功能,我最终可以使用多线程和其他一些技巧编写一个自定义分发器,但我想分发一个工作,让每个工作人员都可以利用所有资源而不必太担心。
更新
该函数只是一个示例,实际上它在实际特征提取之前和保存到S3
.
def fun(filename, bucket_name, filename_out, n_jobs):
#
df pd.read_parquet(f"s3://{bucket_name}/{filename}")
# do some cleaning
extracted_features = extract_features(df,
column_id="id",
column_sort="time",
n_jobs=n_jobs)
extract_features.to_parquet(f"s3://{bucket_name}/{filename_out}")
解决方案
我可以帮助回答您的具体问题tsfresh
,但如果tsfresh
只是一个简单的玩具示例,可能不是您想要的。
对于tsfresh
,您通常不会将多处理tsfresh
和 dask 混合在一起,而是让 dask 完成所有处理。这意味着,您从单个开始dask.DataFrame
(在您的测试用例中,您可以将 pandas 数据帧转换为 dask 数据帧 - 对于您的读取用例,您可以直接从 docu 读取),然后在 dask 数据帧中分配特征提取S3
(特征提取的好处是,它在每个时间序列上独立工作。因此我们可以为每个时间序列生成一个作业)。
当前版本的tsfresh
(0.16.0) 有一个小的辅助函数可以为您执行此操作:请参见此处。在下一个版本中,甚至可以extract_features
直接在 dask 数据帧上运行。
我不确定这是否有助于解决您更普遍的问题。在我看来,你(在大多数情况下)不想混合 dask 的分布函数和“本地”多核计算,而只是让 dask 处理所有事情。因为如果您在一个 dask 集群上,您甚至可能不知道每台机器上有多少个内核(或者每个作业可能只有一个内核)。
这意味着如果您的工作可以分发 N 次并且每个人将启动 M 个子工作,那么您只需将“N x M”工作交给 dask 并让它找出其余的(包括数据局部性)。
推荐阅读
- r - 无法使用 stringr::string_extract 在正则表达式中转义 $
- arrays - 如何在 Go 中清除和重用数组(不是切片)?
- mongodb - MongoDB - 我可以创建一个索引来隔离文档键中的值以加快搜索速度吗?
- python - 如何正确验证 REST API 查询参数?
- python - Python 语法:带有输入元组的 __getitem__ 和 __setitem__?
- java - 比较 3 个数组并将 Null 错误判断为某个值
- postsharp - PostSharp 不会在最新的 Visual Studio (16.8.1) 中构建
- java - 如何优化这个for循环?
- firebase - 将自定义域从一个项目更改为另一个项目
- java - 如何在 Json 中仅解包 id 参数