首页 > 解决方案 > rxjs ReplaySubject 句柄

问题描述

我在 Angular 4 中使用的模板有问题。该模板实现了一个通知系统,您可以在其中添加新通知,但文档没有指定如何删除观察者 ReplaySubject 的元素。

模板将其实现为服务,如下所示:

private notificationsList: Notification[] = [];
  // a stream that publishes new notifications only once
  public newNotifications: Subject<Notification> = new Subject<Notification>();

  // `notifications` is a stream that emits an array of the most up to date notifications
  public notifications: ReplaySubject<Notification[]> =
      new ReplaySubject<Notification[]>(1);

  // `updates` receives _operations_ to be applied to our `notifications`
  // it's a way we can perform changes on *all* notifications (that are currently
  // stored in `notifications`)
  public updates: Subject<any> = new Subject<any>();

  // action streams
  public create: Subject<Notification> = new Subject<Notification>();
  // public markThreadAsRead: Subject<any> = new Subject<any>();

  constructor() {
    // recois des operation, et les fais sur la liste interne, puis diffuse le
    // resultat sur notifications
    this.updates.subscribe((ope) => {
      this.notificationsList = ope(this.notificationsList);
      console.log(this.notificationsList);
      this.notifications.next(this.notificationsList);
    });

    this.newNotifications
      .map(function(notification: Notification): INotificationsOperation {
        return (notifications: Notification[]) => {
          return notifications.concat(notification);
        };
      })
      .subscribe(this.updates);

  }

  // an imperative function call to this action stream
  public addNotification(notification: Notification): void {
    this.newNotifications.next(notification);
  }

我尝试向所有者询问如何删除通知列表的实际元素,但他只是告诉我我可以访问“通知”主题以接收它的最新版本。但不要提及我如何实际删除列表中的一个元素。

有人知道吗?

谢谢!

标签: javascriptangularrxjssubject

解决方案


我添加了一个您可以使用的公共功能。我添加了一条注释,让您查看如果您想按名称删除元素,或者不想调整列表大小,您可以修改代码的哪一部分。解释在我的帖子末尾。

  private notificationsList: Notification[] = [];
  // a stream that publishes new notifications only once
  public newNotifications: Subject<Notification> = new Subject<Notification>();
  public removeNotificationByIndex$ : Subject<number> = new Subject<number>();
  // `notifications` is a stream that emits an array of the most up to date notifications
  public notifications: ReplaySubject<Notification[]> =
      new ReplaySubject<Notification[]>(1);

  // `updates` receives _operations_ to be applied to our `notifications`
  // it's a way we can perform changes on *all* notifications (that are currently
  // stored in `notifications`)
  public updates: Subject<any> = new Subject<any>();

  // action streams
  public create: Subject<Notification> = new Subject<Notification>();
  // public markThreadAsRead: Subject<any> = new Subject<any>();

  constructor() {
    // recois des operation, et les fais sur la liste interne, puis diffuse le
    // resultat sur notifications
    this.updates.subscribe((ope) => {
      this.notificationsList = ope(this.notificationsList);
      console.log(this.notificationsList);
      this.notifications.next(this.notificationsList);
    });

    this.newNotifications
      .map(function(notification: Notification): INotificationsOperation {
        return (notifications: Notification[]) => {
          return notifications.concat(notification);
        };
      })
      .subscribe(this.updates);

    this.removeNotificationByIndex$
     .map(function(index: number){
        return (notifications: Notification[]) => {
        // >>>> DELETE METHOD IS TO BE DEFINED DOWN HERE !
        notifications.splice(index,1);
        // >>>> DELETE METHOD IS TO BE DEFINED UP HERE !
      return notifications
     };
     })
     .subscribe(this.updates);

  }

  // an imperative function call to this action stream
  public addNotification(notification: Notification): void {
    this.newNotifications.next(notification);
  }

  // delete the element in the "index" position of the list. 
  // /!\ Resizes the list 
  public removeNotificationsByIndex(index: number): void {
    this.removeNotificationByIndex$.next(index);
  }

有哪些变化?

public removeNotificationByIndex$ : Subject<number> = new Subject<number>();

该主题将(异步)接收索引,并使用该索引触发进程。

 this.removeNotificationByIndex$
 .map(function(index: number){
    return (notifications: Notification[]) => {
    // >>>> DELETE METHOD IS TO BE DEFINED DOWN HERE !
    notifications.splice(index,1);
    // >>>> DELETE METHOD IS TO BE DEFINED UP HERE !
  return notifications
 };
 })
 .subscribe(this.updates);

当索引发出时(即您使用相关的命令式函数),会从中生成一个函数(ES6 箭头函数)。就是这个 :

(notifications: Notification[]) => {
    // >>>> DELETE METHOD IS TO BE DEFINED DOWN HERE !
    notifications.splice(index,1);
    // >>>> DELETE METHOD IS TO BE DEFINED UP HERE !
  return notifications
 };

这个函数被传递给this.update,它将应用它。在这种情况下,ope就是这个功能。收到后,this.notificationList修改如下:

this.notificationsList = ope(this.notificationsList);

最后,这个新列表被发布到 ReplaySubject notifications

this.notifications.next(this.notificationsList);

它将这个新列表传播给它的所有订阅者。

瞧:)。祝你好运 !


推荐阅读