首页 > 解决方案 > Javascript - 分叉异步生成器

问题描述

假设我有一个异步生成器,如下所示:

// This could be records from an expensive db call, for example...
// Too big to buffer in memory
const events = (async function* () {
    await new Promise(r => setTimeout(r, 0));
    yield {type:'bar', ts:'2021-01-01 00:00:00', data:{bar:"bob"}};
    yield {type:'foo', ts:'2021-01-02 00:00:00', data:{num:2}};
    yield {type:'foo', ts:'2021-01-03 00:00:00', data:{num:3}};
})();

我怎样才能复制它来实现类似的东西:

function process(events) {

    async function* filterEventsByName(events, name) {
        for await (const event of events) {
            if (event.type === name) continue;
            yield event;
        }
    }

    async function* processFooEvent(events) {
        for await (const event of events) {
            yield event.data.num;
        }
    }

    // How to implement this fork function?
    const [copy1, copy2] = fork(events);

    const foos = processFooEvent(filterEventsByName(copy1, 'foo'));
    const bars = filterEventsByName(copy2, 'bar');

    return {foos, bars};
}

const {foos, bars} = process(events);

for await (const event of foos) console.log(event);
// 2
// 3

for await (const event of bars) console.log(event);
// {type:'bar', ts:'2021-01-01 00:00:00', data:{bar:"bob"}};

标签: javascriptgenerator

解决方案


我有一个使用Highland作为中介的解决方案。

请注意(来自文档):

分叉给多个消费者的流将一次一个地从其源中提取值,其速度只有最慢的消费者可以处理它们的速度。

import _ from 'lodash'
import H from 'highland'

export function fork<T>(generator: AsyncGenerator<T>): [
    AsyncGenerator<T>,
    AsyncGenerator<T>
] {
    const source = asyncGeneratorToHighlandStream(generator).map(x => _.cloneDeep(x));
    return [
        highlandStreamToAsyncGenerator<T>(source.fork()),
        highlandStreamToAsyncGenerator<T>(source.fork()),
    ];
}

async function* highlandStreamToAsyncGenerator<T>(
    stream: Highland.Stream<T>
): AsyncGenerator<T> {
    for await (const row of stream.toNodeStream({ objectMode: true })) {
        yield row as unknown as T;
    }
}

function asyncGeneratorToHighlandStream<T>(
    generator: AsyncGenerator<T>
): Highland.Stream<T> {
    return H(async (push, next) => {
        try {
            const result = await generator.next();
            if (result.done) return push(null, H.nil);
            push(null, result.value);
            next();
        } catch (error) {
            return push(error);
        }
    });
}

希望看到没有库或其他库的替代解决方案。


推荐阅读