PostgreSQL crashes during bulk inserts with Node.js/Sequelize

Problem description

A Node.js application using the Sequelize.js ORM is performing bulk inserts against a PostgreSQL 11.2 server running inside a Docker container on a Mac OSX host. Each bulk insert typically consists of roughly 1000-4000 rows, and the bulk-insert concurrency is 30, so there are at most 30 active insert operations at any time.

const bulkInsert = async (payload) => {
    try {
        await sequelizeModelInstance.bulkCreate(payload);
    } catch (e) {
        console.log(e);
    }
}

const pLimit = require('p-limit')(30); // allow at most 30 bulk inserts in flight

(async () => {
    // data: array of payloads, each an array of roughly 1000-4000 rows
    const promises = data.map(d => pLimit(() => bulkInsert(d))); // pLimit() controls Promise concurrency
    const result = await Promise.all(promises);
})();

After a while, the PostgreSQL server begins returning the error "Connection terminated unexpectedly", followed by "the database system is in recovery mode".

After repeating this several times and checking my logs, it appears the error usually occurs while executing a batch of 30 bulk inserts in which several of the bulk inserts each contain more than 100,000 rows. For example, one particular crash happened when attempting 3 bulk inserts of 190,000, 650,000, and 150,000 rows alongside 27 inserts of 1000-4000 rows.

System memory is not full, CPU load is normal, and there is plenty of disk space.

Question: Is it normal to expect PostgreSQL to crash in this situation? If so, can we tune PostgreSQL settings to allow larger bulk inserts? If this is caused by the large bulk inserts, does Sequelize.js have a feature that splits bulk inserts for us?

Running on PostgreSQL 11.2 in a Docker container, TimescaleDB 1.5.1, Node v12.6.0, sequelize 5.21.3, Mac Catalina 10.15.2

PostgreSQL log output immediately after the problem occurs:

2020-01-18 00:58:26.094 UTC [1] LOG:  server process (PID 199) was terminated by signal 9
2020-01-18 00:58:26.094 UTC [1] DETAIL:  Failed process was running: INSERT INTO "foo" ("id","opId","unix","side","price","amount","b","s","serverTimestamp") VALUES (89880,'5007564','1579219200961','front','0.0000784','35','undefined','undefined','2020-01-17 00:00:01.038 +00:00'),.........
2020-01-18 00:58:26.108 UTC [1] LOG:  terminating any other active server processes
2020-01-18 00:58:26.110 UTC [220] WARNING:  terminating connection because of crash of another server process
2020-01-18 00:58:26.110 UTC [220] DETAIL:  The postmaster has commanded this server process to roll back the current transaction and exit, because another server process exited abnormally and possibly corrupted shared memory.
2020-01-18 00:58:26.110 UTC [220] HINT:  In a moment you should be able to reconnect to the database and repeat your command.
2020-01-18 00:58:26.148 UTC [214] WARNING:  terminating connection because of crash of another server process
2020-01-18 00:58:26.148 UTC [214] DETAIL:  The postmaster has commanded this server process to roll back the current transaction and exit, because another server process exited abnormally and possibly corrupted shared memory.
2020-01-18 00:58:26.148 UTC [214] HINT:  In a moment you should be able to reconnect to the database and repeat your command.
2020-01-18 00:58:26.149 UTC [203] WARNING:  terminating connection because of crash of another server process
2020-01-18 00:58:26.149 UTC [203] DETAIL:  The postmaster has commanded this server process to roll back the current transaction and exit, because another server process exited abnormally and possibly corrupted shared memory.

...

2020-01-18 00:58:30.098 UTC [1] LOG:  all server processes terminated; reinitializing
2020-01-18 00:58:30.240 UTC [223] FATAL:  the database system is in recovery mode
2020-01-18 00:58:30.241 UTC [222] LOG:  database system was interrupted; last known up at 2020-01-18 00:50:13 UTC
2020-01-18 00:58:30.864 UTC [224] FATAL:  the database system is in recovery mode
2020-01-18 00:58:31.604 UTC [225] FATAL:  the database system is in recovery mode
2020-01-18 00:58:32.297 UTC [226] FATAL:  the database system is in recovery mode
2020-01-18 00:58:32.894 UTC [227] FATAL:  the database system is in recovery mode
2020-01-18 00:58:33.394 UTC [228] FATAL:  the database system is in recovery mode
2020-01-18 01:00:55.911 UTC [222] LOG:  database system was not properly shut down; automatic recovery in progress
2020-01-18 01:00:56.856 UTC [222] LOG:  redo starts at 0/197C610
2020-01-18 01:01:55.662 UTC [229] FATAL:  the database system is in recovery mode

Tags: node.js, postgresql, docker, sequelize.js, timescaledb

Solution


I ran into a similar problem while running migrations, but the solution can be applied to this question as well.

The idea is to splice your payload into manageable chunks. In my case, 100 records at a time seemed manageable.

const payload = require("./seeds/big-mama.json"); // around 715,000 records

module.exports = {
    up: async (queryInterface) => {
        const records = payload.map(function (record) {
            record.createdAt = new Date();
            record.updatedAt = new Date();
            return record;
        });

        // Insert 100 rows at a time, waiting for each chunk to finish
        // so only one modest-sized INSERT statement is in flight at once.
        while (records.length > 0) {
            await queryInterface.bulkInsert(
                "Products",
                records.splice(0, 100),
                {}
            );
        }
    },

    down: (queryInterface) => {
        return queryInterface.bulkDelete("Products", null, {});
    }
};
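
The same chunking idea carries over to the bulkCreate call from the question. Below is a minimal sketch assuming the sequelizeModelInstance and payload shape shown above; the 100-row chunk size is an assumption and should be tuned to your row width and memory budget.

const chunkedBulkInsert = async (payload, chunkSize = 100) => {
    // Await each chunk so only one INSERT of this size runs per call,
    // instead of sending a 100,000+ row payload as a single statement.
    for (let i = 0; i < payload.length; i += chunkSize) {
        await sequelizeModelInstance.bulkCreate(payload.slice(i, i + chunkSize));
    }
};

Combined with the existing p-limit wrapper, this keeps each individual INSERT small even when a payload contains several hundred thousand rows.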
