首页 > 解决方案 > RxJS Subject 完成后自动关闭订阅

问题描述

我在服务中公开了两个主题,因此我可以呈现主题中的值并将它们呈现在我的页面上。

其中一个主题正确接收信息,但第二个主题自动关闭订阅,我无法再检索数据。

这是我的代码:

// service.ts
  clientSubject: Subject<any>;
  jobSubject: Subject<any>;

  constructor(private clientService: ClientService, private jobService: JobService) {
    this.clientSubject = new Subject();
    this.jobSubject = new Subject();
  }

  fetchData(): void {
    this.clientService.getClients()
    .pipe(
      map(clients => clients.map(
        client => this.jobService.getJobs(client.clientId)
          .pipe(
            map(jobs => jobs.map(job => ({client, job})))
          )
          .subscribe(this.jobSubject)
        )
    ))
    .subscribe(this.clientSubject);
  }

这是我组件中的代码:

  clients: any;
  jobs: any;

  constructor(private invoiceService: InvoiceService) {}

  ngOnInit(): void {
    this.invoiceService.clientSubject.subscribe(d => {
      this.clients = d;
      console.log(this.clients);
    });

    this.invoiceService.jobSubject.subscribe(d => {
      this.jobs = d;
      console.log(this.jobs);
    });

    this.invoiceService.fetchData();
  }

这是控制台记录的内容:

### this.jobs data ###
invoices.component.ts:25 
[{…}]
0: {client: {…}, job: {…}}
length: 1
__proto__: Array(0)


### this.clients data ###
invoices.component.ts:19 
(2) [SubjectSubscriber, SubjectSubscriber]
0: SubjectSubscriber {closed: true, _parentOrParents: null, _subscriptions: null, syncErrorValue: null, syncErrorThrown: false, …}
1: SubjectSubscriber {closed: true, _parentOrParents: null, _subscriptions: null, syncErrorValue: null, syncErrorThrown: false, …}
length: 2
__proto__: Array(0)

为什么this.clientsSubjectSubscribers 会根据 中的实现覆盖数据fetchData

jobService.getJobs(client.clientId)返回具有以下格式的可观​​察对象:

[{jobId: 1, name: 'blabla', items: []}]

jobService.getClients()返回具有以下格式的可观​​察对象:

[{clientId: 1, name: 'blabla', isParent: false}]

标签: angularrxjs

解决方案


马上。Map 正在将一组客户端转换为一组订阅。就是这样clients.map([...]。是在做。也许让你感到困惑的是.subscribe返回一个值?像这样:

const valueReturnedBySubscribe = anyObservable.subscribe(LAMBDA);
console.log(valueReturnedBySubscribe);

输出:

SubjectSubscriber {closed: true, [...]

试试这个:

fetchData(): void {
  this.clientService.getClients().pipe(
    tap(clients => clients.forEach(
      client => this.jobService.getJobs(client.clientId)
        .pipe(
          map(jobs => jobs.map(job => ({client, job})))
        )
        .subscribe(this.jobSubject)
      )
    )
  )
  .subscribe(this.clientSubject);
}

tap根本不会改变你的流,而且你似乎也不想要你的订阅对象。不知道你为什么使用地图,但看起来你不需要它。


更新

让 jobSubject 订阅 clientSubject 可能是最有意义的,因为新客户总是意味着对新作业的查询。这可能看起来像:

clientSubject: Subject<any>;
jobSubject: Subject<any>;

constructor(private clientService: ClientService, private jobService: JobService) {

  this.clientSubject = new Subject();
  this.jobSubject = new Subject();

  this.clientSubject.pipe(
    map(clients => clients.map(
      client => jobService.getJobs(client.clientId).pipe(
        map(jobs => jobs.map(job => ({client, job})))
      )
    )),
    mergeMap(jobs => merge(jobs))
  ).subscribe(this.jobSubject);
}

fetchData(): void {
  this.clientService.getClients().subscribe(this.clientSubject);
}

在旁边:

现在,您的主题(clientSubject 和 jobSubject)是短暂的。当他们订阅的流之一完成(或错误)时,他们完成。如果你想避免这种情况,你可以只订阅值而不是源流complete()error()消息

您需要做的就是改变:

this.clientSubject/*...code...*/subscribe(this.jobSubject);
to
this.clientSubject/*...code...*/subscribe(this.jobSubject.next);

this.clientService.getClients().subscribe(this.clientSubject);
to
this.clientService.getClients().subscribe(this.clientSubject.next);

推荐阅读