dask - Workers crashing during simple aggregations
Problem description
I'm trying to aggregate several columns of a 450-million-row dataset. When I use Dask's built-in aggregations ('min', 'max', 'std', 'mean'), the workers keep crashing partway through.
The file I'm using can be found here: https://www.kaggle.com/c/PLAsTiCC-2018/data (look for test_set.csv).
I have a Google Kubernetes cluster of 3 8-core machines with 22 GB of RAM in total.
Since these are just the built-in aggregation functions, I haven't tried much else.
It isn't using that much RAM either; usage stays steady around 6 GB, and I don't see any errors indicating an out-of-memory condition.
Here is my basic code, followed by the error logs from the evicted workers:
from dask.distributed import Client, progress
import dask.dataframe as dd
from timeit import default_timer as timer

client = Client('google kubernetes cluster address')

test_df = dd.read_csv('gs://filepath/test_set.csv', blocksize=10000000)

def process_flux(df):
    flux_ratio_sq = df.flux / df.flux_err
    flux_by_flux_ratio_sq = df.flux * flux_ratio_sq
    df_flux = dd.concat([df, flux_ratio_sq, flux_by_flux_ratio_sq], axis=1)
    df_flux.columns = ['object_id', 'mjd', 'passband', 'flux', 'flux_err',
                       'detected', 'flux_ratio_sq', 'flux_by_flux_ratio_sq']
    return df_flux

aggs = {
    'flux': ['min', 'max', 'mean', 'std'],
    'detected': ['mean'],
    'flux_ratio_sq': ['sum'],
    'flux_by_flux_ratio_sq': ['sum'],
    'mjd': ['max', 'min'],
}

def featurize(df):
    start_df = process_flux(df)
    agg_df = start_df.groupby(['object_id']).agg(aggs)
    return agg_df

overall_start = timer()
final_df = featurize(test_df).compute()
overall_end = timer()
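One detail worth a sanity check in the code above: with blocksize=10000000 (10 MB), dd.read_csv splits the CSV into roughly file_size / blocksize partitions, so a file of this scale produces thousands of tasks per aggregation, which drives up scheduler and inter-worker communication load. A back-of-the-envelope sketch (the ~18 GB file size is an assumption for illustration, not stated in the question):

```python
# Rough partition-count estimate for dd.read_csv with a fixed blocksize.
# The ~18 GB file size is an assumption for illustration.
file_size = 18 * 1024**3      # bytes in an ~18 GB CSV
blocksize = 10_000_000        # 10 MB, as in the question

n_partitions = file_size // blocksize
print(n_partitions)           # -> 1932, i.e. roughly two thousand partitions
```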
Error logs:
distributed.core - INFO - Event loop was unresponsive in Worker for 74.42s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - INFO - Event loop was unresponsive in Worker for 3.30s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - INFO - Event loop was unresponsive in Worker for 3.75s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
Several of these occur, and then:
distributed.core - INFO - Event loop was unresponsive in Worker for 65.16s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.worker - ERROR - Worker stream died during communication: tcp://hidden address
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/distributed/comm/tcp.py", line 180, in read
    n_frames = yield stream.read_bytes(8)
  File "/opt/conda/lib/python3.6/site-packages/tornado/iostream.py", line 441, in read_bytes
    self._try_inline_read()
  File "/opt/conda/lib/python3.6/site-packages/tornado/iostream.py", line 911, in _try_inline_read
    self._check_closed()
  File "/opt/conda/lib/python3.6/site-packages/tornado/iostream.py", line 1112, in _check_closed
    raise StreamClosedError(real_error=self.error)
tornado.iostream.StreamClosedError: Stream is closed
    response = yield comm.read(deserializers=deserializers)
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 326, in wrapper
    yielded = next(result)
  File "/opt/conda/lib/python3.6/site-packages/distributed/comm/tcp.py", line 201, in read
    convert_stream_closed_error(self, e)
  File "/opt/conda/lib/python3.6/site-packages/distributed/comm/tcp.py", line 127, in convert_stream_closed_error
    raise CommClosedError("in %s: %s: %s" % (obj, exc.__class__.__name__, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: TimeoutError: [Errno 110] Connection timed out
It runs fairly fast; I'd just like to get consistent performance without crashing my workers.
Thanks!
Solution
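The accepted answer was not preserved in this page, but the logs point at comm timeouts during a large shuffle ("Event loop was unresponsive", then "Connection timed out"). A hedged sketch of mitigations commonly suggested for this symptom, reusing the placeholders from the question (not a confirmed fix):

```python
# Sketch only, not the confirmed fix; the GCS path is the question's placeholder.
import dask
import dask.dataframe as dd

# 1) Relax comm timeouts so a briefly unresponsive worker event loop does
#    not tear down connections mid-transfer.
dask.config.set({'distributed.comm.timeouts.connect': '60s'})

# 2) Fewer, larger partitions: a 256 MB blocksize cuts the task count by
#    ~25x versus the 10 MB used in the question, reducing scheduler and
#    inter-worker communication overhead.
test_df = dd.read_csv('gs://filepath/test_set.csv', blocksize='256MB')

# 3) Spread the groupby result over several output partitions instead of
#    collecting it into a single one.
agg_df = test_df.groupby('object_id').agg(
    {'flux': ['min', 'max', 'mean', 'std']}, split_out=8)
```

Each of these can be tried independently; the blocksize change alone is often enough to stop workers from being evicted on reads of this scale.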