首页 > 解决方案 > 如何包装 Node `EventEmitter` 以便能够将其作为终止的`AsyncIterator` 使用?

问题描述

我正在尝试使用 XML 流解析 API ( xml-flow),它公开了一个节点,该节点为我感兴趣的标签EventEmitter发出一堆事件,并在它完成读取文档时发出一个事件。tagend

我希望能够使用Interactive Extensions来解决这个问题,但我不知道如何将其转换为结束的异步迭代;ix只有fromEvent/fromEventPattern似乎没有办法处理“结束”事件。

尝试:

import * as aix from 'ix/asynciterable';
import flow from 'xml-flow';

const iterTags = aix.fromEvent(flow(...), 'tag:foo');
console.log('max', aix.max(iterTags));

不产生任何输出,而添加 a.pipe(tap(console.debug))以打印正在迭代的值向我显示流实际上正在正确处理。

有没有办法可以连接end事件以使迭代器return正常工作?

标签: javascriptnode.jsasync-awaitgeneratorixjs

解决方案


我设法通过使用 中的某些部分rxjs而不是相应的ixjs版本来创建一个异步迭代。

无论你用 rxjs 做什么,你都可以通过管道ix.from将 observable 转换为异步迭代。

输入:

  <root>
      <foo>
        <name>Bill</name>
        <id>1</id>
        <age>27</age>
      </foo>
      <foo>
        <name>Sally</name>
        <id>2</id>
        <age>40</age>
      </foo>
      <foo>
        <name>Kelly</name>
        <id>3</id>
        <age>37</age>
      </foo>
    </root>

代码:

const {fromEvent} = require('rxjs');
const {takeUntil, tap, map} = require('rxjs/operators');
const ai = require('ix/asynciterable');

const [aiFrom, aiMax] = [ai.from, ai.max];


const flow = require('xml-flow');
const fs = require('fs');
const path = require('path');

const inFile = fs.createReadStream(path.join(__dirname, 'test.xml'));


const eventEmitter = flow(inFile);
const iterTags = aiFrom(
        fromEvent(eventEmitter, 'tag:foo')
            .pipe(takeUntil(fromEvent(eventEmitter, 'end')))
            .pipe(tap(console.log), 
                  map(o=>o.age))
    );


aiMax(iterTags).then((v)=>console.log("max age:", v));

// This works also
// (async()=>{
//     for await(el of iterTags){
//         console.log(el)
//     }
// })();

输出:

{ '$name': 'foo', name: 'Bill', id: '1', age: '27' }
{ '$name': 'foo', name: 'Sally', id: '2', age: '40' }
{ '$name': 'foo', name: 'Kelly', id: '3', age: '37' }
max age: 40

推荐阅读