JmsItemReader reconnects to Solace for every chunk

Question

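I have a Spring Batch application that connects to a Solace queue and polls for records for as long as messages are in the queue or until the receive timeout is reached. I am using chunk-oriented processing, but I can see that my batch application reconnects to Solace for every chunk it tries to process. As I understand it, the connection to Solace should be established only once, and the SolSession should start and end according to the configured chunk size. Is my understanding correct, or is this the expected behavior?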

Spring Batch application log

2021-04-01 11:54:17.033  INFO 11144 --- [           main] c.s.j.protocol.impl.TcpClientChannel     : Connecting to host 'orig=tcp://localhost:55555, scheme=tcp://, host=localhost, port=55555' (host 1 of 1, smfclient 1, attempt 1 of 1, this_host_attempt: 1 of 1)
2021-04-01 11:54:17.111  INFO 11144 --- [           main] c.s.j.protocol.impl.TcpClientChannel     : Connected to host 'orig=tcp://localhost:55555, scheme=tcp://, host=localhost, port=55555' (smfclient 1)
2021-04-01 11:54:17.158  INFO 11144 --- [           main] com.solacesystems.jms.SolSession         : SolSession started.
Received data of size 100
2021-04-01 11:54:24.122  INFO 11144 --- [           main] c.s.j.protocol.impl.TcpClientChannel     : Channel Closed (smfclient 1)
2021-04-01 11:54:24.122  INFO 11144 --- [           main] c.s.j.protocol.impl.TcpClientChannel     : Connecting to host 'orig=tcp://localhost:55555, scheme=tcp://, host=localhost, port=55555' (host 1 of 1, smfclient 2, attempt 1 of 1, this_host_attempt: 1 of 1)
2021-04-01 11:54:24.187  INFO 11144 --- [           main] c.s.j.protocol.impl.TcpClientChannel     : Connected to host 'orig=tcp://localhost:55555, scheme=tcp://, host=localhost, port=55555' (smfclient 2)
2021-04-01 11:54:24.219  INFO 11144 --- [           main] com.solacesystems.jms.SolSession         : SolSession started.
Received data of size 100
2021-04-01 11:54:31.036  INFO 11144 --- [           main] c.s.j.protocol.impl.TcpClientChannel     : Channel Closed (smfclient 2)
2021-04-01 11:54:31.036  INFO 11144 --- [           main] c.s.j.protocol.impl.TcpClientChannel     : Connecting to host 'orig=tcp://tcp://localhost:55555, scheme=tcp://, host=localhost, port=55555' (host 1 of 1, smfclient 3, attempt 1 of 1, this_host_attempt: 1 of 1)
2021-04-01 11:54:31.098  INFO 11144 --- [           main] c.s.j.protocol.impl.TcpClientChannel     : Connected to host 'orig=tcp://tcp://localhost:55555, scheme=tcp://, host=localhost, port=55555' (smfclient 3)
2021-04-01 11:54:31.145  INFO 11144 --- [           main] com.solacesystems.jms.SolSession         : SolSession started.

Below is my Spring Batch configuration class:

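import javax.jms.ConnectionFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.jms.JmsItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;

@EnableJms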
@Configuration
@EnableBatchProcessing
public class SpringBatchJmsConfig {
    
    
    public static final Logger logger=LoggerFactory.getLogger(SpringBatchJmsConfig.class.getName());
    
    @Autowired
    private JmsTemplate jmsTemplate;
    
    @Autowired
    public JobBuilderFactory jobBuilFactory;
    
    @Autowired
    public StepBuilderFactory stepBuilderFactory;
    
    
    @Autowired
    public ConnectionFactory connectionFactory;
    
    @Autowired
    public CustomItemWriter writer;
    
    
    @Bean
    public DefaultJmsListenerContainerFactory cFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory=new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(messageConverter());
        factory.setPubSubDomain(false);
        return factory;
        
    }
    
    @Bean
    public MessageConverter messageConverter() {
        MappingJackson2MessageConverter converter=new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");
        return converter;
    }
    
    
    
    
    @Bean
    public JmsItemReader<HcpSample1> hcpJmsItemReader(MessageConverter messageConverter){
        
        JmsItemReader<HcpSample1> hcpJmsItemReader=new JmsItemReader<>();
        hcpJmsItemReader.setJmsTemplate(jmsTemplate);
        hcpJmsItemReader.setItemType(HcpSample1.class);
        return hcpJmsItemReader;
    }
    
    
    @Bean
    public FlatFileItemWriter<HcpSample1> hcpFlatFileItemWriter(){
        FlatFileItemWriter<HcpSample1> hcpFlatFileItemWriter=new FlatFileItemWriter<>();
        hcpFlatFileItemWriter.setLineAggregator(hcp->hcp.toString());
        hcpFlatFileItemWriter.setLineSeparator(System.lineSeparator());
        hcpFlatFileItemWriter.setResource(new FileSystemResource("hcp.txt"));
        return hcpFlatFileItemWriter;
        
    }
    
    
    @Bean
    public Job readJmsAndWriteToFileJob() {
        
        return jobBuilFactory.get("readJmsAndWriteToFileJob").flow(step1()).end().build();
    }
    
    
    private Step step1() {
        return stepBuilderFactory.get("step1").<HcpSample1,HcpSample1>chunk(100).
                reader(hcpJmsItemReader(messageConverter())).writer(writer).build();
    }
    

}

CustomItemWriter

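import java.util.List;

import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;

@Component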
public class CustomItemWriter implements ItemWriter<HcpSample1> {

    @Override
    public void write(List<? extends HcpSample1> items) throws Exception {
        System.out.println("Received data of size " +items.size());
        
    }

}

application.properties

solace.jms.host=tcp://localhost:55555
solace.jms.msg-vpn=DevVPN
solace.jms.client-username=test
solace.jms.client-password=test
spring.jms.template.default-destination=SpringBatchTestQueue
spring.jms.template.receive-timeout=2s
logging.level.com.solacesystems=INFO

Note that I am using Spring Batch together with the Solace auto-configuration provided by solace-spring-boot-starter.

Tags: spring-batch, spring-jms, solace

Answer


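JmsItemReader does not reopen the JMS connection for each chunk. It uses the same connection for the whole job. If the item reader returns null after the receive timeout, the job completes, and at that point the connection is closed.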

Yes, my use case is to run this job every hour and process all the messages.

In that case, it is normal to see a new connection for each scheduled run (that is, for each new job execution).
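
The answer's point that a null from the reader ends the step can be illustrated with a simplified model of a chunk-oriented loop. This is only a sketch in plain Java, not Spring Batch's actual implementation: the class `ChunkLoopSketch`, the method `runStep`, and the in-memory queue standing in for the Solace queue are all hypothetical, and `Queue.poll()` returning null on an empty queue stands in for a JMS receive that times out. It says nothing about how connections are managed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Supplier;

public class ChunkLoopSketch {

    /** Outcome of one simulated step (hypothetical helper type). */
    public static final class StepResult {
        public final int chunksWritten;
        public final int itemsRead;

        public StepResult(int chunksWritten, int itemsRead) {
            this.chunksWritten = chunksWritten;
            this.itemsRead = itemsRead;
        }
    }

    /**
     * Reads items until a chunk is full or the reader returns null.
     * A null is treated as "no more input": the current partial chunk
     * is written and the step ends.
     */
    public static StepResult runStep(Supplier<String> reader, int chunkSize) {
        int chunks = 0;
        int items = 0;
        boolean exhausted = false;
        while (!exhausted) {
            List<String> chunk = new ArrayList<>();
            while (chunk.size() < chunkSize) {
                String item = reader.get();
                if (item == null) {      // stand-in for a receive timeout
                    exhausted = true;
                    break;
                }
                chunk.add(item);
            }
            if (!chunk.isEmpty()) {
                // Plays the role of the ItemWriter from the question
                System.out.println("Received data of size " + chunk.size());
                chunks++;
                items += chunk.size();
            }
        }
        return new StepResult(chunks, items);
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>();
        for (int i = 0; i < 250; i++) {
            queue.add("msg-" + i);
        }
        StepResult result = runStep(queue::poll, 100);
        System.out.println(result.chunksWritten + " chunks, " + result.itemsRead + " items");
    }
}
```

With 250 queued messages and a chunk size of 100, the loop writes two full chunks and one partial chunk of 50, then stops on the first null. A scheduled hourly run would start this loop again from the beginning, which corresponds to a new job execution.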

