javascript - 带有 csv-write-stream 的“错误:结束后写入”
问题描述
我确信这从根本上是对流如何工作的误解,但我正在把头撞到墙上。
我有一些 json 格式的传感器数据,我想使用 csv-write-stream 包附加到 csv 文件中。数据作为发布请求发送到节点服务器,目的是将其附加到 csv 文件中。它第一次很好地将一行写入 csv 文件,但是如果我尝试发送另一个发布请求,则会收到“错误:结束后写入”错误。
function write_csv(obj) {
writer.pipe(fs.createWriteStream('tides.csv', { flags: 'a' }));
writer.write(obj);
writer.end();
};
如果我注释掉“writer.end()”它工作正常,但这最终不会引发内存错误吗?如果是这样,附加到 csv 文件并避免此错误的正确方法是什么?
编辑:这是整个 server.js 文件
const express = require('express');
const bodyParser = require('body-parser');
const path = require('path');
const exphbs = require('express-handlebars');
const fs = require('fs');
const csvWriter = require('csv-write-stream');
const writer = csvWriter({ sendHeaders: false });
const app = express();
app.set('views', path.join(__dirname, 'views'));
app.engine('handlebars', exphbs({ defaultLayout: 'main' }));
app.set('view engine', 'handlebars');
app.set('port', (process.env.PORT || 3000));
app.use(express.static(path.join(__dirname, 'public')));
app.use(bodyParser.urlencoded({ extended: false }))
app.use(bodyParser.json())
app.get('/', function (req, res) {
res.render('home')
})
app.post('/test', function (req, res, next) {
// console.log("post received");
distance = req.body.distance;
let result = test(distance);
let result_str = JSON.stringify(result);
res.end(result_str)
});
function write_csv(obj) {
writer.pipe(fs.createWriteStream('out.csv', { flags: 'a' }));
writer.write(obj);
writer.end();
};
function test(dist) {
let d = new Date();
let YYYY = d.getFullYear();
let MM = d.getMonth();
let DD = d.getDate();
let HH = d.getHours();
let mm = d.getMinutes();
let ss = d.getSeconds();
let date = YYYY + ':' + (MM + 1) + ':' + DD + ':' + HH + ':' + mm + ':' + ss;
let time_distance = { 'time': date, 'distance': distance };
console.log(time_distance);
write_csv(time_distance);
return time_distance;
};
app.listen(app.get('port'), function () {
console.log('Sever started on port ' + app.get('port'));
})
解决方案
在没有看到完整代码的情况下,我可以想象您正在write_csv
多次调用,因为您正在尝试将多个对象写入该文件。
问题是你第一次打电话write_csv
时你结束了writer
,这就是为什么你第二次打电话你会得到:
Error [ERR_STREAM_WRITE_AFTER_END]: write after end
function write_csv(obj) {
writer.pipe(fs.createWriteStream('out.csv', { flags: 'a' }))
writer.write()
writer.end();
}
write_csv({ hello: 'world', foo: 'bar', baz: 'taco'});
// When you call it again, writer.end(); is already closed
// The following line will trigger the error
write_csv({ hello: 'world', foo: 'bar', baz: 'taco'});
相反,您应该做的是仅在完成写入后才关闭作家。
const writer = csvWriter(); // Create new writer
// open file
writer.pipe(fs.createWriteStream('out.csv', { flags: 'a' }));
for(const obj of objects) // Write as many times as you wish
writer.write(obj);
writer.end(); // I'm done writing.
现在您遇到的问题是,如果您尝试执行多个.writes
,您将达到内存限制,因为您没有正确处理背压。
我建议阅读以下问题:
为了解决这个问题,您需要等待drain
事件被发出。
csvWriter
这是一个处理背压的包装器。
const fs = require('fs');
const csvWriter = require('csv-write-stream');
class Writer {
constructor(file) {
this.writer = csvWriter();
this.writer.pipe(fs.createWriteStream(file, { flags: 'a' }));
}
write(obj) {
// if .write returns false we have to wait until `drain` is emitted
if(!this.writer.write(obj))
return new Promise(resolve => this.writer.once('drain', resolve))
return true;
}
end() {
// Wrap it in a promise if you wish to wait for the callback.
this.writer.end();
}
}
(async() => {
const writer = new Writer('out.csv');
for(let i = 0; i < 1e8; i++) {
const res = writer.write({ hello: 'world', foo: 'bar', baz: 'taco' });
if(res instanceof Promise) {
// You can remove this if, and leave just: await writer.write...
// but the code will be slower
await res; // This will wait for the stream to emit the drain event
}
}
writer.end();
})();
更新:现在使用实际代码,上面的答案仍然有效,但是因为您在收到请求时正在写入文件。您可以选择是否打开文件一次,并在每个请求上写入,在服务器关闭时(或您选择时)关闭它。或者只是打开文件,写入文件,然后在每次请求时关闭它,
对于前者,您应该使用上面的答案,对于后者,您需要做的就是每次调用时创建一个新的编写器,write_csv
而不是拥有一个全局编写器。
function write_csv(obj) {
// Create a new writer every time
const writer = csvWriter({ sendHeaders: false });
writer.pipe(fs.createWriteStream('out.csv', { flags: 'a' }));
writer.write(obj);
writer.end();
};
推荐阅读
- git - Semantic-release:如何将已经发布到 artifactory 的主版本降级?
- javascript - 如何为从 Web Speech API (window.speechSynthesis.speak()) 调用生成的语音音频实现捕获音频/下载功能?
- javascript - 使用 axios 不会重新渲染我的 React Native 组件
- python - 在 Python 中迭代模块
- c++ - 为什么具有静态存储持续时间的同一个内联变量在包含在由 vs2017 编译的两个翻译单元中时会构造和破坏两次
- vue.js - 单击事件作为 Vue.js 中的道具
- php - Chrome 忽略 Content-Disposition 文件名
- r - 如何按因子计算变量?
- python - 熊猫中的if语句-python
- excel - 在 VBA 范围比较中查找行值