Dask: Where do SSHCluster workers log to? (KilledWorker exception)

Problem description

I wrote a small startup script for dask that can launch either an SSHCluster or a LocalCluster; the LocalCluster works fine (see below).

But when I let it run, one worker dies with a

KilledWorker: ("('from-delayed-pandas_read_text-read-block-head-1-1-from-delayed-f0f78ffea007aeeccc7a9e04880d0195', 0)", <Worker 'tcp://192.168.56.11:34965', name: 0, memory: 0, processing: 1>)

exception, and I would like to know where I can find the crash logs / tracebacks of the remote workers. Is there a way to collect them centrally on the scheduler host? Or are there logs somewhere on the remote machines?

Existing answers only state that workers log to stdout, but I cannot find any logs there either.

from dask.distributed import Client, LocalCluster, SSHCluster
import time, datetime
import pandas as pd
import numpy as np
import os, sys
from collections import Counter

# Point HOME at cgi's home directory so that "~" (and the SSH key lookup
# below) resolves to /home/cgi/
os.environ["HOME"] = "/home/cgi/"
os.path.expanduser("~/.ssh/id_rsa")  # note: the result is discarded
#os.path.expanduser('/home/cgi/')
#os.path.expandvars('/home/cgi/')


def run_cluster(local=0, HOST = '10.8.0.1', SCHEDULER_PORT = 8711, DASHBOARD_PORT=8710,
                DASK_WORKER_PROCESSES = 16, NTHREADS=2, SILENCE_LOGS = 0):

    start_msg = "Starting a "
    if local: start_msg += "local"
    else: start_msg += "ssh"
    start_msg += " dask cluster. SCHEDULER_PORT=%s and DASHBOARD_PORT=%s." % ( SCHEDULER_PORT, DASHBOARD_PORT )
    print(start_msg)

    dashboard_address = ':%s' % DASHBOARD_PORT

    if local:
        cluster = LocalCluster(dashboard_address=dashboard_address, scheduler_port=SCHEDULER_PORT,
                               n_workers=DASK_WORKER_PROCESSES, host=HOST, silence_logs=SILENCE_LOGS)
    else:
        worker_hosts = [
            "localhost", "localhost", "localhost", "localhost",
            "192.168.56.11","192.168.56.11","192.168.56.11","192.168.56.11",
            "192.168.56.11","192.168.56.11","192.168.56.11","192.168.56.11",
            "192.168.56.11","192.168.56.11","192.168.56.11","192.168.56.11",
            "192.168.56.11","192.168.56.11","192.168.56.11","192.168.56.11",
            "192.168.56.11","192.168.56.11","192.168.56.11","192.168.56.11"
        ]
        print("Starting a DASK SSHCluster with (%s) workers on %s different hosts... "
              % (len(worker_hosts), len(set(worker_hosts))))

        cluster = SSHCluster(
            worker_hosts,
            connect_options = {"known_hosts": None},
            worker_options = {"nthreads": NTHREADS},
            scheduler_options={"port": SCHEDULER_PORT, "dashboard_address": dashboard_address}
        )
        print("SSHCLUSTER>%s" % cluster)

    print("DASK Cluster Dashboard = http://%s%s/status" % (HOST, dashboard_address))

    #cluster.scale(3)
    client = Client(cluster)
    #print(cluster)
    print(client)
    print("Press Enter to quit ...")
    input()

if __name__ == '__main__':

    import argparse

    parser = argparse.ArgumentParser(description='Webserver which runs the dash/plotly dashboard(s). Name: BT_HISTORY')
    parser.add_argument('-l', '--local-cluster', help='1/0', required=False, default=1)
    parser.add_argument('-q', '--quiet', help='1/0', required=False, default=False)
    parser.add_argument('-dp', '--dashboard-port', help='port to run dashboard (default=%s)' % 8710, default=8710, required=False)
    parser.add_argument('-sp', '--scheduler-port', help='port to run scheduler (default=%s)' % 8711, default=8711, required=False)
    args = vars(parser.parse_args())
    print("args>%s" % args)
    LOCAL = int(args['local_cluster'])
    DASHBOARD_PORT = int(args['dashboard_port'])
    SCHEDULER_PORT = int(args['scheduler_port'])
    SILENCE_LOGS = int(args['quiet'])

    run_cluster(local=LOCAL, DASHBOARD_PORT=DASHBOARD_PORT, SCHEDULER_PORT=SCHEDULER_PORT, SILENCE_LOGS=SILENCE_LOGS)

Tags: logging, dask, dask-distributed

Solution


It is correct that Dask workers pipe their logs to stdout. Collecting those logs is generally the job of some other system.
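If you do not have such a system in place, one low-tech option is to attach a standard `logging.FileHandler` to the `distributed.worker` logger on every worker yourself. A minimal sketch, not part of the original script; the function name and log path are illustrative choices, not dask defaults:

```python
import logging

def log_to_file(path="/tmp/dask-worker.log"):
    """Attach a FileHandler to the 'distributed.worker' logger.

    Intended to be shipped to every worker (e.g. with client.run), so
    that worker-side log records also land in a file on the remote host.
    The path is an illustrative choice, not a dask default.
    """
    logger = logging.getLogger("distributed.worker")
    handler = logging.FileHandler(path)
    handler.setFormatter(logging.Formatter(
        "%(asctime)s %(levelname)s %(name)s %(message)s"))
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return path
```

With a connected client this would be invoked as `client.run(log_to_file)`, which executes the function once on every worker, so each remote machine keeps its own log file.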

I believe that as of today (2020-06-12) the dask-ssh command line tool actually uses a slightly different code path, which may collect logs. You could give that a try.

SSHCluster may also have a `logs` or `get_logs` method, but I suspect that it only works for workers that are still running.
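For workers that are still alive, recent log lines can be pulled from the client side. A sketch using a LocalCluster for illustration; the same calls should work against an SSHCluster, but, as noted above, presumably not for workers that have already been killed:

```python
from dask.distributed import Client, LocalCluster

# Small, threads-only in-process cluster purely for demonstration;
# processes=False avoids spawning subprocesses.
cluster = LocalCluster(n_workers=1, processes=False, dashboard_address=None)
client = Client(cluster)

# Recent log lines from every live worker, keyed by worker address.
worker_logs = client.get_worker_logs()

# SpecCluster subclasses (LocalCluster, SSHCluster) also expose
# get_logs(), which includes the scheduler's log as well.
cluster_logs = cluster.get_logs()

print(sorted(worker_logs))
client.close()
cluster.close()
```

Against the SSHCluster above, the same `client.get_worker_logs()` call would return the stdout logs of the remote workers without you having to SSH into each machine.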

