go - 使用 BLPOP 处理 Redis 队列会导致单元测试中出现竞争条件?
问题描述
我正在尝试实现一个先进先出的任务队列,如Go语言中 Redis 电子书的第 6.4.1 章所述。出于测试目的,我将一个CommandExecutor
接口传递给“worker”函数,如下所示:
package service
import (
"context"
"github.com/gomodule/redigo/redis"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
const commandsQueue = "queuedCommands:"
var pool = redis.Pool{
MaxIdle: 50,
MaxActive: 1000,
Dial: func() (redis.Conn, error) {
conn, err := redis.Dial("tcp", ":6379")
if err != nil {
logrus.WithError(err).Fatal("initialize Redis pool")
}
return conn, err
},
}
// CommandExecutor executes a command
type CommandExecutor interface {
Execute(string) error
}
func processQueue(ctx context.Context, done chan<- struct{}, executor CommandExecutor) error {
rc := pool.Get()
defer rc.Close()
for {
select {
case <-ctx.Done():
done <- struct{}{}
return nil
default:
// If the commands queue does not exist, BLPOP blocks until another client
// performs an LPUSH or RPUSH against it. The timeout argument of zero is
// used to block indefinitely.
reply, err := redis.Strings(rc.Do("BLPOP", commandsQueue, 0))
if err != nil {
logrus.WithError(err).Errorf("BLPOP %s %d", commandsQueue, 0)
return errors.Wrapf(err, "BLPOP %s %d", commandsQueue, 0)
}
if len(reply) < 2 {
logrus.Errorf("Expected a reply of length 2, got one of length %d", len(reply))
return errors.Errorf("Expected a reply of length 2, got one of length %d", len(reply))
}
// BLPOP returns a two-element multi-bulk with the first element being the
// name of the key where an element was popped and the second element
// being the value of the popped element (cf. https://redis.io/commands/blpop#return-value)
if err := executor.Execute(reply[1]); err != nil {
return errors.Wrapf(err, "execute scheduled command: %s", reply[0])
}
done <- struct{}{}
}
}
}
我制作了一个小型示例存储库https://github.com/kurtpeek/process-queue,其中包含此代码以及单元测试的尝试。对于单元测试,我有两个相同的测试(名称不同):
package service
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestProcessQueue(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
executor := &CommandExecutorMock{
ExecuteFunc: func(string) error {
return nil
},
}
done := make(chan struct{})
go processQueue(ctx, done, executor)
rc := pool.Get()
defer rc.Close()
_, err := rc.Do("RPUSH", commandsQueue, "foobar")
require.NoError(t, err)
<-done
assert.Exactly(t, 1, len(executor.ExecuteCalls()))
assert.Exactly(t, "foobar", executor.ExecuteCalls()[0].In1)
}
func TestProcessQueue2(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
executor := &CommandExecutorMock{
ExecuteFunc: func(string) error {
return nil
},
}
done := make(chan struct{})
go processQueue(ctx, done, executor)
rc := pool.Get()
defer rc.Close()
_, err := rc.Do("RPUSH", commandsQueue, "foobar")
require.NoError(t, err)
<-done
assert.Exactly(t, 1, len(executor.ExecuteCalls()))
assert.Exactly(t, "foobar", executor.ExecuteCalls()[0].In1)
}
其中CommandExecutorMock
是使用 生成的moq
。如果我单独运行每个测试,它们会通过:
~/g/s/g/k/process-queue> go test ./... -v -run TestProcessQueue2
=== RUN TestProcessQueue2
--- PASS: TestProcessQueue2 (0.00s)
PASS
ok github.com/kurtpeek/process-queue/service 0.243s
但是,如果我运行所有测试,第二个会超时:
~/g/s/g/k/process-queue>
go test ./... -v -timeout 10s
=== RUN TestProcessQueue
--- PASS: TestProcessQueue (0.00s)
=== RUN TestProcessQueue2
panic: test timed out after 10s
似乎在第二个测试运行时,在第一个测试中启动的 goroutine 仍在运行并BLPOP
从队列中读取命令,因此<-done
第二个测试中的行无限期阻塞。尽管调用cancel()
了第一个测试的父上下文。
我如何“隔离”这些测试,以便它们在一起运行时都通过?(我尝试将-p 1
标志传递给go test
但无济于事)。
解决方案
尽管在第一个测试的父上下文上调用了 cancel()。
在写入done
和调用之间有一些时间cancel()
,这意味着第一个测试可能(并且确实)进入第二for/select
次迭代而不是退出 on <-ctx.Done()
。更具体地说,测试代码在取消之前包含 2 个断言:
assert.Exactly(t, 1, len(executor.ExecuteCalls()))
assert.Exactly(t, "foobar", executor.ExecuteCalls()[0].In1)
然后才defer cancel()
开始,这似乎为时已晚,无法取消第一个 go 例程的上下文。
如果您cancel()
在读取之前移动 call done
,则测试通过:
func TestProcessQueue(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
executor := &CommandExecutorMock{
ExecuteFunc: func(string) error {
return nil
},
}
done := make(chan struct{})
go processQueue(ctx, done, executor)
rc := pool.Get()
defer rc.Close()
_, err := rc.Do("RPUSH", commandsQueue, "foobar")
require.NoError(t, err)
cancel() // note this change right here
<-done
assert.Exactly(t, 1, len(executor.ExecuteCalls()))
assert.Exactly(t, "foobar", executor.ExecuteCalls()[0].In1)
}
推荐阅读
- c# - C# wpf 去掉顶部的边距
- javascript - 将对象从 JSON rest api 推送到空数组中,这在 React 中不起作用
- javascript - 在 vuejs 中截断已处理的 html
- image - 如何使用 AEM 自适应图像 Servlet 获取多个质量图像
- ember.js - Ember js构建抛出空合并错误
- math - 在两个数字之间增加整数以创建最平滑的线
- laravel - Laravel vue 有两种不同的布局
- d3.js - d3js 单向放大,双向平移
- bash - Automator - 在过滤列表中查找和移动最近的文件
- ios - 关于在 Swift 中显示 viewcontroller 时标签栏不可见的问题