javascript - Javascript - 分叉异步生成器
问题描述
假设我有一个异步生成器,如下所示:
// This could be records from an expensive db call, for example...
// Too big to buffer in memory
const events = (async function* () {
await new Promise(r => setTimeout(r, 0));
yield {type:'bar', ts:'2021-01-01 00:00:00', data:{bar:"bob"}};
yield {type:'foo', ts:'2021-01-02 00:00:00', data:{num:2}};
yield {type:'foo', ts:'2021-01-03 00:00:00', data:{num:3}};
})();
我怎样才能复制它来实现类似的东西:
function process(events) {
async function* filterEventsByName(events, name) {
for await (const event of events) {
if (event.type === name) continue;
yield event;
}
}
async function* processFooEvent(events) {
for await (const event of events) {
yield event.data.num;
}
}
// How to implement this fork function?
const [copy1, copy2] = fork(events);
const foos = processFooEvent(filterEventsByName(copy1, 'foo'));
const bars = filterEventsByName(copy2, 'bar');
return {foos, bars};
}
const {foos, bars} = process(events);
for await (const event of foos) console.log(event);
// 2
// 3
for await (const event of bars) console.log(event);
// {type:'bar', ts:'2021-01-01 00:00:00', data:{bar:"bob"}};
解决方案
我有一个使用Highland作为中介的解决方案。
请注意(来自文档):
分叉给多个消费者的流将一次一个地从其源中提取值,其速度只有最慢的消费者可以处理它们的速度。
import _ from 'lodash'
import H from 'highland'
export function fork<T>(generator: AsyncGenerator<T>): [
AsyncGenerator<T>,
AsyncGenerator<T>
] {
const source = asyncGeneratorToHighlandStream(generator).map(x => _.cloneDeep(x));
return [
highlandStreamToAsyncGenerator<T>(source.fork()),
highlandStreamToAsyncGenerator<T>(source.fork()),
];
}
async function* highlandStreamToAsyncGenerator<T>(
stream: Highland.Stream<T>
): AsyncGenerator<T> {
for await (const row of stream.toNodeStream({ objectMode: true })) {
yield row as unknown as T;
}
}
function asyncGeneratorToHighlandStream<T>(
generator: AsyncGenerator<T>
): Highland.Stream<T> {
return H(async (push, next) => {
try {
const result = await generator.next();
if (result.done) return push(null, H.nil);
push(null, result.value);
next();
} catch (error) {
return push(error);
}
});
}
希望看到没有库或其他库的替代解决方案。
推荐阅读
- reporting-services - SSRS 2016 Oracle ODP.NET 间歇性 ORA-12571
- r - 抓取网页(使用 R),其中所有元素都放置在
标签 - javascript - 使用 javascript 确定最后访问的页面是否是我的网站
- git - git SHA 可以作为电子设备软件版本的一部分显示吗
- prestashop - 在 Prestashop 行政办公室的订单表中为承运人列添加颜色
- node.js - 要求 mongoose 模型导致 Uncaught (in promise) TypeError: Cannot read property 'find' of null
- reactjs - React useReducer 和 context:如何提供状态选择器?
- javascript - 为什么我在 Squarespace 中收到错误 Y Not Defined Error
- c++ - 给快捷方式一个图标
- python - 网页抓取 Youtube 频道以获取视频列表和其他详细信息