go - 基于通道的管道卡住了
问题描述
我正在尝试建立一个使用 Go 摄取数据的管道。这 3 个阶段是“批量下载”、“转换每条消息”和“将消息排入队列”。
对我来说似乎很自然的逻辑是为每个阶段创建 3 个函数,并将这些函数与无缓冲通道联系起来。
在我的代码中的某个地方,我没有正确实现通道,或者没有使用等待组?由于只有 1 条消息到达最后阶段并且程序似乎停止/阻塞。
func (c *worker) startWork() {
// channel for messages to be sent to the queue
chMessagesToEnqueue := make(chan types.Foo)
// channel for messages to be transformed
chMessagesToTransform := make(chan []types.UpstreamFooType)
// start the goroutines with the channels
go c.startTransformer(chMessagesToTransform, chMessagesToEnqueue)
go c.startEnqueuer(chMessagesToEnqueue)
go c.startDownloader(chMessagesToTransform)
}
func (c *worker) startDownloader(out chan []types.UpstreamFooType) {
// https://github.com/SebastiaanKlippert/go-soda
// uses a library here to fetch data from upstream APIs, but the gist is:
var wg sync.WaitGroup
for i := 0; i < c.workerCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
// i've cut out some meat out to make more concise
var results []types.UpstreamFooType
err = json.NewDecoder(resp.Body).Decode(&results)
out <- results
}
}()
}
wg.Wait()
}
func (c *worker) startTransformer(in <-chan []types.UpstreamFooType, out chan types.Foo) {
data := <-in
for _, record := range data {
msg := types.Foo{
name: record.fifa,
}
out <- msg
}
}
func (c *worker) startEnqueuer(in <-chan []types.Foo) {
data := <-in
c.logger.Infow("startEnqueuer", "data", data)
}
解决方案
推荐阅读
- android - 应用程序被杀死时Android中的FCM通知
- php - WordPress源日志系统的解决方案
- c# - Xamarin 表单上的 ActivityIndicator
- php - 获取错误记录已成功更新致命错误:未捕获错误:调用数组上的成员函数 fetch_assoc()
- python - 熊猫中的列到行?
- jq - 使用 jq 在某个键上组合两个对象数组
- reactjs - 在我的 web 服务调用完成之前,如何防止 componentDidMount 运行
- git - 使用 .config/directory 作为 git repo
- makefile - 处理从单个输入生成的多个文件
- c++ - 命名空间参数的理想架构设计