首页 > 解决方案 > 节点 Postgres COPY FROM 静默失败

问题描述

我正在尝试使用 PostgreSQL 的COPY FROMAPI 将潜在的数千条记录流式传输到数据库中,因为它们是在 node.js 代码中动态生成的。为此,我编写了这个通用包装函数:

function streamRows(client, { table, columns, data }) {
  return new Promise((resolve, reject) => {
    const sqlStream = client.query(
      copyFrom(`COPY ${ table } (${ columns.join(', ') }) FROM STDIN`));

    const rowStream = new Readable();

    rowStream.pipe(sqlStream)
      .on('finish', resolve)
      .on('error', reject);

    for (const row of data) {
      rowStream.push(`${ row.join('\t') }\n`);
    }

    rowStream.push('\\.\n');
    rowStream.push(null);
  });
}

我正在写入的数据库表如下所示:


CREATE TABLE devices (
  id SERIAL PRIMARY KEY,
  group_id INTEGER REFERENCES groups(id),
  serial_number CHAR(12) NOT NULL,
  status INTEGER NOT NULL
);

我这样称呼它:

function *genRows(id, devices) {
  let count = 0;

  for (const serial of devices) {
    yield [ id, serial, UNSTARTED ];
    count++;
    if (count % 10 === 0) log.info(`Streamed ${ count } rows...`);
  }

  log.info(`Streamed ${ count } rows.`);
}

await streamRows(client, {
  table: 'devices',
  columns: [ 'group_id', 'serial_number', 'status' ],
  data: genRows(id, devices),
});

我的生成器函数中生成每行数据的日志语句都按预期运行,并且输出表明它实际上总是运行生成器以完成,并流式传输我想要的所有数据行。不会抛出任何错误。但是如果我等待它完成,表格有时会添加 0 行——也就是说,看起来我将所有数据都发送到 Postgres,但实际上没有插入任何数据。我究竟做错了什么?

标签: node.jspostgresql

解决方案


我不知道这其中的哪些部分产生了差异,哪些部分是纯粹的风格,但是在玩了一堆来自网络上的不同示例之后,我设法拼凑出这个有效的功能:

function streamRows(client, { table, columns, data }) {
  return new Promise((resolve, reject) => {
    const iterator = data[Symbol.iterator]();
    const rs = new Readable();
    const ws = client.query(copyFrom(`COPY ${ table } (${ columns.join(', ') }) FROM STDIN`));

    rs._read = function() {
      const { value, done } = iterator.next();

      rs.push(done ? null : `${ value.join('\t') }\n`);
    };

    rs.on('error', reject);
    ws.on('error', reject);
    ws.on('end', resolve);
    rs.pipe(ws);
  });
}

推荐阅读