javascript - Angular 5 Rxjs Subject.subscribe() 未在多个组件中触发
问题描述
我正在使用 rxjs 和主题来更新我的两个组件。
我正在订阅服务中的主题,但是当在主题上调用 .next 方法时,它只会更新我的一个组件。
该应用程序包含一个用于初始化 websocketconnection 的 WebsocketService,一个使用 WebsocketService 连接到后端并发送/接收通知的 NotificationService。
我有一个 NotificationComponent 可以在其中创建新通知。在这个组件中,我订阅了 NotificationService 中的主题,并在通知更新时显示通知。这工作得很好,消息到达后端,并在当前有连接的所有浏览器中更新。
我的下一步是在 HeaderComponent 中显示此通知。我在这里注入了 NotificationService 并订阅了相同的主题,但是当我发送通知时,HeaderComponents 订阅不会触发。console.log 消息永远不会出现在控制台中。
WebSocket服务
import { Injectable } from '@angular/core';
import { ReplaySubject, Subject, Observable, Observer } from 'rxjs/Rx';
@Injectable()
export class WebsocketService {
constructor() { }
private subject: ReplaySubject<MessageEvent>;
public connect(url): ReplaySubject<MessageEvent> {
if (!this.subject) {
this.subject = this.create(url);
console.log("Successfully connected: " + url);
}
return this.subject;
}
private create(url): ReplaySubject<MessageEvent> {
//create connection
let ws = new WebSocket(url);
//define observable
let observable = Observable.create(
(obs: Observer<MessageEvent>) => {
ws.onmessage = obs.next.bind(obs);
ws.onerror = obs.error.bind(obs);
ws.onclose = obs.complete.bind(obs);
return ws.close.bind(ws);
});
//define observer
let observer = {
next: (data: Object) => {
if (ws.readyState === WebSocket.OPEN) {
console.log("---sending ws message---");
ws.send(JSON.stringify(data));
}
}
};
return ReplaySubject.create(observer, observable);
}
}
通知服务
import { Injectable } from '@angular/core';
import { Observable, Subject, ReplaySubject, BehaviorSubject } from 'rxjs/Rx';
import { WebsocketService } from './websocket.service';
import { Notification } from './../model/notification'
const NOTIFICATION_URL = 'ws://localhost:8080/Kwetter/socket'
@Injectable()
export class NotificationService {
public _notification: ReplaySubject<Notification>;
constructor(websocketService: WebsocketService) {
this._notification = <ReplaySubject<Notification>>websocketService
.connect(NOTIFICATION_URL)
.map((response: MessageEvent): Notification => {
let data = JSON.parse(response.data);
return {
sender: data.author,
message: data.message
}
});
}
sendMessage(notification) {
console.log("---calling .next()---");
this._notification.next(notification);
}
}
通知组件
import { Component, OnInit } from '@angular/core';
import { NotificationService } from '../services/notification.service';
import { UserService } from '../services/user.service';
import { Notification } from './../model/notification';
@Component({
selector: 'app-notifications',
templateUrl: './notifications.component.html',
styleUrls: ['./notifications.component.css']
})
export class NotificationsComponent implements OnInit {
notification: Notification;
text: string;
constructor(private notificationService: NotificationService, private userService: UserService) {
if (this.notification == null) {
this.notification = new Notification("", "");
}
notificationService._notification.subscribe(notification => {
console.log("---notification has been updated---")
this.notification = notification;
});
}
sendMsg() {
let newNot = new Notification(this.userService.getUser(), this.text);
this.notificationService.sendMessage(newNot);
}
ngOnInit() {
}
}
标头组件
import { Component, OnInit, OnDestroy } from '@angular/core';
import { UserService } from '../../services/user.service';
import { NotificationService } from '../../services/notification.service';
import { Router } from '@angular/router';
import { Subscription } from 'rxjs/Subscription';
import { Profile } from '../../model/profile';
import { User } from '../../model/user';
import { Notification } from '../../model/notification';
@Component({
selector: 'app-header',
templateUrl: './header.component.html',
styleUrls: ['./header.component.css']
})
export class HeaderComponent implements OnInit, OnDestroy {
private notification: Notification;
private loggedIn = false;
private user: User;
private subscription: Subscription;
constructor(private userService: UserService, private router: Router, private notificationService: NotificationService) {
console.log("---constructor headercomponent---");
console.log(this.notification);
this.notificationService._notification.subscribe(notification => {
console.log("---header notification has been updated---");
this.notification = notification;
});
if (this.notification == null) {
this.notification = new Notification("", "");
}
this.subscription = this.userService.profile$.subscribe(user => {
this.user = user;
if (user !== null) {
this.loggedIn = true;
}
else this.loggedIn = false;
});
this.loggedIn = userService.isLoggedIn();
this.user = userService.getUser();
}
logout() {
this.userService.logout();
this.router.navigate(['home']);
}
home() {
this.router.navigate(['home']);
}
myProfile() {
console.log("click");
this.router.navigate(['profile', this.userService.getUser().id]);
}
getLoggedIn(): void {
this.loggedIn = !!this.userService.isLoggedIn();
}
ngOnInit() {
this.getLoggedIn();
}
ngOnDestroy() {
this.subscription.unsubscribe();
}
}
NotificationComponent 使用路由器插座显示,标题组件始终使用选择器标签显示,但我认为这无关紧要。
<div>
<app-header></app-header>
<div class="content">
<router-outlet></router-outlet>
</div>
</div>
我找到了下面的线程,建议使用 ReplaySubject 以防我在事件触发后订阅(我不认为是这种情况,但我还是尝试了)。这没有用。
此外,我只有一个 app.module 在其中声明提供程序。由于我对两个组件使用相同的代码,为什么 .subscribe 只能在 NotificationComponent 中工作?
解决方案
您看到的行为与 RxJS 的工作方式以及流的创建方式有关。让我们来看看WebsocketService
:
let observable = Observable.create(
(obs: Observer<MessageEvent>) => {
ws.onmessage = obs.next.bind(obs);
obs
对于每个订阅都是新的,但ws
始终相同。因此,当您在NotificationComponent
回调中第二次订阅时,只会为该订阅onmessage
调用。next
因此,只有该组件接收消息。
您可以通过notificationService._notification.subscribe
在NotificationComponent
. 然后HeaderComponent
将收到消息。
一种简单的解决方案是将share
运算符添加到NotificationService
:
this._notification = <ReplaySubject<Notification>>websocketService
.connect(NOTIFICATION_URL)
.map((response: MessageEvent): Notification => {
let data = JSON.parse(response.data);
return {
sender: data.author,
message: data.message
}
})
.share();
这意味着订阅上游.share()
将被共享,即(obs: Observer<MessageEvent>) => {
ws.onmessage = obs.next.bind(obs);
只会被调用一次,两个组件都会收到消息。
顺便说一句:RxJs 提供对 websockets 的支持。您可以创建一个流Observable.webSocket(url);
并删除一些代码。
推荐阅读
- mysql - 可以导入数据库而不是在 laravel 中进行迁移吗?
- java - 当我在 Optaplanner 中使用 @PlanningPin 时,如何解决“退出 neverEnding 选择器以避免无限循环”错误?
- jestjs - Jest 的 beforeAll 可以等到它的描述范围执行吗?
- windows - 如何修复 Windows Server 2016 上 mongodb 中的超时错误?
- bash - `alpine:edge` docker 图像上的 Codecov bash 上传器`eval error`
- flutter - 在颤振列表上实现 Dismissible 时出错
- excel - 在 Excel 中计算跨越闰年的日期的 YEARFRAC
- tensorflow - 标签未显示正确名称
- reactjs - 如何从打字稿中的url获取查询字符串?
- eclipse - Eclipse 突然显示错误,无法启动