首页 > 解决方案 > Redisson scheduleWithFixedDelay 作业未在配置的时间运行

问题描述

背景:

Redisson 版本: 3.15.0 框架: Spring Boot Redis 实例类型:集群模式(Azure Cache Premium with 2 nodes)

我正在使用 redisson 的 scheduleWithFixedDelay API 来安排两种延迟作业:

每 2 分钟重复一次,初始延迟为 5 分钟,最大执行次数为 5 次 每 2 分钟重复一次,初始延迟为 15 分钟,最大执行次数为 10 次 我已经使用 RAtomicLong 数据类型实现了最大执行次数的逻辑.

问题:

当我使用 executor.scheduleWithFixedDelay API 进行调度时,作业不会以配置的预期初始延迟执行。为了添加更多上下文,我生成了多个延迟作业类型的实例,并且没有一个以预期的延迟(5 分钟或 15 分钟)运行。事实上,它们都在第二天大约在同一时间运行,比它们的初始创建时间延迟了约 8 小时。

另一方面,我使用 executor.execute API 安排的作业运行良好,没有任何问题。问题仅发生在 scheduleWithFixedDelay API 上。

下面是一些示例源代码,大致展示了延迟作业的外观以及 redissonClient / redisson 节点创建代码。

非常感谢任何帮助:)

代码

延迟的JobCreator

@Service
public class DelayedJobCreator {
    @Autowired
    private final RedissonClient redissonClient;

    public void createTypeOne(DelayedJobData jobData) {
        Long initialDelay = 300 L;
        Long repeatedDelay = 120 L;
        RScheduledExecutorService executor = redissonClient
                             .getExecutorService("delayed-job-service");
        executor.scheduleWithFixedDelay(
                             new TypeOneDelayedJob(jobData), initialDelay, repeatedDelay, TimeUnit.SECONDS
                 );
    }

    public void createTypeTwo(DelayedJobData jobData) {
        Long initialDelay = 900 L;
        Long repeatedDelay = 120 L;
        RScheduledExecutorService executor = redissonClient
                             .getExecutorService("delayed-job-service");
        executor.scheduleWithFixedDelay(
                             new TypeTwoDelayedJob(jobData), initialDelay, repeatedDelay, TimeUnit.SECONDS
                 );
    }
}

TypeOneDelayedJob

@Slf4j
@Component
public class TypeOneDelayedJob implements Runnable, Serializable {
    private Long maxCount = 5;
    private Long maxDelay = 1800000; // 30 mins
    DelayedJobData jobData;

    @RInject
    RedissonClient redissonClient;

    @RInject
    String taskId;
    @Override
    public void run() {
        RAtomicLong count = redissonClient.getAtomicLong("countTypeOne:" + taskId);
        Long newValue = count.incrementAndGet();
        Long createdEpoch = jobData.getEpoch();
        Long currentEpoch = System.currentTimeMillis();

        RScheduledExecutorService executor = redissonClient.getExecutorService("delayed-job-service");
        if (newValue > maxCount) {
            log.info("Maximum retries hit for TypeOneDelayedJob with taskId: {}", taskId);
            executor.cancelTask(taskId);
        } else if (currentEpoch - createdEpoch > maxDelay) {
            log.info("Maximum delay TypeOneDelayedJob with taskId: {}", taskId);
            executor.cancelTask(taskId);
        } else {
            // Job logic
        }
    }
}

TypeTwoDelayedJob

@Slf4j
@Component
public class TypeTwoDelayedJob implements Runnable, Serializable {
    private Long maxCount = 10;
    private Long maxDelay = 1800000; // 30 mins
    DelayedJobData jobData;

    @RInject
    RedissonClient redissonClient;

    @RInject
    String taskId;
    @Override
    public void run() {
        RAtomicLong count = redissonClient.getAtomicLong("countTypeOne:" + taskId);
        Long newValue = count.incrementAndGet();
        Long createdEpoch = jobData.getEpoch();
        Long currentEpoch = System.currentTimeMillis();

        RScheduledExecutorService executor = redissonClient.getExecutorService("delayed-job-service");
        if (newValue > maxCount) {
            log.info("Maximum retries hit for TypeTwoDelayedJob with taskId: {}", taskId);
            executor.cancelTask(taskId);
        } else if (currentEpoch - createdEpoch > maxDelay) {
            log.info("Maximum delay TypeTwoDelayedJob with taskId: {}", taskId);
            executor.cancelTask(taskId);
        } else {
            // Job logic
        }
    }
}

Spring bean 创建 - 配置类

@Bean
public Config redissonConfig() {
    Config config = new Config();
    config.useClusterServers()
        .addNodeAddress(host)
        .setPassword(password);
    return config;
}

@SneakyThrows
@Bean(destroyMethod = "shutdown")
public RedissonClient redissonClient(Config config) {
    return Redisson.create(config);
}

@Bean(destroyMethod = "shutdown")
public RedissonNode redissonNode(Config config) {
    RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config);
    nodeConfig.setBeanFactory(beanFactory);
    nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("delayed-job-service", 2));
    RedissonNode node = RedissonNode.create(nodeConfig);
    node.start();
    return node;
}

迄今为止采取的步骤

我们注意到与 Redisson 连接超时相关的日志中很少出现 redisson 错误,即

错误 6 --- [isson-timer-4-1] orchandler.PingConnectionHandler:无法通过通道发送 PING 命令:[id: 0x6f24e6ea, L:/10.114.48.173:57092 - R:euw-prod-052-fps- ctwr-redis-cluster.redis.cache.windows.net/10.114.49.200:6380]

org.redisson.client.RedisTimeoutException:命令执行超时:(PING),参数:[],Redis客户端:[addr=redis://euw-prod-052-fps-ctwr-redis-cluster.redis.cache .windows.net:6380] 在 org.redisson.client.RedisConnection.lambda$async$1(RedisConnection.java:207) ~[redisson-3.15.0.jar!/:3.15.0] 在 io.netty.util。 HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:672) [netty-common-4.1.58.Final.jar!/:4.1.58.Final] 在 io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java: 747) [netty-common-4.1.58.Final.jar!/:4.1.58.Final] 在 io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:472) [netty-common-4.1.58 .Final.jar!/:4.1.58.Final] 在 io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.58.Final.jar!/:4.1.58 .Final] 在 java.lang.Thread。run(Thread.java:748) [na:1.8.0_212] 我们认为这些错误导致 scheduleWithFixedDelay 无法正常工作,因此为了修复这些连接超时,我们为 redisson 客户端设置了以下参数:

pingConnectionInterval: 10000
keepAlive: true

在进行这些配置更改后,我们在 2 天内没有注意到这些超时,并且 scheduleWithFixedDelay API 确实按预期工作。但是,我们在 2 天后开始收到这些超时。

我想知道处理这种情况的正确方法是什么。此外,如前所述,这仅发生在 scheduleWithFixedDelay 用例而不是执行 API 上。为什么这只发生在 schedule API 而不是 execute API 上?

标签: javaspring-bootredisscheduledexecutorserviceredisson

解决方案


推荐阅读