首页 > 解决方案 > 尝试使用通道获取 FIFO 处理管道

问题描述

我正在尝试使用一个简单的假示例在 Go 中编写一个顺序处理管道。它遍历一个假目录并运行一些转换。所以有一个字符串通道在它们之间共享。在一个函数写入数据后,第二个函数读取它。

在我看来,当我在 WalkFakeDirectory 函数前面放置一个 go 关键字时,它也只能按顺序工作和工作,如下面的代码示例(操场)所示。

如果有人能解释这是如何工作的,我将不胜感激?

package main 
import (
"fmt"
"strings"
"sync"
"time"
)

func main() {

done := make(chan int)
path := make(chan string)

defer close(done)

//var wg sync.WaitGroup - Not working too
//wg.Add(1)

fmt.Println("walking file path")

go WalkFakeDirectoy(done, path)

//wg.Add(1)

ConvertToUpperCase(path, done)

//wg.Wait()

fmt.Println("done!")
//time.Sleep(2000) // Not working 
}

func ConvertToUpperCase(files chan string, done chan int) {

   for file := range files {
      fmt.Println("converting data data", strings.ToUpper(file))
   }
}

func WalkFakeDirectoy(done chan int, path chan<- string) {

   func() {
    list := []string{"abc", "def", "fgh", "ijk", "mnn"}

    for _, file := range list {
        fmt.Println("getting data", file)
        path <- file
        time.Sleep(3000)
      }
   }()
}

标签: goconcurrencychannel

解决方案


这篇关于管道的 Go 博客文章应该有足够的信息来构建你自己的。它的要点是一个代码示例:

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}
func sample_pipeline() {
    // Set up the pipeline.
    c := gen(2, 3)
    out := sq(c)
    out2 := sq(out)

    // Consume the output.
    for o := range out2 {
        fmt.Println(o)
    }
}

func main() {
    sample_pipeline()
}

sq是一个流水线阶段——它接受一个带有输入的通道并返回一个带有输出的通道(输入的平方值)。sample_pipeline建立一个两级管道并将具有两个值的生成器连接到它。

重要的是要注意完成是如何完成的——每个管道阶段都是一个执行管道阶段工作的 goroutine(等待来自输入管道的新数据,处理它,发送输出)。当每个阶段的输入管道完成(通道上的范围循环停止)时,它会关闭自己的通道。关闭通道是 Go 中发出“此通道已完成”信号的规范方式。


推荐阅读