首页 > 解决方案 > 如何停止对 Spring webflux 流的 Angular 调用以远程关闭

问题描述

我正在学习 Spring webflux 和 Reactive Streams,并尝试了一种从 mongoDB 流式传输信息的服务。问题是当没有任何东西可以从 MongoDB 发送时,Spring 会关闭请求。所以我真正想做的是:在我的 mongodb 中有一个 Angular 表显示 SPRING 检索到的数据,并且每次进行更新/插入时,新数据都会自动发送到 Angular。

我发现的唯一方法是每 XXXX 毫秒调用一次我的角度服务。

有没有其他方法可以做到这一点?所以这是我的 Spring Web 服务代码:

@GetMapping(path="/stream/organisation",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Organisation> streamAll() {
    return organisationRepository.findAll();
}

我的角服务:

getOrganisationStream(): Observable<Array<Organisation>> {
    this.Organisations = [];
    return Observable.create((observer) => {
            const eventSource = new EventSource(this.url);
            eventSource.onmessage = (event) => {
                // tslint:disable-next-line:no-console
                console.debug('Received event: ', event);
                const json = JSON.parse(event.data);
                console.log(json);
                const org: Organisation = new Organisation();
                org.codeFase = json.codeFase;
                org.id = json.id;
                org.name = json.name;
                this.Organisations.push(org);
                console.log(this.Organisations.length);
                this.ngZone.run(() => {
                    observer.next(this.Organisations);
                });
            };
            eventSource.onerror = (error) => {
                if (eventSource.readyState === 0) {
                console.log('The stream has been closed by the server.');
                eventSource.close();
                observer.complete();
            } else {
                observer.error('EventSource error: ' + error);
            }
        };
    });
}

我的组件:

organisations: Observable<Organisation[]>;
constructor(private testService: TestService) {
}
ngOnInit(): void {
    this.organisations = this.testService.getOrganisationStream();
}

我的 HTML:

   <div *ngFor="let org of organisations | async"> 
{{org.name}} {{org.codeFase}}
</div>

标签: angularspring-webflux

解决方案


您将需要使用可尾游标,它是一个无限流,它保持打开状态,直到它在外部关闭。

在您的存储库中执行以下操作:

@Tailable
Flux<Organisation> findAll();

当订阅被丢弃时,光标将关闭,在您的情况下,当客户端关闭连接时。


推荐阅读