How to stop polling after a message is received? Spring Integration

Problem description

I want to poll for a file in a directory and stop polling once the file is found. I am very new to the Spring framework and still find a lot of it confusing. After some research, I found a couple of ways to do this but haven't had any luck with either of them.

One way is to use a control bus, as shown here. However, the polling just seems to stop after 2 seconds, and I am not sure how to add a condition so that it stops only when a file is received.

Another way is to use "Smart Polling", as answered here. The link in that answer is old, but it points to the official Spring docs: Smart Polling. From the article I learned about AbstractMessageSourceAdvice and SimpleActiveIdleMessageSourceAdvice. The latter seems to suit my goal and looks the simplest to implement, so I decided to give it a go. My code is below:

IntegrationConfig.java

package com.example.springexample;

import java.io.File;

import org.aopalliance.aop.Advice;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.aop.SimpleActiveIdleMessageSourceAdvice;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.filters.SimplePatternFileListFilter;
import org.springframework.integration.util.DynamicPeriodicTrigger;
import org.springframework.messaging.MessageChannel;

@Configuration
@EnableIntegration
public class IntegrationConfig {

    @Bean
    public IntegrationFlow advised() {
        return IntegrationFlows.from("fileInputChannel")
                .handle("runBatchScript", "run", c -> c.advice(stopPollingAdvice()))
                .get();
    }

    @Bean
    public MessageChannel fileInputChannel() {
        return new DirectChannel();
    }

    @Bean
    @InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000"))
    public MessageSource<File> fileReadingMessageSource() {
        FileReadingMessageSource source = new FileReadingMessageSource();
        source.setDirectory(new File("."));
        source.setFilter(new SimplePatternFileListFilter("*.bat"));
        return source;
    }

    @Bean
    public RunBatchScript runBatchScript() {
        return new RunBatchScript();
    }

    @Bean
    public Advice stopPollingAdvice() {
        DynamicPeriodicTrigger trigger = new DynamicPeriodicTrigger(10000);
        SimpleActiveIdleMessageSourceAdvice advice = new SimpleActiveIdleMessageSourceAdvice(trigger);
        advice.setActivePollPeriod(60000);
        return advice;
    }
}

RunBatchScript.java

package com.example.springexample;

import java.io.IOException;
import java.util.Date;
import java.util.logging.Logger;

public class RunBatchScript {

    Logger logger = Logger.getLogger(RunBatchScript.class.getName());

    public void run() throws IOException {
        logger.info("Running the batch script at " + new Date());
        Runtime.getRuntime().exec("cmd.exe /c simplebatchscript.bat");
        logger.info("Finished running the batch script at " + new Date());
    }
}

SpringExampleApplication.java

package com.example.springexample;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringExampleApplication.class, args);
    }

}

I used this and this as the basis for my code. However, it doesn't seem to work: the poller still polls every 1 second instead of the new 10 seconds or 60 seconds. Moreover, I am not sure how to actually stop the poller. I tried passing null to the constructor of SimpleActiveIdleMessageSourceAdvice, but that just throws a NullPointerException.

The output when I run the application:

2020-03-15 13:57:46.081  INFO 37504 --- [ask-scheduler-1] c.example.springexample.RunBatchScript   : Running the batch script at Sun Mar 15 13:57:46 SRET 2020
2020-03-15 13:57:46.084  INFO 37504 --- [ask-scheduler-1] c.example.springexample.RunBatchScript   : Finished running the batch script at Sun Mar 15 13:57:46 SRET 2020
2020-03-15 13:57:47.085  INFO 37504 --- [ask-scheduler-2] c.example.springexample.RunBatchScript   : Running the batch script at Sun Mar 15 13:57:47 SRET 2020
2020-03-15 13:57:47.087  INFO 37504 --- [ask-scheduler-2] c.example.springexample.RunBatchScript   : Finished running the batch script at Sun Mar 15 13:57:47 SRET 2020
2020-03-15 13:57:48.089  INFO 37504 --- [ask-scheduler-1] c.example.springexample.RunBatchScript   : Running the batch script at Sun Mar 15 13:57:48 SRET 2020
2020-03-15 13:57:48.092  INFO 37504 --- [ask-scheduler-1] c.example.springexample.RunBatchScript   : Finished running the batch script at Sun Mar 15 13:57:48 SRET 2020
2020-03-15 13:57:49.093  INFO 37504 --- [ask-scheduler-3] c.example.springexample.RunBatchScript   : Running the batch script at Sun Mar 15 13:57:49 SRET 2020
2020-03-15 13:57:49.096  INFO 37504 --- [ask-scheduler-3] c.example.springexample.RunBatchScript   : Finished running the batch script at Sun Mar 15 13:57:49 SRET 2020

Any help with some code is greatly appreciated.

Tags: spring, spring-integration

Solution


You should apply SimpleActiveIdleMessageSourceAdvice to the poller of the @InboundChannelAdapter rather than to the handler. Also, the trigger passed to SimpleActiveIdleMessageSourceAdvice must be the same instance as the trigger that is used to poll the files:

    @Bean
    @EndpointId("fileInboundChannelAdapter")
    @InboundChannelAdapter(value = "fileInputChannel", poller = @Poller("fileReadingMessageSourcePollerMetadata"))
    public MessageSource<File> fileReadingMessageSource() {
        FileReadingMessageSource source = new FileReadingMessageSource();
        source.setDirectory(new File("."));
        source.setFilter(new SimplePatternFileListFilter("*.bat"));
        return source;
    }

    @Bean
    public PollerMetadata fileReadingMessageSourcePollerMetadata() {
        PollerMetadata meta = new PollerMetadata();

        DynamicPeriodicTrigger trigger = new DynamicPeriodicTrigger(1000);

        SimpleActiveIdleMessageSourceAdvice advice = new SimpleActiveIdleMessageSourceAdvice(trigger);
        advice.setActivePollPeriod(60000);

        meta.setTrigger(trigger);
        meta.setAdviceChain(List.of(advice));
        meta.setMaxMessagesPerPoll(1);
        return meta;
    }

Please note that SimpleActiveIdleMessageSourceAdvice only changes when the next poll happens. You can set the period to a very large value, such as several thousand years, which effectively means the file is never polled again in your lifetime. However, the scheduler thread that polls the file stays active.
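To make the point about the shared trigger and the changing poll period concrete, here is a minimal plain-Java model of the idea. It is not Spring's implementation: MutableTrigger and ActiveIdleAdvice are hypothetical stand-ins, and the 1, 10 and 60 second values are only illustrative. It shows that the advice rewrites the period of whichever trigger it was constructed with, so it can only influence the poller if both hold the same trigger instance.

```java
import java.time.Duration;

/** Plain-Java model of an advice that retunes a poller's trigger (not Spring's code). */
class ActiveIdleModel {

    /** Hypothetical stand-in for a dynamic trigger: holds the delay before the next poll. */
    static final class MutableTrigger {
        private Duration period;

        MutableTrigger(Duration period) {
            this.period = period;
        }

        Duration getPeriod() {
            return period;
        }

        void setPeriod(Duration period) {
            this.period = period;
        }
    }

    /** Hypothetical stand-in for the advice: after each poll it rewrites its trigger's period. */
    static final class ActiveIdleAdvice {
        private final MutableTrigger trigger;
        private final Duration activePeriod;
        private final Duration idlePeriod;

        ActiveIdleAdvice(MutableTrigger trigger, Duration activePeriod, Duration idlePeriod) {
            this.trigger = trigger;
            this.activePeriod = activePeriod;
            this.idlePeriod = idlePeriod;
        }

        /** A null result means the poll found nothing, so the idle period applies. */
        void afterReceive(Object result) {
            trigger.setPeriod(result != null ? activePeriod : idlePeriod);
        }
    }

    public static void main(String[] args) {
        // The trigger the (imaginary) poller reads its delay from.
        MutableTrigger pollerTrigger = new MutableTrigger(Duration.ofSeconds(1));

        // Advice sharing the poller's trigger: its changes reach the poller.
        ActiveIdleAdvice shared = new ActiveIdleAdvice(
                pollerTrigger, Duration.ofSeconds(60), Duration.ofSeconds(10));
        shared.afterReceive(null);
        System.out.println("after an empty poll: " + pollerTrigger.getPeriod());
        shared.afterReceive("simplebatchscript.bat");
        System.out.println("after a file:        " + pollerTrigger.getPeriod());

        // Advice built around some other trigger: the poller's schedule is untouched.
        MutableTrigger unrelated = new MutableTrigger(Duration.ofSeconds(10));
        new ActiveIdleAdvice(unrelated, Duration.ofHours(1), Duration.ofHours(1))
                .afterReceive("simplebatchscript.bat");
        System.out.println("poller still uses:   " + pollerTrigger.getPeriod());
    }
}
```

In this model the poll never stops; the advice only stretches or shrinks the gap until the next one, which is why a very long active period merely approximates stopping.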

If you want to stop the scheduled polling altogether, you can send a stop command to the adapter through the control bus.

First, define a control bus:

    @Bean
    public IntegrationFlow controlBusFlow() {
        return IntegrationFlows.from("controlBus")
                  .controlBus()
                  .get();
    }

Then implement an AbstractMessageSourceAdvice that sends a stop command to the control bus after a file is polled:

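import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Lazy;
import org.springframework.integration.aop.AbstractMessageSourceAdvice;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Service;

@Service
public class StopPollingAdvice extends AbstractMessageSourceAdvice {

    // Input channel of the control bus flow; @Lazy defers resolving it until first use.
    @Lazy
    @Qualifier("controlBus")
    @Autowired
    private MessageChannel controlBusChannel;

    @Override
    public boolean beforeReceive(MessageSource<?> source) {
        return super.beforeReceive(source);
    }

    @Override
    public Message<?> afterReceive(Message<?> result, MessageSource<?> source) {
        // A null result means the poll found no file, so keep polling.
        if (result != null) {
            Message<String> operation = MessageBuilder.withPayload("@fileInboundChannelAdapter.stop()").build();
            controlBusChannel.send(operation);
        }
        return result;
    }
}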

and change the PollerMetadata used to poll the files to:

@Bean
public PollerMetadata fileReadingMessageSourcePollerMetadata(StopPollingAdvice stopPollingAdvice) {
    PollerMetadata meta = new PollerMetadata(); 
    meta.setTrigger(new PeriodicTrigger(1000));
    meta.setAdviceChain(List.of(stopPollingAdvice));
    meta.setMaxMessagesPerPoll(1);
    return meta;
}
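
The behaviour the question asks for, stopping only once a poll actually returns something, can also be modelled without Spring. The sketch below uses only java.util.concurrent; StopAfterFirstResult and pollUntilFound are hypothetical names, the directory is simulated by a Supplier, and the one-minute wait is an arbitrary safety limit. It is meant to illustrate the control flow, not to replace the control bus approach.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Plain-Java model of "poll until something arrives, then stop" (no Spring involved). */
class StopAfterFirstResult {

    /**
     * Polls the source with a fixed delay until it returns a non-null value,
     * then stops the schedule. Returns how many polls were made.
     */
    static <T> int pollUntilFound(Supplier<T> source, long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger polls = new AtomicInteger();
        scheduler.scheduleWithFixedDelay(() -> {
            polls.incrementAndGet();
            if (source.get() != null) {
                // Stop only when a poll actually produced something.
                scheduler.shutdown();
            }
        }, 0, delayMillis, TimeUnit.MILLISECONDS);
        try {
            // Wait for the task to stop the scheduler; give up after one minute.
            scheduler.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Release the worker thread even if nothing was ever found.
            scheduler.shutdownNow();
        }
        return polls.get();
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Simulated directory: empty for the first two polls, then a file shows up.
        Supplier<String> source =
                () -> calls.incrementAndGet() < 3 ? null : "simplebatchscript.bat";
        int polls = pollUntilFound(source, 100);
        System.out.println("Stopped after " + polls + " polls");
    }
}
```

Empty polls leave the schedule running; only the first non-null result shuts it down, so the source is not queried again afterwards.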
