javascript - 如何从 Node.js 流中释放我的数据
问题描述
我已经使用 Java Script API 有一段时间了,但这是我第一次尝试从永远不会发出的活动流中采样'done'
。我的目标是每小时从流中获取一定数量的样本。流连接和传输大量信息,但我无法将返回的数据转换为可以对其进行进一步处理的格式(就像我在数据科学工作流程中所熟悉的那样)。
感觉就像我这几天一直在盯着文档,并注意到最简单的示例将可读流通过管道传输到服务器上的文件中。这对我的应用程序来说似乎效率低下。(必须将其写入文件,只需再次将其读入以对其进行更多处理,然后再将其发送到浏览器以通过 fetch API 进行渲染或将其发送到项目的 mongoDB 以进行长期存储和深度分析。我很确定有一种方法可以将 JSON 设置为const
orvar
而我只是不熟悉它。
如何将我的数据放入saved
Java Script 变量中?为了能够继续操作和处理返回的 JSON,我需要更改或添加什么到我的代码中?
const needle = require('needle');
const token = process.env.BEARER_TOKEN;
const streamURL = 'https://api.twitter.com/2/tweets/sample/stream';
function streamConnect() {
const options = {
timeout: 2000,
};
const stream = needle.get(
streamURL,
{
headers: {
Authorization: `Bearer ${token}`,
},
},
options
);
stream
.on('data', (data) => {
try {
const json = JSON.parse(data);
// console.log(json);
} catch (e) {
// Keep alive signal received. Do nothing.
}
})
.on('error', (error) => {
if (error.code === 'ETIMEDOUT') {
stream.emit('timeout');
}
});
return stream;
}
function getTweetSample() {
const s = streamConnect();
const chunks = [];
s.on('readable', () => {
let chunk;
while (null !== (chunk = s.read())) {
chunks.push(chunk);
}
});
setInterval(() => {
s.destroy();
}, 3000);
return chunks;
}
const saved = API.getTweetSample();
console.log('saved: ', saved);
// Above returns
// "saved: []"
// Expecting
// "saved:
{
{
data: {
id: '1301578967443337***',
text: 'See bones too so sure your weight perfect!'
}
}
{
data: {
id: '1301578980001230***
text: 'Vcs perderam a Dona Maria, ela percebeu q precisa trabalhar e crescer na vida, percebeu q paga 40% de imposto no consumo enquanto políticos q dizem lutar por ela, estão usufruindo dos direitos q ela nunca vai ter Trabalho escravo é ter q trabalhar pra vcs'
}
}
...... // 20 examples
}"
2020-09-07 编辑
这是响应的有效负载示例:
PassThrough {
_readableState: ReadableState {
objectMode: false,
highWaterMark: 16384,
buffer: BufferList { head: null, tail: null, length: 0 },
length: 0,
pipes: null,
pipesCount: 0,
flowing: true,
ended: false,
endEmitted: false,
reading: false,
sync: false,
....
}
解决方案
应对挑战的三个步骤:
- 数据必须作为流式 HTTP 响应正文获取
- 响应流必须由 JSON 解析器解析,因为数据是从响应中流式传输的
- 在 JSON 解析器解析了 20 个元素后,流将终止
OP 中的示例代码已经说明了如何解决 (1)。
有一些库可以即时解析 JSON 数据流来解决 (2)。我个人的偏好是stream-json
因为它只需要我们管道中的一行代码。
最后,(3) 将要求代码在传入流完成之前终止它。这会导致nodejs抛出ERR_STREAM_PREMATURE_CLOSE
错误,可以通过有针对性的catch语句来处理。
将这些步骤组合起来将成为类似于以下可执行 POC 的内容。我没有 Twitter API 令牌,但我认为这会起作用:
const stream = require('stream');
const util = require('util');
const got = require('got');
const StreamValues = require("stream-json/streamers/StreamValues.js");
(async () => {
const token = "<YOUR API TOKEN>";
const dataStream = got.stream('https://api.twitter.com/2/tweets/sample/stream', {
headers: { "Authorization": `Bearer ${token}` },
});
// This array will by filled by JSON parsed objects from the HTTP response
const dataPoints = [];
await util.promisify(stream.pipeline)(
// This readable stream [dataStream] will emit the incoming HTTP body as string data
dataStream,
// The string data is then JSON parsed on the fly by [stream-json]
StreamValues.withParser(),
// Finally, we iterate over the the JSON objects and push them to the [dataPoints] array.
async function(source){
for await (const parsedObject of source){
dataPoints.push( parsedObject.value );
if( dataPoints.length === 20 ){
// When we reach 20 data points, the stream is forcefully terminated
dataStream.destroy();
return;
}
}
}
)
// Prematurely terminating the stream will cause nodejs to emit a [ERR_STREAM_PREMATURE_CLOSE]
// error. If it is OK to return more than 20 elements, you could try to remove the
// [return] statement on L28;
.catch(error => (error.code !== "ERR_STREAM_PREMATURE_CLOSE" && Promise.reject(error)));
}())
.catch(console.error);
推荐阅读
- angular - 如何处理 Angular 4 打字稿中的无效十进制值?
- python - Unravel Index numpy - 自己的实现
- python - 如何在python中删除具有非数字数据的行
- apollo - 如何在 Apollo 客户端缓存中查询特定类型的所有项目?
- python - 保存 df.show() 的结果
- amazon-web-services - AWS API Gateway Custom Authorizer lambda 未触发
- sql - 多次指定相关名称“CONVERT”
- javascript - php联系表单在单击提交按钮时没有响应
- python - 在 Python 中并行化列表推导
- python-3.x - 我可以将 python 3.7 代码转换为较低版本吗?