首页 > 解决方案 > 对事件调用异步函数时,JavaScript 堆内存不足

问题描述

我正在编写一个 Node.js 应用程序,它从非常繁忙的 kafka 主题(每天数百万条消息)中读取 avro 编码的消息,将它们解压缩、解码并将它们写入 PostgreSQL 数据库。问题是程序运行了一两分钟,然后在 JavaScript heap out of memory 错误上崩溃。我已经调查了问题可能出在哪里,并且似乎是在我将解码的消息写入数据库时​​发生的。如果我只是记录解码的消息,它工作正常。或者,如果我将消息写入数据库而不对其进行解码,则它可以正常工作。但是当我在 decoder.on('data') 事件上调用 writeRow() 函数时,它很快就会耗尽内存。我没有正确处理解码器发出的事件吗?

我正在使用以下模块:

以下是相关代码:

消费者:

const run = async () => {
  await consumer.connect();
  await topics.forEach(topic => {
    consumer.subscribe({ topic: topic, fromBeginning: false });
  });

  await consumer.run({
    eachMessage: async ({ topic, message }) => {
      try {
        const buf = await Buffer.from(message.value, "binary");
        const decoder = new avro.streams.BlockDecoder();
        await decoder.on("data", async msg => {
          console.log(`Message received: ${msg}`);
          await writeRow(topic, new Date().toISOString(), msg);
        });
        await decoder.end(buf);
      } catch (err) {
        console.log(err);
      }
    }
  });
};

writeRow() 函数:

const writeRow = async (topic, timestamp, data) => {
  const client = await pool.connect();
  try {
    const res = await client.query(
      `INSERT INTO ${topic.replace(
        /\./g,
        "_"
      )}(topic, timestamp, data) VALUES('${topic}', '${timestamp}', '${JSON.stringify(
        data
      )}')`
    );
    console.log(`${res.rowCount} row written to database.`);
  } catch (err) {
    console.log(err);
  } finally {
    await client.release();
  }
};

标签: node.jspostgresqleventsapache-kafkaasync-await

解决方案


推荐阅读