首页 > 解决方案 > 节点串行端口:应用超时时无法使用 RxJS Observable 处理传入数据

问题描述

我目前正在使用该serialport模块RxJS Observables一起开发 NodeJS 项目。预期的“流程”/用例如下:

  1. 串口名称portName通过串口发送
  2. 由于 RxD 和 TxD 相互链接,因此数据在硬件端“回显”
  3. 数据通过串口读取
  4. 传入的数据由 处理readline-parser并传递给RxJS Observable
  5. 如果 readdata等于先前发送的portName,则不再需要 observable 并将由observer.complete()

我能够实现上述流程,但需要做一些进一步的实现,比如

  1. 如果在给定的时间段内没有收到数据,则超时
  2. 在超时或其他错误的情况下重试发送命令

我正在研究超时实现并尝试了NodeJSsetTimeout()RxJS 自己的timeout功能。应用任何类型的超时功能时,串行端口似乎不会读取/检索数据,这反过来会引发超时错误。

假设没有数据,乍一看这似乎很好,因为超时确实做了它应该做的事情。但是,我不仅可以使用软件中的模拟串行端口,还可以使用两个 CP2102 USB 到串行转换器(请参阅代码中的注释了解更多详细信息)来仔细检查所需数据是否已发送到端口):

'use strict';
const Rx = require('rxjs');
const { interval } = require('rxjs');
const { timeout } = require('rxjs/operators');
const SerialPort = require('serialport');
const Readline = require('@serialport/parser-readline');


// `tries` is needed for later implementation of communcation retries
const myObservable = (portName, tries) => {
  const port = new SerialPort(portName);
  const parser = port.pipe( new Readline() );
  port.write(`${portName}\n`);

  return Rx.Observable
    .create( (observer) => {
      parser.on('data', (data) => {
        observer.next(data);
        console.log(`Detection will be: ${data == portName} (${data} vs. ${portName})`);
        if (data == portName)
        {
          port.close( (err) => {
            if (err)
            {
              console.log(`Error on closing serial port: ${err}`);
              observer.error(err);
            }
          });
          observer.complete();
        }
      })
    })
    // `timeout` is needed for later implementation of communication timeout, see comment at end of code
    // .pipe(timeout(10000))
}


const myObserver = {
  next:     x => console.log(`got data from observable: ${x}`),
  error:    err => console.error(`something wrong occurred: ${err}`),
  complete: () => console.log('done'),
};


console.log('before subscribe');
const sub = myObservable('/dev/tty.usbserial-FTG7L3FX', null).subscribe(myObserver);
// double-checked that data is sent by using software (created an emulated pair of virtual serial ports with `socat -d -d pty,raw,echo=0 pty,raw,echo=0`)
// --> data is sent, but not read/retrieved when either using `setTimeout` or RxJS' own `timeout()`
// const sub = myObservable('/dev/ttys003', null).subscribe(myObserver);
// double-checked that data is sent by using hardware interfaces (used two CP2102 modules with pairwise-crossed RxD and TxD)
// --> data is sent, but not read/retrieved when either using `setTimeout` or RxJS' own `timeout()`
// const sub = myObservable('/dev/tty.SLAB_USBtoUART', null).subscribe(myObserver);
console.log('after subscribe');


// when commenting the following `setTimeout()` data is retrieved, but does not work with `setTimeout()`
// so tried to use RxJS' `timeout()` operator --> not working either
// setTimeout(() => {
//   sub.unsubscribe();
//   console.log('unsubscribed');
// }, 10000);

我在这里想念什么?为什么在不使用任何类型的超时时发送数据但在应用超时功能时不发送数据?


因进一步调查而更新:

  1. 应用时timeout(),数据在超时持续时间后发送,这意味着超时会触发发送数据并退出可观察对象,因为它已超时。所以.timeout()似乎没有应用于返回Rx.Observable,而是应用于整个函数myObservable

标签: node.jsrxjsserial-portnode-serialportnode-streams

解决方案


推荐阅读