logging - Dask SSHCluster: where do the workers log? (KilledWorker exception)
Problem Description

I wrote a small start-up script for a dask SSHCluster, and with LocalCluster it works fine (see below). But when I let it run, one worker dies with a

KilledWorker: ("('from-delayed-pandas_read_text-read-block-head-1-1-from-delayed-f0f78ffea007aeeccc7a9e04880d0195', 0)", <Worker 'tcp://192.168.56.11:34965', name: 0, memory: 0, processing: 1>)

exception, and I would like to know where I can find the crash log / traceback of the remote worker. Is there a way to collect them centrally on the scheduler host? Or are there log files on the remote machines?

Existing answers only point out that the workers log to stdout - but I cannot find any logs there either.
from dask.distributed import Client, LocalCluster, SSHCluster
import time, datetime
import pandas as pd
import numpy as np
import os, sys
from collections import Counter

# Add home folder of the cgi user to the environment
os.environ["HOME"] = "/home/cgi/"
os.path.expanduser("~/.ssh/id_rsa")
#os.path.expanduser('/home/cgi/')
#os.path.expandvars('/home/cgi/')

def run_cluster(local=0, HOST='10.8.0.1', SCHEDULER_PORT=8711, DASHBOARD_PORT=8710,
                DASK_WORKER_PROCESSES=16, NTHREADS=2, SILENCE_LOGS=0):
    start_msg = "Starting a "
    if local:
        start_msg += "local"
    else:
        start_msg += "ssh"
    start_msg += " dask cluster. SCHEDULER_PORT=%s and DASHBOARD_PORT=%s." % (SCHEDULER_PORT, DASHBOARD_PORT)
    print(start_msg)
    dashboard_address = ':%s' % DASHBOARD_PORT
    if local:
        cluster = LocalCluster(dashboard_address=dashboard_address, scheduler_port=SCHEDULER_PORT,
                               n_workers=DASK_WORKER_PROCESSES, host=HOST, silence_logs=SILENCE_LOGS)
    else:
        # SSHCluster uses the first entry for the scheduler; the rest run workers.
        worker_hosts = [
            "localhost", "localhost", "localhost", "localhost",
            "192.168.56.11", "192.168.56.11", "192.168.56.11", "192.168.56.11",
            "192.168.56.11", "192.168.56.11", "192.168.56.11", "192.168.56.11",
            "192.168.56.11", "192.168.56.11", "192.168.56.11", "192.168.56.11",
            "192.168.56.11", "192.168.56.11", "192.168.56.11", "192.168.56.11",
            "192.168.56.11", "192.168.56.11", "192.168.56.11", "192.168.56.11"
        ]
        # fixed: both values must be passed to % as a single tuple
        print("Starting a DASK SSHCluster with (%s) workers on %s different hosts ..."
              % (len(worker_hosts), len(set(worker_hosts))))
        cluster = SSHCluster(
            worker_hosts,
            connect_options={"known_hosts": None},
            worker_options={"nthreads": NTHREADS},
            scheduler_options={"port": SCHEDULER_PORT, "dashboard_address": dashboard_address}
        )
    print("SSHCLUSTER>%s" % cluster)
    print("DASK Cluster Dashboard = http://%s%s/status" % (HOST, dashboard_address))
    #cluster.scale(3)
    client = Client(cluster)
    #print(cluster)
    print(client)
    print("Press Enter to quit ...")
    input()  # fixed: the original busy-waited on time.sleep() and never returned

if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser(description='Webserver which runs the dash/plotly dashboard(s). Name: BT_HISTORY')
    parser.add_argument('-l', '--local-cluster', help='1/0', required=False, default=1)
    parser.add_argument('-q', '--quiet', help='1/0', required=False, default=False)
    parser.add_argument('-dp', '--dashboard-port', help='port to run dashboard (default=%s)' % 8710, default=8710, required=False)
    parser.add_argument('-sp', '--scheduler-port', help='port to run scheduler (default=%s)' % 8711, default=8711, required=False)
    args = vars(parser.parse_args())
    print("args>%s" % args)
    LOCAL = int(args['local_cluster'])
    DASHBOARD_PORT = int(args['dashboard_port'])
    SCHEDULER_PORT = int(args['scheduler_port'])
    SILENCE_LOGS = int(args['quiet'])
    run_cluster(local=LOCAL, DASHBOARD_PORT=DASHBOARD_PORT, SCHEDULER_PORT=SCHEDULER_PORT, SILENCE_LOGS=SILENCE_LOGS)
Solution

It is correct that Dask workers pipe their logs to stdout. Collecting those logs is usually the job of some other system.

I believe that, as of today (2020-06-12), the dask-ssh command line tool actually uses a slightly different code path and may collect logs. You could try that.
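One way to try that path (a sketch; the host list is borrowed from the script above, and --log-directory is the dask-ssh flag for writing scheduler/worker output to files on each node):

```shell
# Start a scheduler on the first host and workers on the others,
# writing each process's output to files under /tmp/dask-logs
# on the respective machine instead of only streaming to stdout.
dask-ssh --nthreads 2 --log-directory /tmp/dask-logs localhost 192.168.56.11
```

With --log-directory set, a crashed worker's traceback should end up in a file on the remote machine rather than being lost with the SSH session.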
SSHCluster may also have a logs or get_logs method, but I suspect it only works for workers that are still running.
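For the still-running case, the client API can pull worker logs back to the scheduler host. A minimal sketch, using a LocalCluster as a stand-in for the question's SSHCluster (Client.get_worker_logs is the same against both):

```python
from dask.distributed import Client, LocalCluster

# Stand-in cluster; with the question's setup you would pass the
# already-created SSHCluster to Client instead.
cluster = LocalCluster(n_workers=2, threads_per_worker=1, dashboard_address=None)
client = Client(cluster)

# Dict mapping each live worker's address to its recent log records;
# workers that have already been killed no longer appear here.
logs = client.get_worker_logs()
for addr, entries in logs.items():
    for level, message in list(entries)[:3]:
        print(addr, level, message)

client.close()
cluster.close()
```

Since a killed worker drops out of this dict, this helps with live diagnostics but not with recovering the traceback of an already-dead worker.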