go - Go 例程后关闭冗余 sql.Rows 对象的推荐方法
问题描述
我正在使用 Go 例程将查询并行发送到 PostgreSQL 主节点和从节点。返回有效结果的第一个主机获胜。错误案例不在此问题的范围内。
调用者是唯一关心*sql.Rows
对象内容的人,所以我的函数故意不对这些内容进行任何操作。我使用缓冲通道从 Go 例程中检索返回对象,因此应该没有 Go 例程泄漏。垃圾收集应该照顾其余的。
有一个问题我没有正确解决:留在通道中的 Rows 对象永远不会关闭。当我从(只读)事务中调用此函数时,tx.Rollback()
为每个非关闭Rows
对象实例返回一个错误:"unexpected command tag SELECT"
.
从更高级别的对象调用此函数:
func multiQuery(ctx context.Context, xs []executor, query string, args ...interface{}) (*sql.Rows, error) {
rc := make(chan *sql.Rows, len(xs))
ec := make(chan error, len(xs))
for _, x := range xs {
go func(x executor) {
rows, err := x.QueryContext(ctx, query, args...)
switch { // Make sure only one of them is returned
case err != nil:
ec <- err
case rows != nil:
rc <- rows
}
}(x)
}
var me MultiError
for i := 0; i < len(xs); i++ {
select {
case err := <-ec:
me.append(err)
case rows := <-rc: // Return on the first success
return rows, nil
}
}
return nil, me.check()
}
执行器可以是*sql.DB
,*sql.Tx
或者任何符合接口的东西:
type executor interface {
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
}
回滚逻辑:
func (mtx MultiTx) Rollback() error {
ec := make(chan error, len(mtx))
for _, tx := range mtx {
go func(tx *Tx) {
err := tx.Rollback()
ec <- err
}(tx)
}
var me MultiError
for i := 0; i < len(mtx); i++ {
if err := <-ec; err != nil {
me.append(err)
}
}
return me.check()
}
MultiTx
是多个节点上开放交易的集合。它是一个更高级别的对象,它调用multiQuery
“清理”未使用的行的最佳方法是什么?我正在考虑不做的选项:
cancel()
取消上下文:我相信它会不一致地工作,在调用时可能已经返回了多个查询- 创建一个延迟的 Go 例程,该例程继续耗尽通道并关闭行对象:如果 DB 节点响应缓慢,
Rollback()
仍会在之前调用rows.Close()
- 在 MultiTx 类型中的某处使用 a
sync.WaitGroup
,可能与 (2) 结合使用:如果其中一个节点无响应,这可能会导致 Rollback 挂起。另外,我不确定我将如何实现它。 - 忽略回滚错误:忽略错误听起来从来都不是一个好主意,它们的存在是有原因的。
解决此问题的推荐方法是什么?
编辑:
正如@Peter 所建议的那样,我尝试取消上下文,但这似乎也使查询中返回的所有行无效。在rows.Scan
我context canceled
在更高级别的调用者处遇到错误。
这是我到目前为止所做的:
func multiQuery(ctx context.Context, xs []executor, query string, args ...interface{}) (*sql.Rows, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
rc := make(chan *sql.Rows, len(xs))
ec := make(chan error, len(xs))
for _, x := range xs {
go func(x executor) {
rows, err := x.QueryContext(ctx, query, args...)
switch { // Make sure only one of them is returned
case err != nil:
ec <- err
case rows != nil:
rc <- rows
cancel() // Cancel on success
}
}(x)
}
var (
me MultiError
rows *sql.Rows
)
for i := 0; i < len(xs); i++ {
select {
case err := <-ec:
me.append(err)
case r := <-rc:
if rows == nil { // Only use the first rows
rows = r
} else {
r.Close() // Cleanup remaining rows, if there are any
}
}
}
if rows != nil {
return rows, nil
}
return nil, me.check()
}
编辑2:
@Adrian 提到:
我们看不到实际使用这些的代码。
此代码由类型方法重用。首先是交易类型。这个问题的问题出现在上面的Rollback()
方法上。
// MultiTx holds a slice of open transactions to multiple nodes.
// All methods on this type run their sql.Tx variant in one Go routine per Node.
type MultiTx []*Tx
// QueryContext runs sql.Tx.QueryContext on the tranactions in separate Go routines.
// The first non-error result is returned immediately
// and errors from the other Nodes will be ignored.
//
// If all nodes respond with the same error, that exact error is returned as-is.
// If there is a variety of errors, they will be embedded in a MultiError return.
//
// Implements boil.ContextExecutor.
func (mtx MultiTx) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
return multiQuery(ctx, mtx2Exec(mtx), query, args...)
}
然后是:
// MultiNode holds a slice of Nodes.
// All methods on this type run their sql.DB variant in one Go routine per Node.
type MultiNode []*Node
// QueryContext runs sql.DB.QueryContext on the Nodes in separate Go routines.
// The first non-error result is returned immediately
// and errors from the other Nodes will be ignored.
//
// If all nodes respond with the same error, that exact error is returned as-is.
// If there is a variety of errors, they will be embedded in a MultiError return.
//
// Implements boil.ContextExecutor.
func (mn MultiNode) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
return multiQuery(ctx, nodes2Exec(mn), query, args...)
}
这些方法是函数周围的公共包装器multiQuery()
。现在我意识到只是将其发送*Rows
到缓冲通道中死,实际上是内存泄漏。在交易案例中,它变得很清楚,因为Rollback()
开始抱怨。但是在非事务变体中,通道内部永远不会被垃圾收集,因为驱动程序可能会在调用*Rows
之前持有对它的引用。rows.Close()
我已经编写了这个包,供 ORM sqlboiler使用。我的更高级别的逻辑将一个MultiTX
对象传递给 ORM。从那时起,我对返回的Rows
. 一种简单的方法是我的更高级别的代码取消了之前的上下文Rollback()
,但我不喜欢这样:
- 它提供了一个非直观的 API。这种(惯用的)方法会破坏:
ctx, cancel = context.WithCancel(context.Background())
defer cancel()
tx, _ := db.BeginTx(ctx)
defer tx.Rollback()
- ORM 的接口还指定了常规的、非上下文感知的
Query()
变体,在我的包的情况下,它将针对context.Background()
.
我开始担心这会被设计破坏...无论如何,我将首先实现一个 Go 例程,该例程将耗尽通道并关闭*Rows
. 之后我会看看我是否可以实现一些不会影响返回的合理等待/取消机制*Rows
解决方案
我认为下面的函数将满足您的要求,但前提是当您完成结果时应该取消传入的上下文(否则context.WithCancel
会泄漏;我看不到在函数中取消它的方法将使返回的sql.Rows
) 无效。
请注意,我没有时间对此进行测试(需要设置数据库,实现接口等),因此代码中可能隐藏了一个错误(但我相信基本算法是合理的)
// queryResult holds the goroutine# and the result from that gorouting (need both so we can avoid cancelling the relevant context)
type queryResult struct {
no int
rows *sql.Rows
}
// multiQuery - Executes multiple queries and returns either the first to resutn a result or, if all fail, a multierror summarising the errors
// Important: This should be used for READ ONLY queries only (it is possible that more than one will complete)
// Note: The ctx passed in must be cancelled to avoid leaking a context (this routine cannot cancel the context used for the winning query)
func multiQuery(ctx context.Context, xs []executor, query string, args ...interface{}) (*sql.Rows, error) {
noOfQueries := len(xs)
rc := make(chan queryResult) // Channel for results; unbuffered because we only want one, and only one, result
ec := make(chan error) // errors get sent here - goroutines must send a result or 1 error
defer close(ec) // Ensure the error consolidation go routine will complete
// We need a way to cancel individual goroutines as we do not know which one will succeed
cancelFns := make([]context.CancelFunc, noOfQueries)
// All goroutines must terminate before we exit (otherwise the transaction maybe rolled back before they are cancelled leading to "unexpected command tag SELECT")
var wg sync.WaitGroup
wg.Add(noOfQueries)
for i, x := range xs {
var queryCtx context.Context
queryCtx, cancelFns[i] = context.WithCancel(ctx)
go func(ctx context.Context, queryNo int, x executor) {
defer wg.Done()
rows, err := x.QueryContext(ctx, query, args...)
if err != nil {
ec <- err // Error collection go routine guaranteed to run until all query goroutines complete
return
}
select {
case rc <- queryResult{queryNo, rows}:
return
case <-ctx.Done(): // If another query has already transmitted its results these should be thrown away
rows.Close() // not strictly required because closed context should tidy up
return
}
}(queryCtx, i, x)
}
// Start go routine that will send a MultiError to a channel if all queries fail
mec := make(chan MultiError)
go func() {
var me MultiError
errCount := 0
for err := range ec {
me.append(err)
errCount += 1
if errCount == noOfQueries {
mec <- me
return
}
}
}()
// Wait for one query to succeed or all queries to fail
select {
case me := <-mec:
for _, cancelFn := range cancelFns { // not strictly required so long as ctx is eventually cancelled
cancelFn()
}
wg.Wait()
return nil, me.check()
case result := <-rc:
for i, cancelFn := range cancelFns { // not strictly required so long as ctx is eventually cancelled
if i != result.no { // do not cancel the query that returned a result
cancelFn()
}
}
wg.Wait()
return result.rows, nil
}
}
推荐阅读
- postgresql - SQL 查询不适用于 PostgreSQL 命令
- android - 在领域中创建了两个表,但保存的数据仅在领域中的第一个表中
- python - 自定义不和谐状态不断收到此错误
- reactjs - React 中的解耦(带钩子)
- javascript - 如何从 Web 应用程序在点阵打印机中获得所需的输出?
- reactjs - 无法在 MongoDB (MERN) 中读取 _id 属性
- flutter - 参数类型“_ProfileImageState”不能分配给参数类型“TickerProvider”
- reactjs - 故事书 - 插件值应该以 /register 结尾,或者它应该是一个有效的预设
- javascript - 如何在 WP 中使用 javascript 重定向 *some* 页面?
- spring-boot - 如何使用 Spring Boot 查看 kafka 生产者指标