java - SpringBoot中多个UnicastProcessor之间的消息传递
问题描述
我尝试通过 UnicastProcessor 发送消息,然后将收到的消息写入 mongoDb。我创建了一个句柄,用于捕获会话。这样做,它的工作原理:
在处理方法中,我现在有:
@Component
public class NotifikaceWebSocketHandler implements WebSocketHandler {
Logger logger = LoggerFactory.getLogger(NotifikaceWebSocketHandler.class);
@Autowired
public UnicastProcessor<String> messagePublisher;
@Autowired
public UnicastProcessor<String> messagePublisherDb;
public Flux<String> outputMessages;
public Flux<String> outputMessagesDb;
public NotifikaceWebSocketHandler () {
logger.info("vytvoreni unicastprocessor a napojeni fluxu se zpravami");
this.messagePublisher = UnicastProcessor.create();
this.messagePublisherDb = UnicastProcessor.create();
this.outputMessages = this.messagePublisher.publish().autoConnect();
this.outputMessagesDb = this.messagePublisherDb.publish().autoConnect();
}
@Bean
public RouterFunction<ServerResponse> routes(){
return RouterFunctions.route(
GET("/"),
request -> ServerResponse.ok().body(BodyInserters.fromResource(new ClassPathResource("templates/index.html")))
).and(RouterFunctions.route(
GET("/webSocketConnection.js"),
request -> ServerResponse.ok().body(BodyInserters.fromResource(new ClassPathResource("static/webSocketConnection.js")))
));
}
@Override
public Mono<Void> handle(WebSocketSession session) {
session.receive().subscribe(m -> {
String s = m.getPayloadAsText();
logger.info("received via subscribe, sent to unicastprocessor using onNext:{} ", s);
messagePublisherDb.onNext(s);
messagePublisher.onNext(s);
}
, err -> {
logger.error("subscribe error");
logger.error(err.getMessage());
err.printStackTrace();
}
, () -> {
logger.info("end subscribe");
}
);
return session.send(outputMessages.map(s -> {
logger.info("sending a large letter from a flux connected to an unicastprocessor: {}", s.toUpperCase());
return session.textMessage(s.toUpperCase());
}));
}
public Flux<String> getOutputMessagesDb() {
return outputMessagesDb;
}
}
并在服务中:
@Service
public class NotifikaceService {
Logger logger = LoggerFactory.getLogger(NotifikaceService.class);
@Autowired
NotifikaceRepository notifikaceRepository;
@Autowired
NotifikaceWebSocketHandler notifikaceWebSocketHandler;
@PostConstruct
public void init () {
notifikaceWebSocketHandler.getOutputMessagesDb().flatMap(s -> {
logger.info("save {}", s);
final Notifikace notifikace = new Notifikace();
notifikace.setGuid(UUID.randomUUID());
notifikace.setText(s);
return notifikaceRepository.save(notifikace);
}).doOnError(error -> {
// Handle Error here
}).subscribe();
}
}
从表单提交后,控制台:
2019-11-22 22:29:10.905 INFO 6248 --- [ctor-http-nio-4] c.o.a.n.NotifikaceWebSocketHandler : received via subscribe, sent to unicastprocessor using onNext:test
2019-11-22 22:29:10.906 INFO 6248 --- [ctor-http-nio-4] c.o.a.n.NotifikaceWebSocketHandler : sending a large letter from a flux connected to an unicastprocessor: TEST
我需要在服务中捕获发送的消息并将其保存在数据库中
我想给mongoDb写一条消息,但是由于手头没有存储库,所以根本行不通。我想创建第二个单播处理器,我将在服务中发送消息,该消息将具有@PostConstruct 方法,我会将其保存到数据库。不幸的是,我没有这样做。有谁知道为什么它不应该这样工作,或者有人做过这样的事情吗?
感谢您的答复。
解决方案
推荐阅读
- ruby-on-rails - 允许我的网络应用程序的用户发布到他们的个人 Facebook 页面
- list - 从列表中获取元素并递归地成对添加它们 F#
- ios - 如何使用 DateFormatter 创建日期类型的当前日期?
- javascript - 只运行一次函数。&& 在 IE 中不起作用
- java - 使用 createButton 方法在 JFrame 中创建按钮
- regular-language - 给定两个 DFA,如何检查第一个 DFA 生成的语言是否包含在第二个 DFA 生成的语言中?
- azure - 在 Bitbucket 和 Azure 中构建失败
- php - 缓存商店与其他变量 Laravel
- ruby-on-rails - Ruby on Rails RSpec 请求规范:无法传递参数
- sql - 我有一个场景,当我需要在更改某些列后从同一个表中插入表时。问题是关键列