首页 > 解决方案 > Golang 工作池导致数据库损坏

问题描述

我一直在 Golang 中创建一个模拟器类型的系统,我想使用 InfluxDB 2.0 的 Golang 客户端存储来自该系统的数据。但是,想看看我是否可以通过 goroutine 提高数据库写入的速度。

由于为每个数据点设置一个 goroutine 会导致 InfluxDB2 不堪重负,我决定实现一个工作池系统​​来限制使用的 goroutine 的数量。然而,当数据通过工作池写入数据库时​​,它总是会被破坏,出现奇怪的峰值和以前不存在的值变化(而不是直线,它会不稳定。)

我通过一个名为 Simulate 的函数来执行此操作,该函数接受一个时间值(用于时间序列数据库)、一个实体结构指针(包含要模拟的所有数据)和两个单独的客户端,每个客户端都编写不同的集合数据的。

maxNumGoroutines := flag.Int("maxNumGoroutines", 10, "Max num of goroutines")
flag.Parse()

concurrentGoroutines := make(chan struct{}, *maxNumGoroutines) // Semaphore 
var wg sync.WaitGroup // Wait for goroutines to finish

timeLength := setTimeLength(inputVar) // Example of setting length of time

simObjects:= &entities.Objects // objects are propagated as *Object, meaning no return value
// Additional entities also exist inside the entities struct

for timeIterator := 0; timeIterator <= timeLength; timeIterator++ { 
    for _, objectID:= range entities.Objects.GetObjectIDs() { // all objects within the simulation
        wg.Add(1)
        go func(entityState *entities.EntityHolder, chosenObj string, timeGo time.Time) { 
            defer wg.Done()
            concurrentGoroutines <- struct{}{} // Set goroutine as busy
            calc.Propagate(entityState.Objects.GetObject(chosenObj), timeGo) // Edit the value at pointer addr

            calc.Metrics(entityState, chosenObj, timeGo, metricDB) // seperate further calcs and write

            PassInflux(entityState.Objects.GetObject(chosenObj), clientDB, timeGo) // send propagated data 
            <-concurrentGoroutines // Free up goroutine
        }(entities, objectID, timeIterator) // pass in as variables, otherwise operating on changing pointers
    }
}
wg.Wait()
log.Println("Simulation complete.")

Metrics() 写入是非阻塞的,这意味着它可以异步写入。但是,我刷新Propagate() 数据以确保在传播所有对象之后发送数据;如果没有这个,就会尝试一次写入太多对象(即使数据库使用 5000 点的批量大小)。

我在这里错过了什么吗?有没有一种名义上的方法来设置一个带有指针的工作池?

标签: databasepointersgopoolgoroutine

解决方案


事实证明,我错误地使用了 InfluxDB 2 客户端——我应该将写入 API 传递给传播函数而不是整个客户端。这样做意味着写入速度显着加快;因此,不需要 goroutine。


推荐阅读