首页 > 解决方案 > Golang 同步 Goroutines

问题描述

我正在开发一个用于同时配置网络设备的 SSH 客户端应用程序,并且遇到了实现并发的问题。我的程序接收一组主机和一组配置命令以发送到每个主机。我正在使用 async.WaitGroup等待所有主机完成配置。这适用于小批量主机,但很快我的配置 goroutine 中的功能开始随机失败。如果我在失败的主机上重新运行程序,有些会成功,有些会失败。我必须重复这个过程,直到只剩下有实际错误的主机。它总是在身份验证方面失败,authentication failed: auth methods tried [none password]...或者来自的值sysDescr没有被添加到某些Devices 字段。就好像当有许多主机和 goroutines 运行时,它们开始提前返回或什么的。我真的不确定发生了什么。

这是我的代码示例:

package main

import (
    "fmt"
    "net"
    "os"
    "sync"
    "time"

    "golang.org/x/crypto/ssh"
)

func main() {
    // When there are many hosts, many calls to Dial and
    // sysDescr fail. If I rerun the program on the unsuccessful
    // hosts, nothing fails and the expected output is produced.
    var hosts []string
    cfg := &ssh.ClientConfig{
        User:            "user",
        Auth:            []ssh.AuthMethod{ssh.Password("pass")},
        HostKeyCallback: ssh.InsecureIgnoreHostKey(),
        Timeout:         10 * time.Second,
    }
    results := make(chan *result, len(hosts))

    var wg sync.WaitGroup
    wg.Add(len(hosts))
    for _, host := range hosts {
        go connect(host, cfg, results, &wg)
    }
    wg.Wait()
    close(results)

    for res := range results {
        if res.err != nil {
            fmt.Fprintln(os.Stderr, res.Err)
            continue
        }
        d := res.device
        fmt.Println(d.addr, d.hostname, d.vendor, d.os, d.model, d.version)
        d.Client.Close()
    }
}

// Device represents a network device.
type Device struct {
    *ssh.Client
    addr     string
    hostname string
    vendor   string
    os       string
    model    string
    version  string
}

// Dial establishes an ssh client connection to a remote host.
func Dial(host, port string, cfg *ssh.ClientConfig) (*Device, error) {
    // get host info in background, may take a second
    info := make(chan map[string]string)
    go func(c *Client) {
        info <- sysDescr(host)
        close(info)
    }(c)

    // establish ssh client connection to host
    client, err := ssh.Dial("tcp", net.JoinHostPort(host, addr), cfg)
    if err != nil {
        return nil, err
    }

    m := <-info
    d := &Device{
        Client: client,
        addr: m["addr"],
        hostname: m["hostname"],
        vendor: m["vendor"],
        os: m["os"],
        model: m["model"],
        version: m["version"],
    }
    return d, nil
}

// sysDescr attempts to gather information about a remote host.
func sysDescr(host string) map[string]string {
    // get host info
}

// result is the result of connecting to a device.
type result struct {
    device *Device
    err    error
}

// connect establishes an ssh client connection to a host.
func connect(host string, cfg *ssh.ClientConfig, results chan<- *result, wg *sync.WaitGroup) {
    defer wg.Done()
    device, err := Dial(host, "22", cfg)
    results <- &result{device, err}
}

难道我做错了什么?我可以限制生成的 goroutine 的数量,而不是为每个主机生成一个 goroutine 吗?

标签: gosshconcurrencyparallel-processinggoroutine

解决方案


是的!要回答您的第二个问题,go 中有许多限制并发的模式。其中最大的两个是:

我个人更喜欢工作池变体,因为 IMO 更好地帮助将业务逻辑与调度分开,但两者都是完全有效的,并且在许多项目中都存在。


推荐阅读