首页 > 解决方案 > 如何使用通道流式传输命令输出?

问题描述

我试图在命令输出发生时捕获它,以便我可以将其作为缓冲数据刷新到 HTTP 客户端,但我没有正确使用通道并且不知道如何修复它。

这基本上就是我实施它的方式。从未收到输出通道,我没有得到命令输出。这一定是相当明显的事情,但我没有看到。

package main

import (
    "bufio"
    "fmt"
    "io"
    "log"
    "os/exec"
)

func main() {
    output := make(chan []byte, 1024)
    done := make(chan struct{})

    cmd := exec.Command("echo", "hello")

    go execute(cmd, output, done)

    for {
        select {
        case data := <-output:
            fmt.Println("Got data")
            fmt.Println(string(data))
            //Flush to http.ResponseWriter
        case <-done:
            fmt.Println("Done")
            return
        }
    }
}

func execute(cmd *exec.Cmd, output chan []byte, done chan struct{}) {
    defer func() {
        done <- struct{}{}
    }()

    stdout, err := cmd.StdoutPipe()
    if err != nil {
        fmt.Println(err)
        output <- []byte(fmt.Sprintf("Error getting stdout pipe: %v", err))
        return
    }
    stderr, err := cmd.StderrPipe()
    if err != nil {
        fmt.Println(err)
        output <- []byte(fmt.Sprintf("Error getting stderr pipe: %v", err))
        return
    }

    scanner := bufio.NewScanner(io.MultiReader(stdout, stderr))

    err = cmd.Start()
    if err != nil {
        fmt.Println(err)
        output <- []byte(fmt.Sprintf("Error executing: %v", err))
        return
    }

    go func() {
        for scanner.Scan() {
            fmt.Println(string(scanner.Bytes()))
            output <- scanner.Bytes()
            fmt.Println("Sent data")
        }
    }()

    err = cmd.Wait()
    if err != nil {
        fmt.Println(err)
        output <- []byte(fmt.Sprintf("Error waiting for the script to complete: %v", err))
        return
    }
}

编辑:

经过一番思考,这里是更新的代码。这可以按我的预期工作,但仍然不确定我是否正确执行此操作。我删除了输出通道上的完成通道和范围。这会接收通过扫描器在执行中发送的输出中的所有内容,并且不会阻塞,因为命令完成时输出关闭。这不是在 Go Playground 中打印“hello”,但我在我的项目中得到了预期的行为。

仍然会感谢反馈。

package main

import (
    "bufio"
    "fmt"
    "io"
    "os"
    "os/exec"
)

func main() {
    output := make(chan []byte)

    cmd := exec.Command("echo", "hello")

    go execute(cmd, output)

    for data := range output {
        fmt.Println(string(data))
    }
}

func execute(cmd *exec.Cmd, output chan []byte) {
    defer close(output)

    stdout, err := cmd.StdoutPipe()
    if err != nil {
        fmt.Println(err)
        output <- []byte(fmt.Sprintf("Error getting stdout pipe: %v", err))
        return
    }
    cmd.Stderr = cmd.Stdout

    scanner := bufio.NewScanner(stdout)

    done := make(chan struct{})

    err = cmd.Start()
    if err != nil {
        output <- []byte(fmt.Sprintf("Error executing: %v", err))
        return
    }

    go func() {
        for scanner.Scan() {
            output <- scanner.Bytes()
        }
        done <- struct{}{}
    }()

    <-done

    err = cmd.Wait()
    if err != nil {
        fmt.Println(err)
        output <- []byte(fmt.Sprintf("Error waiting for the script to complete: %v", err))
    }
}

标签: goconcurrency

解决方案


推荐阅读