java - Redisson 异步处理消息
问题描述
我正在尝试为我的项目应用 Redisson 功能作为消息代理,我有一个问题。是否可以将 Redisson 推送到异步接收到的消息?我创建了一个小例子,从不同的 URL 发送了 4 条消息。我预计,Redisson 会异步处理它们,但它会一一进行。这里的实现:
public class RedisListenerServiceImpl implements MessageListener<String> {
private static final Logger log = LoggerFactory.getLogger(RedisListenerServiceImpl.class);
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public void onMessage(CharSequence channel, String stringMsg) {
log.info("Message received: {}", stringMsg);
MessageDto msg;
try {
msg = objectMapper.readValue(stringMsg, MessageDto.class);
} catch (final IOException e) {
log.error("Unable to deserialize message: {}", e.getMessage(), e);
return;
}
try {
//Do my stuff
} catch (Exception e) {
log.error("Unable to get service from factory: {}", e.getMessage(), e);
}
}
}
和配置:
@Configuration
public class RedisListenerConfig {
@Autowired
public RedisListenerConfig(RedissonClient redisClient,
MessageListener redisListenerService,
@Value("${redis.sub.key}") String redisSubKey) {
RTopic subscribeTopic = redisClient.getTopic(redisSubKey);
subscribeTopic.addListenerAsync(String.class, redisListenerService);
}
}
解决方案
这是预期的行为。如果您希望在触发侦听器onMessage()方法时同时处理您的消息,只需使用线程池来处理它。
由于 Redisson 不知道您要使用多少个线程来使用触发的事件,因此它将实现细节留给您。
public class RedisListenerServiceImpl implements MessageListener<String> {
private static final Logger log = LoggerFactory.getLogger(RedisListenerServiceImpl.class);
private final ObjectMapper objectMapper = new ObjectMapper();
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
@Override
public void onMessage(CharSequence channel, String stringMsg) {
log.info("Message received: {}", stringMsg);
MessageDto msg;
try {
msg = objectMapper.readValue(stringMsg, MessageDto.class);
executorService.submit(()->{
System.out.println("do something with message: "+msg);
});
} catch (final IOException e) {
log.error("Unable to deserialize message: {}", e.getMessage(), e);
return;
}
try {
//Do my stuff
} catch (Exception e) {
log.error("Unable to get service from factory: {}", e.getMessage(), e);
}
}
推荐阅读
- jobs - Batch job submission failed: I/O error writing script/environment to file
- java - Cognito xray - segment not found issue
- c# - Unity - My script stops working on Editor but works on my builds(android, ios)
- java - 如果文本字段为空,则禁用 AjaxSubmitButton
- http-headers - sec-fetch-site 标头是什么意思?为什么 Origin 标头未定义?
- microsoft-edge - Edge 浏览器不支持 X-UA-Compatible 元标记中定义的文档模式
- scala - Scala: Alternative for deprecated set difference
- redis - Redis:Redisinsight:连接时出错:“添加数据库时出错。请重试”
- c# - 嵌入式字体在 Windows 7 中不支持“常规”样式,但在 Windows 10 中可以
- angularjs - 在 Angular 应用程序中使用服务器呈现的 html 的解决方法