首页 > 解决方案 > Spring SseEmitter.complete() 是否应该触发 EventSource 重新连接 - 如何在服务器端关闭连接

问题描述

我正在尝试设置一个 Spring SseEmitter 来发送一系列正在运行的作业状态的更新。它似乎正在工作,但是:

每当我调用emitter.complete()我的 Java 服务器代码时,javascriptEventSource客户端都会调用注册的onerror函数,然后使用新连接再次调用我的 Java 端点。这在 Firefox 和 Chrome 中都会发生。

我可能可以从 Java 发送一个明确的“数据结束”消息,然后检测到它并eventSource.close()在客户端上调用,但是有更好的方法吗?

在这种情况下的目的是什么emitter.complete()

另外,如果我总是必须终止客户端的连接,那么我猜服务器端的每个连接都会因超时或写入错误而终止,在这种情况下,我可能想手动发回一些心跳每隔几秒种一次?

如果我不得不做这一切,感觉就像我错过了一些东西。

标签: javaspringspring-mvceventsource

解决方案


我已将以下内容添加到我的 Spring Boot 应用程序以触发 SSE 连接close()

服务器端:

  1. 创建一个返回 SseEmitter 的简单控制器。
  2. 将后端逻辑包装在单线程执行器服务中。
  3. 将您的事件发送到 SseEmitter。
  4. 在完成时通过 SseEmitter 发送一个完成类型的事件。

    @RestController
    public class SearchController {
    
    @Autowired
    private SearchDelegate searchDelegate;
    
    @GetMapping(value = "/{customerId}/search")
    @ResponseStatus(HttpStatus.OK)
    @ApiOperation(value = "Search Sources", notes = "Search Sources")
    @ApiResponses(value = {
            @ApiResponse(code = 201, message = "OK"),
            @ApiResponse(code = 401, message = "Unauthorized")
    })
    @ResponseBody
    public SseEmitter search(@ApiParam(name = "searchCriteria", value = "searchCriteria", required = true) @ModelAttribute @Valid final SearchCriteriaDto searchCriteriaDto) throws Exception {
        return searchDelegate.route(searchCriteriaDto);
      }
    }
    
    
    
    @Service
    public class SearchDelegate {
    public static final String SEARCH_EVENT_NAME = "SEARCH";
    public static final String COMPLETE_EVENT_NAME = "COMPLETE";
    public static final String COMPLETE_EVENT_DATA = "{\"name\": \"COMPLETED_STREAM\"}";
    
    @Autowired
    private SearchService searchService;
    
    private ExecutorService executor = Executors.newCachedThreadPool();
    
    public SseEmitter route(SearchCriteriaDto searchCriteriaDto) throws Exception {
        SseEmitter emitter = new SseEmitter();
        executor.execute(() -> {
            try {
                if(!searchCriteriaDto.getCustomerSources().isEmpty()) {
                    searchCriteriaDto.getCustomerSources().forEach(customerSource -> {
                        try {
                            SearchResponse searchResponse = searchService.search(searchCriteriaDto);
                            emitter.send(SseEmitter.event()
                                    .id(customerSource.getSourceId())
                                    .name(SEARCH_EVENT_NAME)
                                    .data(searchResponse));
                        } catch (Exception e) {
                            log.error("Error while executing query for customer {} with source {}, Caused by {}",
                                    customerId, source.getType(), e.getMessage());
                        }
                    });
                }else {
                    log.debug("No available customerSources for the specified customer");
                }
                emitter.send(SseEmitter.event().
                        id(String.valueOf(System.currentTimeMillis()))
                        .name(COMPLETE_EVENT_NAME)
                        .data(COMPLETE_EVENT_DATA));
                emitter.complete();
            } catch (Exception ex) {
                emitter.completeWithError(ex);
            }
        });
        return emitter;
       }
    }
    

客户端:

  1. 由于我们在name上指定了 of 事件SseEmitter,因此将在浏览器上向指定事件名称的侦听器分派一个事件;网站源代码addEventListener()应用于侦听命名事件。(注意:onmessage如果没有为消息指定事件名称,则调用处理程序
  2. 调用EventSourceonCOMPLETE事件释放客户端连接。

https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events

var sse = new EventSource('http://localhost:8080/federation/api/customers/5d96348feb061d13f46aa6ce/search?nativeQuery=true&queryString=*&size=10&customerSources=1,2,3&start=0');

sse.addEventListener("SEARCH", function(evt) {
   var data = JSON.parse(evt.data);
   console.log(data);
});

sse.addEventListener("COMPLETE", function(evt) {
   console.log(evt);
   sse.close();
});

推荐阅读