首页 > 解决方案 > Redisson 异步处理消息

问题描述

我正在尝试为我的项目应用 Redisson 功能作为消息代理,我有一个问题。是否可以将 Redisson 推送到异步接收到的消息?我创建了一个小例子,从不同的 URL 发送了 4 条消息。我预计,Redisson 会异步处理它们,但它会一一进行。这里的实现:

public class RedisListenerServiceImpl implements MessageListener<String> {

    private static final Logger log = LoggerFactory.getLogger(RedisListenerServiceImpl.class);
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void onMessage(CharSequence channel, String stringMsg) {

        log.info("Message received: {}", stringMsg);
        MessageDto msg;
        try {
            msg = objectMapper.readValue(stringMsg, MessageDto.class);
        } catch (final IOException e) {
            log.error("Unable to deserialize message: {}", e.getMessage(), e);
            return;
        }

        try {
            //Do my stuff
        } catch (Exception e) {
            log.error("Unable to get service from factory: {}", e.getMessage(), e);
        }

    }
}

和配置:

@Configuration
public class RedisListenerConfig {

    @Autowired
    public RedisListenerConfig(RedissonClient redisClient,
                               MessageListener redisListenerService,
                               @Value("${redis.sub.key}") String redisSubKey) {

        RTopic subscribeTopic = redisClient.getTopic(redisSubKey);
        subscribeTopic.addListenerAsync(String.class, redisListenerService);
    }
}

标签: javamultithreadingasynchronousredisredisson

解决方案


这是预期的行为。如果您希望在触发侦听器onMessage()方法时同时处理您的消息,只需使用线程池来处理它。

由于 Redisson 不知道您要使用多少个线程来使用触发的事件,因此它将实现细节留给您。

public class RedisListenerServiceImpl implements MessageListener<String> {

private static final Logger log = LoggerFactory.getLogger(RedisListenerServiceImpl.class);
private final ObjectMapper objectMapper = new ObjectMapper();
private final ExecutorService executorService = Executors.newFixedThreadPool(10);

@Override
public void onMessage(CharSequence channel, String stringMsg) {

    log.info("Message received: {}", stringMsg);
    MessageDto msg;
    try {
        msg = objectMapper.readValue(stringMsg, MessageDto.class);
        executorService.submit(()->{
        System.out.println("do something with message: "+msg);
    });
    } catch (final IOException e) {
        log.error("Unable to deserialize message: {}", e.getMessage(), e);
        return;
    }

    try {
        //Do my stuff
    } catch (Exception e) {
        log.error("Unable to get service from factory: {}", e.getMessage(), e);
    }

}

推荐阅读