首页 > 解决方案 > 方法执行期间内存消耗高

问题描述

对于一个项目,我想为大约 5000 万行 CSV 中的每一行手动创建结构。为此,我逐行遍历文件并将每个结构附加到切片。这是简化的方法:

func readCSV(filePath string) DataFrame {
    file, _ := os.Open(filePath)
    defer file.Close()
    var rows []Row
    scanner := bufio.NewScanner(file)
    scanner.Scan()
    for scanner.Scan() {
        parts := strings.Split(scanner.Text(), ",")
        if len(parts) < 7 {
            continue
        }
        column1, _ := strconv.Atoi(parts[0])
        column2, _ := strconv.ParseFloat(parts[1], 32)
        column3, _ := strconv.ParseFloat(parts[2], 32)
        column4 := parts[3]
        column5, _ := strconv.ParseFloat(parts[4], 32)
        column6 := parts[5]
        column7 := parts[6]
        row := Row{
            Column1: column1,
            Column2: column2,
            Column3: column3,
            Column4: column4,
            Column5: column5,
            Column6: column6,
            Column7: column7,
        }
        rows = append(rows, row)
    }
    return DataFrame{
        Rows: rows,
    }
}

生成的 DataFrame 有大约 3 GB 的内存。问题是在方法执行期间 RAM 消耗达到顶峰,而 Go 进程使用 15GB 以上的内存,这使得该函数无法用于我的目的。返回切片后,该进程的 RAM 消耗下降到预期的 3GB。

堆配置文件如下所示:

    3.26GB     5.81GB (flat, cum)   100% of Total
         .          .     62:   scanner := bufio.NewScanner(file)
         .          .     63:   scanner.Scan()
         .          .     64:   for scanner.Scan() {
         .     2.55GB     65:           parts := strings.Split(scanner.Text(), ",")
         .          .     66:           if len(parts) < 7 {
         .          .     67:                   continue
         .          .     68:           }
         .          .     69:           column1, _ := strconv.Atoi(parts[0])
         .          .     70:           column2, _ := strconv.ParseFloat(parts[1], 32)
         .          .     71:           column3, _ := strconv.ParseFloat(parts[2], 32)
         .          .     72:           column4 := parts[3]
         .          .     73:           column5, _ := strconv.ParseFloat(parts[4], 32)
         .          .     74:           column6 := parts[5]
         .          .     75:           column7 := parts[6]
         .          .     76:           row := Row{
         .          .     77:                   Column1: column1,
         .          .     78:                   Column2: column2,
         .          .     79:                   Column3: column3,
         .          .     80:                   Column4: column4,
         .          .     81:                   Column5: column5,
         .          .     82:                   Column6: column6,
         .          .     83:                   Column7: column7,
         .          .     84:           }
    3.26GB     3.26GB     85:           rows = append(rows, row)
         .          .     86:   }
         .          .     87:
         .          .     88:   return DataFrame{
         .          .     89:           Rows: rows,

我不知道高 RAM 消耗从何而来。我试图手动调用垃圾收集器但没有成功。谁能给我一个提示?

标签: gomemorymemory-managementram

解决方案


rows是 Row 结构的数组,而不是指针。浮点数和整数的每一行花费 32 个字节,加上字符串的长度。有 5000 万行可以变得非常大。更糟糕的是,append它将增长rows大约 1.5 倍,因此它最终会分配大量额外的内存,同时也会丢弃许多需要进行垃圾收集的较小版本。然后append(rows, row)是副本,意味着更多的分配和释放。而且它必须等待垃圾收集来膨胀内存使用。

这可以通过存储引用来避免。这应该意味着更少的分配并且rows显着减小。

var rows []*Row
...
rows = append(rows, &row)

然而,真正的问题是一次吞下所有东西。这是去!我们可以使用通道goroutine在处理过程中一次同时读取一行。

CSV 看起来很棘手。Go 已经有一个 CSV 库encoding/csv,所以我们将使用它。

# A handy function to make ignoring errors a bit less laborious.
func IgnoreError(value interface{}, err error) interface{} {
    return value
}

# Its more flexible to take an io.Reader.
# It returns a channel of individual rows.
func readCSV(input io.Reader) chan Row {
    rows := make(chan Row)
    go func() {
        defer close(rows)

        # Use encoding/csv.
        # Let it reuse its backing array for each row.
        # Ignore rows with the wrong number of columns.
        reader := csv.NewReader(input)
        reader.FieldsPerRecord = 7
        reader.ReuseRecord = true

        for {
            parts, err := reader.Read()

            if err == io.EOF {
                break
            }
            if err != nil {
                continue
            }

            # Send each row down the channel.
            rows <- Row{
                Column1: IgnoreError(strconv.Atoi(parts[0])).(int),
                Column2: IgnoreError(strconv.ParseFloat(parts[1], 32)).(float64),
                Column3: IgnoreError(strconv.ParseFloat(parts[2], 32)).(float64),
                Column4: parts[3],
                Column5: IgnoreError(strconv.ParseFloat(parts[4], 32)).(float64),
                Column6: parts[5],
                Column7: parts[6],
            }
        }
    }();
    
    return rows;
}

func main() {
    file, err := os.Open("test.csv")
    if err != nil {
        log.Fatal(err)
    }
    
    rows := readCSV(file)
    for row := range rows {
        fmt.Println(row)
    }
}

现在一次只加载一行。内存使用量应该是恒定的。


推荐阅读