首页 > 解决方案 > Redis 命令超时;嵌套异常是 io.lettuce.core.RedisCommandTimeoutException: 命令在 1 分钟后超时

问题描述

今天,当我使用 Java redis client( spring-data-redis-2.3.9.RELEASE.jar) 来消费 Redis 流消息时,遇到了这个错误:

2021-05-11 17:49:42.134 ERROR 26301 --- [-post-service-1] c.d.s.p.common.mq.StreamConsumerRunner   : pull message error

org.springframework.dao.QueryTimeoutException: Redis command timed out; nested exception is io.lettuce.core.RedisCommandTimeoutException: Command timed out after 1 minute(s)
    at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:70) ~[spring-data-redis-2.3.9.RELEASE.jar!/:2.3.9.RELEASE]
    at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:41) ~[spring-data-redis-2.3.9.RELEASE.jar!/:2.3.9.RELEASE]
    at org.springframework.data.redis.PassThroughExceptionTranslationStrategy.translate(PassThroughExceptionTranslationStrategy.java:44) ~[spring-data-redis-2.3.9.RELEASE.jar!/:2.3.9.RELEASE]
    at org.springframework.data.redis.FallbackExceptionTranslationStrategy.translate(FallbackExceptionTranslationStrategy.java:42) ~[spring-data-redis-2.3.9.RELEASE.jar!/:2.3.9.RELEASE]
    at org.springframework.data.redis.connection.lettuce.LettuceConnection.convertLettuceAccessException(LettuceConnection.java:275) ~[spring-data-redis-2.3.9.RELEASE.jar!/:2.3.9.RELEASE]
    at org.springframework.data.redis.connection.lettuce.LettuceStreamCommands.convertLettuceAccessException(LettuceStreamCommands.java:712) ~[spring-data-redis-2.3.9.RELEASE.jar!/:2.3.9.RELEASE]
    at org.springframework.data.redis.connection.lettuce.LettuceStreamCommands.xReadGroup(LettuceStreamCommands.java:602) ~[spring-data-redis-2.3.9.RELEASE.jar!/:2.3.9.RELEASE]
    at org.springframework.data.redis.connection.DefaultedRedisConnection.xReadGroup(DefaultedRedisConnection.java:591) ~[spring-data-redis-2.3.9.RELEASE.jar!/:2.3.9.RELEASE]
    at org.springframework.data.redis.core.DefaultStreamOperations$4.inRedis(DefaultStreamOperations.java:310) ~[spring-data-redis-2.3.9.RELEASE.jar!/:2.3.9.RELEASE]
    at org.springframework.data.redis.core.DefaultStreamOperations$RecordDeserializingRedisCallback.doInRedis(DefaultStreamOperations.java:376) ~[spring-data-redis-2.3.9.RELEASE.jar!/:2.3.9.RELEASE]
    at org.springframework.data.redis.core.DefaultStreamOperations$RecordDeserializingRedisCallback.doInRedis(DefaultStreamOperations.java:371) ~[spring-data-redis-2.3.9.RELEASE.jar!/:2.3.9.RELEASE]
    at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:228) ~[spring-data-redis-2.3.9.RELEASE.jar!/:2.3.9.RELEASE]
    at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:188) ~[spring-data-redis-2.3.9.RELEASE.jar!/:2.3.9.RELEASE]
    at org.springframework.data.redis.core.AbstractOperations.execute(AbstractOperations.java:96) ~[spring-data-redis-2.3.9.RELEASE.jar!/:2.3.9.RELEASE]
    at org.springframework.data.redis.core.DefaultStreamOperations.read(DefaultStreamOperations.java:305) ~[spring-data-redis-2.3.9.RELEASE.jar!/:2.3.9.RELEASE]
    at org.springframework.data.redis.stream.DefaultStreamMessageListenerContainer.lambda$getReadFunction$3(DefaultStreamMessageListenerContainer.java:236) ~[spring-data-redis-2.3.9.RELEASE.jar!/:2.3.9.RELEASE]
    at org.springframework.data.redis.stream.StreamPollTask.doLoop(StreamPollTask.java:138) ~[spring-data-redis-2.3.9.RELEASE.jar!/:2.3.9.RELEASE]
    at org.springframework.data.redis.stream.StreamPollTask.run(StreamPollTask.java:123) ~[spring-data-redis-2.3.9.RELEASE.jar!/:2.3.9.RELEASE]
    at misc.config.async.pool.MdcTaskDecorator.lambda$decorate$0(MdcTaskDecorator.java:29) ~[dolphin-common-1.0.0-SNAPSHOT.jar!/:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: io.lettuce.core.RedisCommandTimeoutException: Command timed out after 1 minute(s)
    at io.lettuce.core.internal.ExceptionFactory.createTimeoutException(ExceptionFactory.java:53) ~[lettuce-core-6.1.1.RELEASE.jar!/:6.1.1.RELEASE]
    at io.lettuce.core.internal.Futures.awaitOrCancel(Futures.java:246) ~[lettuce-core-6.1.1.RELEASE.jar!/:6.1.1.RELEASE]
    at io.lettuce.core.FutureSyncInvocationHandler.handleInvocation(FutureSyncInvocationHandler.java:75) ~[lettuce-core-6.1.1.RELEASE.jar!/:6.1.1.RELEASE]
    at io.lettuce.core.internal.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:80) ~[lettuce-core-6.1.1.RELEASE.jar!/:6.1.1.RELEASE]
    at com.sun.proxy.$Proxy149.xreadgroup(Unknown Source) ~[na:na]
    at org.springframework.data.redis.connection.lettuce.LettuceStreamCommands.xReadGroup(LettuceStreamCommands.java:600) ~[spring-data-redis-2.3.9.RELEASE.jar!/:2.3.9.RELEASE]
    ... 15 common frames omitted

