首页 > 解决方案 > SpringBoot中多个UnicastProcessor之间的消息传递

问题描述

我尝试通过 UnicastProcessor 发送消息,然后将收到的消息写入 mongoDb。我创建了一个句柄,用于捕获会话。这样做,它的工作原理:

在处理方法中,我现在有:

@Component
public class NotifikaceWebSocketHandler implements WebSocketHandler {

    Logger logger = LoggerFactory.getLogger(NotifikaceWebSocketHandler.class);

    @Autowired
    public UnicastProcessor<String> messagePublisher;

    @Autowired
    public UnicastProcessor<String> messagePublisherDb;

    public Flux<String> outputMessages;
    public Flux<String> outputMessagesDb;

    public NotifikaceWebSocketHandler () {
        logger.info("vytvoreni unicastprocessor a napojeni fluxu se zpravami");
        this.messagePublisher = UnicastProcessor.create();
        this.messagePublisherDb = UnicastProcessor.create();
        this.outputMessages = this.messagePublisher.publish().autoConnect();
        this.outputMessagesDb = this.messagePublisherDb.publish().autoConnect();
    }

    @Bean
    public RouterFunction<ServerResponse> routes(){
        return RouterFunctions.route(
                GET("/"),
                request -> ServerResponse.ok().body(BodyInserters.fromResource(new ClassPathResource("templates/index.html")))
        ).and(RouterFunctions.route(
                GET("/webSocketConnection.js"),
                request -> ServerResponse.ok().body(BodyInserters.fromResource(new ClassPathResource("static/webSocketConnection.js")))
        ));
    }

    @Override
    public Mono<Void> handle(WebSocketSession session) {    
        session.receive().subscribe(m -> {
                    String s = m.getPayloadAsText();

                    logger.info("received via subscribe, sent to unicastprocessor using onNext:{} ", s);
                    messagePublisherDb.onNext(s);
                    messagePublisher.onNext(s);
                }
                , err -> {
                    logger.error("subscribe error");
                    logger.error(err.getMessage());
                    err.printStackTrace();
                }
                , () -> {
                    logger.info("end subscribe");
                }
        );      

        return session.send(outputMessages.map(s -> {
            logger.info("sending a large letter from a flux connected to an unicastprocessor: {}", s.toUpperCase());
            return session.textMessage(s.toUpperCase());
        }));

    }

    public Flux<String> getOutputMessagesDb() {
        return outputMessagesDb;
    }
}    

并在服务中:

@Service
public class NotifikaceService {

    Logger logger = LoggerFactory.getLogger(NotifikaceService.class);

    @Autowired
    NotifikaceRepository notifikaceRepository;

    @Autowired
    NotifikaceWebSocketHandler notifikaceWebSocketHandler;

    @PostConstruct
    public void init () {

        notifikaceWebSocketHandler.getOutputMessagesDb().flatMap(s -> {

            logger.info("save {}", s);
            final Notifikace notifikace = new Notifikace();
            notifikace.setGuid(UUID.randomUUID());
            notifikace.setText(s);
            return notifikaceRepository.save(notifikace);

        }).doOnError(error -> {
            // Handle Error here
        }).subscribe();

    }
}

从表单提交后,控制台:

2019-11-22 22:29:10.905  INFO 6248 --- [ctor-http-nio-4] c.o.a.n.NotifikaceWebSocketHandler       : received via subscribe, sent to unicastprocessor using onNext:test 
2019-11-22 22:29:10.906  INFO 6248 --- [ctor-http-nio-4] c.o.a.n.NotifikaceWebSocketHandler       : sending a large letter from a flux connected to an unicastprocessor: TEST

我需要在服务中捕获发送的消息并将其保存在数据库中

我想给mongoDb写一条消息,但是由于手头没有存储库,所以根本行不通。我想创建第二个单播处理器,我将在服务中发送消息,该消息将具有@PostConstruct 方法,我会将其保存到数据库。不幸的是,我没有这样做。有谁知道为什么它不应该这样工作,或者有人做过这样的事情吗?

感谢您的答复。

标签: javaspringspring-bootreactive-programmingspring-webflux

解决方案


推荐阅读