node.js - 对事件调用异步函数时,JavaScript 堆内存不足
问题描述
我正在编写一个 Node.js 应用程序,它从非常繁忙的 kafka 主题(每天数百万条消息)中读取 avro 编码的消息,将它们解压缩、解码并将它们写入 PostgreSQL 数据库。问题是程序运行了一两分钟,然后在 JavaScript heap out of memory 错误上崩溃。我已经调查了问题可能出在哪里,并且似乎是在我将解码的消息写入数据库时发生的。如果我只是记录解码的消息,它工作正常。或者,如果我将消息写入数据库而不对其进行解码,则它可以正常工作。但是当我在 decoder.on('data') 事件上调用 writeRow() 函数时,它很快就会耗尽内存。我没有正确处理解码器发出的事件吗?
我正在使用以下模块:
- kafkajs, kafkajs-snappy, avsc 用于消费、解压和解码来自 kafka 主题的消息
- node-postgres 用于将消息写入数据库
以下是相关代码:
消费者:
const run = async () => {
await consumer.connect();
await topics.forEach(topic => {
consumer.subscribe({ topic: topic, fromBeginning: false });
});
await consumer.run({
eachMessage: async ({ topic, message }) => {
try {
const buf = await Buffer.from(message.value, "binary");
const decoder = new avro.streams.BlockDecoder();
await decoder.on("data", async msg => {
console.log(`Message received: ${msg}`);
await writeRow(topic, new Date().toISOString(), msg);
});
await decoder.end(buf);
} catch (err) {
console.log(err);
}
}
});
};
writeRow() 函数:
const writeRow = async (topic, timestamp, data) => {
const client = await pool.connect();
try {
const res = await client.query(
`INSERT INTO ${topic.replace(
/\./g,
"_"
)}(topic, timestamp, data) VALUES('${topic}', '${timestamp}', '${JSON.stringify(
data
)}')`
);
console.log(`${res.rowCount} row written to database.`);
} catch (err) {
console.log(err);
} finally {
await client.release();
}
};
解决方案
推荐阅读
- android - How to build NativeScript Vue app compliant with Google Play 64-bit requirement?
- java - 每次调用如何为所有受影响的方法建立一个 JAX_RS 连接?
- docker - 外部网络上忽略 Docker 端口重定向
- xamarin.forms - xamarin 表单中的自动建议条目
- html - div内容溢出
- pandas - 使用 pg8000 读取红移数据时如何避免字节引用的列标题
- asp.net-mvc - SignalR in Angular 7 and ASP.Net MVC
- python - 如何在条形图中添加数据标签、设置 Y 轴格式和添加图例
- php - 如何在php中将字符串转换为ascii字节数组
- php - 如何仅使用ajax一次将产品包添加到购物车中