首页 > 解决方案 > Angular 5 从无尽的 BehaviorSubject 完成一个 obvservable

问题描述

所以我有一个 Observable 数据存储,每次存储更改时,它都有一个私有 BehaviorSubject 发射项目。

我还有一个公共方法,它返回从 BehaviorSubject 创建的 observable,它根据状态过滤项目并跳过项目。

我想知道如何返回一个在满足特定条件后完成的可观察对象而不给消费者负责?

编辑:这种情况下的解决方案是让消费者使用 .take(1) 运算符,以便它在第一次发出后完成。

这是与我的问题有关的一些代码:

class Item {

  public id: string;
  public state: number;
};


@Injectable()
export class SomeDataService(){

  private let items   = [];                          //The store of items
  private let stream$ = new BehaviorSubject<Item[]>; //The endless stream of items.

  constructor( someService: SomeOtherService ){

    this.someService.items$.subscribe( items => {

      this.items = items;
      this.stream$.next( items );
    });
  };


  //
  public getObservable( filterID: string ): Observable<Item> {

    return this.$stream.asObservable().map( items => {

      //Find the item in the list and return it
      return items.find( item => {
        return item.id === filterID;
      });
    }).flatMap( item => {

      if( item && item.state === 3 ) { //arbitrary number
        throw Observable.throw( item );
      }

      //Transform the item and such...

      return Observable.of( item );
    }).skipWhile( item => {

      return item && item.state !== 1;
    });
  };
};

//一些其他文件来使用服务

this.someDataService.getObservable( 'uniqueID' ).finally( () => {
  console.log("Now this gets printed when it emits the first time and on errors.");
}).subscribe();

/*
  //Previous attempt
  this.someDataService.getObservable( 'uniqueID' ).finally( () => {

    console.log("I will never print this as the observable never completes.");
  }).subscribe();
*/

标签: angularobservablebehaviorsubject

解决方案


不知道为什么在存储项目时使用 BehaviorSubjectprivate const items = [];而且我个人不喜欢服务“执行逻辑”。无论如何,问题出在skipWhile.

例如,如果您将此项目分配给您的项目属性:

const itemsO: Item[] = [
    {
        id: '0',
        state: 1
    },
    {
        id: '1',
        state: 1
    },
    {
        id: '2',
        state: 1
    },
    {
        id: '0',
        state: 1
    },
];

并使用 as 参数调用getObservable函数'0',代码将正常工作,因为skipWhile会发现谓词为真,所以它会完成工作。

我已经对您的代码进行了此操作并进行了尝试,并且效果很好:

服务 :

import { Injectable } from '@angular/core';
import { Observable } from 'rxjs/Observable';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
import 'rxjs/add/observable/of';
import 'rxjs/add/operator/mergeMap';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/skipWhile';
class Item {

    public id: string;
    public state: number;
}

const itemsO: Item[] = [
    {
        id: '0',
        state: 1
    },
    {
        id: '1',
        state: 1
    },
    {
        id: '2',
        state: 1
    },
    {
        id: '0',
        state: 1
    },
];

@Injectable()
export class SomeDataService {

    private items = [];
    private stream$: BehaviorSubject<Item[]> = new BehaviorSubject<Item[]>(null);

    constructor() {
        this.items = itemsO;
        this.stream$.next(itemsO);
    }

    public getObservable(filterID: string): Observable<Item> {
        return this.stream$.asObservable().map(items => {

            // Find the item in the list and return it
            return items.find(item => {
                return item.id === filterID;
            });
        }).flatMap(item => {

            if (item && item.state === 3) { // arbitrary number
                throw Observable.throw(item);
            }

            // Transform the item and such...

            return Observable.of(item);
        }).skipWhile(item => {
            return item.state !== 1;
        });
    }
}

在主要组件上:

constructor(a: SomeDataService) {
        a.getObservable('0')
            .subscribe(evt => {
                console.log(evt);
                // You will see something in console
            });
    }

我不知道您要达到什么目标,但问题仅在于skipWhile. 如果您想丢弃数据或发出一个空数组,如果status !== 1只使用另一个运算符


推荐阅读