首页 > 解决方案 > 服务器发送事件是否会强制 Netty 持有相同的线程,直到流关闭?

问题描述

上下文:HTML5 前端将调用响应 Flux 的服务。将 Spring WebFlux 与 Netty 一起使用的目的是利用较少的线程需求并将事件从服务器单向推送到前端。我所说的事件是指许多状态变化,直到最后。堆栈是完全反应式的:Angular9/RxJS -> Spring WebFlux/Netty -> springframework.data.mongodb.repository.ReactiveMongoRepository -> MongoDb。据我所知,这确实是一个非阻塞堆栈(请参见下面的代码片段,我确信我不会在任何地方阻塞)。另外,您可以看到 SSE 确实启用:在 Rest Service 上产生 = MediaType.TEXT_EVENT_STREAM_VALUE,在 Front 上产生 EventSource。

主要问题:既然从第一个状态到最后一个状态可能需要 10 秒到 30 秒,那么线程会在这段时间内保持吗?考虑到我们有 Sensedia Api 网关,我考虑了很长时间。如果是这样,我会开始怀疑使用无阻塞服务器(例如 Netty)是否比使用阻塞服务器(例如 Tomcat)有一些好处。免责声明:我只是在谈论在我的特定场景中避免创建线程和锁定。我一般不比较服务器。

微服务启动:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.reactive.config.EnableWebFlux;

@EnableWebFlux
@SpringBootApplication
public class FluxdemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(FluxdemoApplication.class, args);
    }

}

SSE 控制器端点:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import com.reactive.fluxdemo.domain.Transfer;
import com.reactive.fluxdemo.repository.TransferRepository;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.validation.Valid;

@RestController
public class TransferController {

    // Server Sent Events
    @GetMapping(value = "/stream/transfers", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Transfer> streamAllTransfers() {
        return transferRepository.findAll();
    }

领域:

import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

@Document
public class Transfer {
    @Id
    private String id;
...
    private Integer status;

存储库:

import org.springframework.stereotype.Repository;
import com.reactive.fluxdemo.domain.*;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;

@Repository
public interface TransferRepository extends ReactiveMongoRepository<Transfer, String> {

}

FrontEnd 以下任一项都是 HTML5 SSE。对于这个问题,纯 HTML5 或更复杂的 Observer 无关紧要。顺便说一句,我在下面粘贴了两者以举例说明 Front 打开了一个服务器发送事件通道。

纯 HTML5 的简化版

<div id="content"></div>
<script>
    var source = new EventSource();
    source.addEventListener('message', function (e) {
        console.log('New message is received');
        const index = JSON.parse(e.data);
        const content = `New event added: ${index.status}<br>`;
        document.getElementById("content").innerHTML += content;
    }, false);
</script>

带有 Angular/RxJs 观察者的完整版本

import { Injectable, NgZone } from '@angular/core';
import { Observable } from 'rxjs';
import { Extrato } from './extrato';


@Injectable({
  providedIn: "root"
})
export class SseService {
  extratos: Extrato[] = [];
  constructor(private _zone: NgZone) { }

  getServerSentEvent(url: string): Observable<any> {
    this.extratos = [];
    return Observable.create(observer => {
      const eventSource = this.getEventSource(url);
      eventSource.onmessage = event => {
        this._zone.run(() => {
          let json = JSON.parse(event.data);
          this.extratos.push(new Extrato(json['id'], json['description'], json['value'], json['status']));
          observer.next(this.extratos);
        });
      };
      eventSource.onerror = (error) => {
        if (eventSource.readyState === 0) {
          console.log('The stream has been closed by the server.');
          eventSource.close();
          observer.complete();
        } else {
          observer.error('EventSource error: ' + error);
        }
      }

    });
  }
  private getEventSource(url: string): EventSource {
    return new EventSource(url);
  }
}

标签: multithreadingnettyspring-webfluxserver-sent-eventsreactor-netty

解决方案


推荐阅读