一开始消息可以被客户端成功消费,有些消息可以被正确消费和处理,过一段时间就会出现超时问题。这是我的代码:

package com.dolphin.soa.post.common.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ErrorHandler;

import java.time.Duration;

/**
 * @author dolphin
 */
@Component
@Slf4j
public class StreamConsumerRunner implements ApplicationRunner, DisposableBean {

    @Value("${dolphin.redis.stream.consumer}")
    private String consumer;

    @Value("${dolphin.redis.stream.group}")
    private String groupName;

    private final RedisConnectionFactory redisConnectionFactory;

    private final ThreadPoolTaskExecutor threadPoolTaskExecutor;

    private final StreamMessageListener streamMessageListener;

    private final StringRedisTemplate stringRedisTemplate;

    private StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer;

    public StreamConsumerRunner(RedisConnectionFactory redisConnectionFactory,
                                ThreadPoolTaskExecutor threadPoolTaskExecutor,
                                StreamMessageListener streamMessageListener,
                                StringRedisTemplate stringRedisTemplate) {
        this.redisConnectionFactory = redisConnectionFactory;
        this.threadPoolTaskExecutor = threadPoolTaskExecutor;
        this.streamMessageListener = streamMessageListener;
        this.stringRedisTemplate = stringRedisTemplate;
    }

    @Override
    public void run(ApplicationArguments args) {
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> streamMessageListenerContainerOptions = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                .builder()
                .batchSize(1)
                .executor(this.threadPoolTaskExecutor)
                .errorHandler(new ErrorHandler() {
                    @Override
                    public void handleError(Throwable t) {
                        log.error("pull message error",t);
                    }
                })
                .pollTimeout(Duration.ofMinutes(1L))
                .serializer(new StringRedisSerializer())
                .build();
        StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer = StreamMessageListenerContainer
                .create(this.redisConnectionFactory, streamMessageListenerContainerOptions);
        streamMessageListenerContainer.receive(Consumer.from(groupName, consumer),
                StreamOffset.create(consumer, ReadOffset.lastConsumed()), this.streamMessageListener);
        this.streamMessageListenerContainer = streamMessageListenerContainer;
        this.streamMessageListenerContainer.start();
    }

    @Override
    public void destroy() throws Exception {
        this.streamMessageListenerContainer.stop();
    }
}

这是StreamMessageListener

package com.dolphin.soa.post.common.mq;

import com.dolphin.soa.post.contract.request.ArticleRequest;
import com.dolphin.soa.post.model.entity.SubRelation;
import com.dolphin.soa.post.service.IArticleService;
import com.dolphin.soa.post.service.ISubRelationService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.DefaultTypedTuple;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

/**
 * @author dolphin
 */
@Component
@Slf4j
public class StreamMessageListener implements StreamListener<String, MapRecord<String, String, String>> {

    @Value("${dolphin.redis.stream.group}")
    private String groupName;

    @Value("${dolphin.redis.user.sub.article.key}")
    private String subArticleKey;

    private final StringRedisTemplate stringRedisTemplate;

    private final RedisTemplate<String, Object> articleRedisTemplate;

    private final RedisTemplate<String, Long> redisLongTemplate;

    private final ISubRelationService subRelationService;

    private final IArticleService articleService;

    public StreamMessageListener(StringRedisTemplate stringRedisTemplate,
                                 @Qualifier("redisObjectTemplate") RedisTemplate<String, Object> articleRedisTemplate,
                                 ISubRelationService subRelationService,
                                 @Qualifier("redisLongTemplate") RedisTemplate<String, Long> redisLongTemplate,
                                 IArticleService articleService) {
        this.stringRedisTemplate = stringRedisTemplate;
        this.articleRedisTemplate = articleRedisTemplate;
        this.subRelationService = subRelationService;
        this.redisLongTemplate = redisLongTemplate;
        this.articleService = articleService;
    }

    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        try {
            RecordId messageId = message.getId();
            Map<String, String> body = message.getValue();
            log.info("stream message。messageId={}, stream={}, body={}", messageId, message.getStream(), body);
            handleArticle(body);
            this.stringRedisTemplate.opsForStream().acknowledge(groupName, message);
        } catch (Exception e) {
            log.error("error", e);
        }
    }
}

我正在从互联网上搜索,并没有找到有用的建议。我尝试将生菜包版本升级到最新6.1.1.RELEASE版本,但仍未修复。问题出在哪里,我应该怎么做才能使其按预期工作?我也尝试替换lettucejedis

api ("org.springframework.boot:spring-boot-starter-data-redis") {
            exclude group: "io.lettuce", module: "lettuce-core"
}
api "redis.clients:jedis:3.6.0"

似乎没有用。

标签: javaredis

解决方案


我尝试使用不使用生菜的绝地来解决这个问题。

第 1 步:排除生菜:

api ("org.springframework.boot:spring-boot-starter-data-redis") {
            exclude group: "io.lettuce", module: "lettuce-core"
            exclude group: "org.springframework.data", module: "spring-data-redis"
        }

因为 spring-boot-starter-data-redis 包含两者jedislettuce如果排除lettuce,则默认连接应使用jedis.


推荐阅读