go - 方法执行期间内存消耗高
问题描述
对于一个项目,我想为大约 5000 万行 CSV 中的每一行手动创建结构。为此,我逐行遍历文件并将每个结构附加到切片。这是简化的方法:
func readCSV(filePath string) DataFrame {
file, _ := os.Open(filePath)
defer file.Close()
var rows []Row
scanner := bufio.NewScanner(file)
scanner.Scan()
for scanner.Scan() {
parts := strings.Split(scanner.Text(), ",")
if len(parts) < 7 {
continue
}
column1, _ := strconv.Atoi(parts[0])
column2, _ := strconv.ParseFloat(parts[1], 32)
column3, _ := strconv.ParseFloat(parts[2], 32)
column4 := parts[3]
column5, _ := strconv.ParseFloat(parts[4], 32)
column6 := parts[5]
column7 := parts[6]
row := Row{
Column1: column1,
Column2: column2,
Column3: column3,
Column4: column4,
Column5: column5,
Column6: column6,
Column7: column7,
}
rows = append(rows, row)
}
return DataFrame{
Rows: rows,
}
}
生成的 DataFrame 有大约 3 GB 的内存。问题是在方法执行期间 RAM 消耗达到顶峰,而 Go 进程使用 15GB 以上的内存,这使得该函数无法用于我的目的。返回切片后,该进程的 RAM 消耗下降到预期的 3GB。
堆配置文件如下所示:
3.26GB 5.81GB (flat, cum) 100% of Total
. . 62: scanner := bufio.NewScanner(file)
. . 63: scanner.Scan()
. . 64: for scanner.Scan() {
. 2.55GB 65: parts := strings.Split(scanner.Text(), ",")
. . 66: if len(parts) < 7 {
. . 67: continue
. . 68: }
. . 69: column1, _ := strconv.Atoi(parts[0])
. . 70: column2, _ := strconv.ParseFloat(parts[1], 32)
. . 71: column3, _ := strconv.ParseFloat(parts[2], 32)
. . 72: column4 := parts[3]
. . 73: column5, _ := strconv.ParseFloat(parts[4], 32)
. . 74: column6 := parts[5]
. . 75: column7 := parts[6]
. . 76: row := Row{
. . 77: Column1: column1,
. . 78: Column2: column2,
. . 79: Column3: column3,
. . 80: Column4: column4,
. . 81: Column5: column5,
. . 82: Column6: column6,
. . 83: Column7: column7,
. . 84: }
3.26GB 3.26GB 85: rows = append(rows, row)
. . 86: }
. . 87:
. . 88: return DataFrame{
. . 89: Rows: rows,
我不知道高 RAM 消耗从何而来。我试图手动调用垃圾收集器但没有成功。谁能给我一个提示?
解决方案
rows
是 Row 结构的数组,而不是指针。浮点数和整数的每一行花费 32 个字节,加上字符串的长度。有 5000 万行可以变得非常大。更糟糕的是,append
它将增长rows
大约 1.5 倍,因此它最终会分配大量额外的内存,同时也会丢弃许多需要进行垃圾收集的较小版本。然后append(rows, row)
是副本,意味着更多的分配和释放。而且它必须等待垃圾收集来膨胀内存使用。
这可以通过存储引用来避免。这应该意味着更少的分配并且rows
显着减小。
var rows []*Row
...
rows = append(rows, &row)
然而,真正的问题是一次吞下所有东西。这是去!我们可以使用通道和goroutine在处理过程中一次同时读取一行。
CSV 看起来很棘手。Go 已经有一个 CSV 库encoding/csv,所以我们将使用它。
# A handy function to make ignoring errors a bit less laborious.
func IgnoreError(value interface{}, err error) interface{} {
return value
}
# Its more flexible to take an io.Reader.
# It returns a channel of individual rows.
func readCSV(input io.Reader) chan Row {
rows := make(chan Row)
go func() {
defer close(rows)
# Use encoding/csv.
# Let it reuse its backing array for each row.
# Ignore rows with the wrong number of columns.
reader := csv.NewReader(input)
reader.FieldsPerRecord = 7
reader.ReuseRecord = true
for {
parts, err := reader.Read()
if err == io.EOF {
break
}
if err != nil {
continue
}
# Send each row down the channel.
rows <- Row{
Column1: IgnoreError(strconv.Atoi(parts[0])).(int),
Column2: IgnoreError(strconv.ParseFloat(parts[1], 32)).(float64),
Column3: IgnoreError(strconv.ParseFloat(parts[2], 32)).(float64),
Column4: parts[3],
Column5: IgnoreError(strconv.ParseFloat(parts[4], 32)).(float64),
Column6: parts[5],
Column7: parts[6],
}
}
}();
return rows;
}
func main() {
file, err := os.Open("test.csv")
if err != nil {
log.Fatal(err)
}
rows := readCSV(file)
for row := range rows {
fmt.Println(row)
}
}
现在一次只加载一行。内存使用量应该是恒定的。
推荐阅读
- nginx - 具有集群 IP 服务和默认 nginx 的 kubernetes 入口控制器未按预期工作
- javascript - 如何在最近的标签中找到类?
- javascript - 不为空时禁用输入字段,如果为空则启用它不起作用
- sas - 在 sas 中查找新用户和重复用户
- haskell - 确定数据类型实例的构造函数
- ios - 服务配置为“nil”。使用此方法前需要配置`Info.plist`或设置`defaultServiceConfiguration`
- oracle-apex - 如何禁用 AJAX 错误窗口以进行约束?
- python - 通过中点找到坐标镜像
- r - 在一个向量中输入两次值
- c++ - 在运算符重载中按值传递错误