spring-batch - JMSItemReader 根据块大小重新连接到 Solace
问题描述
我有连接到 Solace 队列的 Spring 批处理应用程序,只要消息在队列中或达到接收器超时,就会轮询记录。我正在使用基于块的处理但我可以看到我的批处理应用程序为它尝试处理的每个块重新连接到 Solace。就我的理解而言,与 Solace 的连接应该只发生一次,并且 SolSession 应该根据定义的块大小开始/结束。我的理解是正确的还是这是预期的行为?
Spring批处理应用日志
2021-04-01 11:54:17.033 INFO 11144 --- [ main] c.s.j.protocol.impl.TcpClientChannel : Connecting to host 'orig=tcp://localhost:55555, scheme=tcp://, host=localhost, port=55555' (host 1 of 1, smfclient 1, attempt 1 of 1, this_host_attempt: 1 of 1)
2021-04-01 11:54:17.111 INFO 11144 --- [ main] c.s.j.protocol.impl.TcpClientChannel : Connected to host 'orig=tcp://localhost:55555, scheme=tcp://, host=localhost, port=55555' (smfclient 1)
2021-04-01 11:54:17.158 INFO 11144 --- [ main] com.solacesystems.jms.SolSession : SolSession started.
Received data of size 100
2021-04-01 11:54:24.122 INFO 11144 --- [ main] c.s.j.protocol.impl.TcpClientChannel : Channel Closed (smfclient 1)
2021-04-01 11:54:24.122 INFO 11144 --- [ main] c.s.j.protocol.impl.TcpClientChannel : Connecting to host 'orig=tcp://localhost:55555, scheme=tcp://, host=localhost, port=55555' (host 1 of 1, smfclient 2, attempt 1 of 1, this_host_attempt: 1 of 1)
2021-04-01 11:54:24.187 INFO 11144 --- [ main] c.s.j.protocol.impl.TcpClientChannel : Connected to host 'orig=tcp://localhost:55555, scheme=tcp://, host=localhost, port=55555' (smfclient 2)
2021-04-01 11:54:24.219 INFO 11144 --- [ main] com.solacesystems.jms.SolSession : SolSession started.
Received data of size 100
2021-04-01 11:54:31.036 INFO 11144 --- [ main] c.s.j.protocol.impl.TcpClientChannel : Channel Closed (smfclient 2)
2021-04-01 11:54:31.036 INFO 11144 --- [ main] c.s.j.protocol.impl.TcpClientChannel : Connecting to host 'orig=tcp://tcp://localhost:55555, scheme=tcp://, host=localhost, port=55555' (host 1 of 1, smfclient 3, attempt 1 of 1, this_host_attempt: 1 of 1)
2021-04-01 11:54:31.098 INFO 11144 --- [ main] c.s.j.protocol.impl.TcpClientChannel : Connected to host 'orig=tcp://tcp://localhost:55555, scheme=tcp://, host=localhost, port=55555' (smfclient 3)
2021-04-01 11:54:31.145 INFO 11144 --- [ main] com.solacesystems.jms.SolSession : SolSession started.
下面是我的 Spring 批处理配置类:
@EnableJms
@Configuration
@EnableBatchProcessing
public class SpringBatchJmsConfig {
public static final Logger logger=LoggerFactory.getLogger(SpringBatchJmsConfig.class.getName());
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
public JobBuilderFactory jobBuilFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Autowired
public ConnectionFactory connectionFactory;
@Autowired
public CustomItemWriter writer;
@Bean
public DefaultJmsListenerContainerFactory cFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory=new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(messageConverter());
factory.setPubSubDomain(false);
return factory;
}
@Bean
public MessageConverter messageConverter() {
MappingJackson2MessageConverter converter=new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}
@Bean
public JmsItemReader<HcpSample1> hcpJmsItemReader(MessageConverter messageConverter){
JmsItemReader<HcpSample1> hcpJmsItemReader=new JmsItemReader();
hcpJmsItemReader.setJmsTemplate(jmsTemplate);
hcpJmsItemReader.setItemType(HcpSample1.class);
return hcpJmsItemReader;
}
@Bean
public FlatFileItemWriter<HcpSample1> hcpFlatFileItemWriter(){
FlatFileItemWriter<HcpSample1> hcpFlatFileItemWriter=new FlatFileItemWriter();
hcpFlatFileItemWriter.setLineAggregator(hcp->hcp.toString());
hcpFlatFileItemWriter.setLineSeparator(System.lineSeparator());
hcpFlatFileItemWriter.setResource(new FileSystemResource("hcp.txt"));
return hcpFlatFileItemWriter;
}
@Bean
public Job readJmsAndWriteToFileJob() {
return jobBuilFactory.get("readJmsAndWriteToFileJob").flow(step1()).end().build();
}
private Step step1() {
return stepBuilderFactory.get("step1").<HcpSample1,HcpSample1>chunk(100).
reader(hcpJmsItemReader(messageConverter())).writer(writer).build();
}
}
CustomItemWriter
:
@Component
public class CustomItemWriter implements ItemWriter<HcpSample1> {
@Override
public void write(List<? extends HcpSample1> items) throws Exception {
System.out.println("Received data of size " +items.size());
}
}
application.properties
:
solace.jms.host=tcp://localhost:55555
solace.jms.msg-vpn=DevVPN
solace.jms.client-username=test
solace.jms.client-password=test
spring.jms.template.default-destination=SpringBatchTestQueue
spring.jms.template.receive-timeout=2s
logging.level.com.solacesystems=INFO
注意我正在使用 Spring 批处理以及solace-spring-boot-starter
Solace 自动配置。
解决方案
JmsItemreader 不会为每个块重新打开 JMS 连接。它将为整个作业使用相同的连接。如果项目读取器在超时后返回 null,则作业将完成,此时连接已关闭。
是的..我的用例是每小时运行一次这项工作并处理所有消息
在这种情况下,您会看到每个计划运行的新连接(即新作业执行)是正常的。
推荐阅读
- ios - 如何为多个视图控制器使用 SWRevealviewcontroller 库
- c# - C#按键结束图像移动
- php - laravel 将用作函数返回类型的类放在哪里
- java - 查找子数据中的子数据是否存在 Android Firebase
- ssas - SSAS Tabular Cube Reload(似乎需要用户触发数据表格磁盘的加载)
- php - PHP/Laravel 如何将一个函数包含到另一个函数中?
- algorithm - 找到 2 个数组列表之间的交集的有效算法/方法是什么。(我使用的是 Java 8)
- android - AndroidX Room 未解析的超类型 RoomDatabase
- c# - 如何在 CODE_128 中使用像 CODEA 这样的特殊字符
- android - Android Dynamically created Calendar disable future Dates