spring-batch - How do I add a custom header in an IntegrationFlow with Spring Batch Integration?
Problem description
I created a PollableChannel that listens to an S3 bucket, fetches files, and launches a job.
My class looks like this:
    @Bean
    public S3SessionFactory s3SessionFactory(AmazonS3 pAmazonS3) {
        return new S3SessionFactory(pAmazonS3);
    }

    @Bean
    public S3InboundFileSynchronizer s3InboundFileSynchronizer(S3SessionFactory s3SessionFactory) {
        S3InboundFileSynchronizer synchronizer = new S3InboundFileSynchronizer(s3SessionFactory);
        synchronizer.setPreserveTimestamp(true);
        synchronizer.setDeleteRemoteFiles(false);
        synchronizer.setRemoteDirectory(awsS3Properties.getCercBucket());
        return synchronizer;
    }

    @Bean
    public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource(
            S3InboundFileSynchronizer s3InboundFileSynchronizer) {
        S3InboundFileSynchronizingMessageSource messageSource =
                new S3InboundFileSynchronizingMessageSource(s3InboundFileSynchronizer);
        messageSource.setAutoCreateLocalDirectory(true);
        messageSource.setLocalDirectory(
                new FileSystemResource(integrationProperties.getTempDirectoryName()).getFile());
        return messageSource;
    }

    @Bean("${receivable.integration.inChannel}")
    public PollableChannel s3FilesChannel() {
        return new QueueChannel();
    }

    @Bean
    public IntegrationFlow integrationFlow(
            S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource) {
        return IntegrationFlows
                .from(s3InboundFileSynchronizingMessageSource,
                        c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1)))
                .transform(fileMessageToJobRequest())
                .handle(jobLaunchingGateway())
                .get();
    }

    @Bean
    public FileMessageToJobRequest fileMessageToJobRequest() {
        FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
        fileMessageToJobRequest.setFileParameterName("input.file.name");
        fileMessageToJobRequest.setJob(receivablePositionJob);
        return fileMessageToJobRequest;
    }

    @Bean
    @ServiceActivator(inputChannel = "${receivable.integration.inChannel}", poller = @Poller(fixedRate = "1000"))
    public JobLaunchingGateway jobLaunchingGateway() {
        SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
        simpleJobLauncher.setJobRepository(jobRepository);
        simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
        JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);
        jobLaunchingGateway.setOutputChannel(s3FilesChannel());
        return jobLaunchingGateway;
    }
My FileMessageToJobRequest looks like this:
    public class FileMessageToJobRequest {

        private Job job;
        private String fileParameterName;

        public void setFileParameterName(String fileParameterName) {
            this.fileParameterName = fileParameterName;
        }

        public void setJob(Job job) {
            this.job = job;
        }

        @Transformer
        public JobLaunchRequest toRequest(Message<File> message) {
            JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
            jobParametersBuilder.addString(fileParameterName, message.getPayload().getAbsolutePath());
            return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
        }
    }
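I want to add a custom header to the message. Alternatively, my second option is to intercept the context before the message is sent, because I need to set my tenant in a ThreadLocal.
How can I do that?
Thanks in advance.
Update, using enrichHeaders: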
    @Bean
    public IntegrationFlow integrationFlow(
            S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource) {
        return IntegrationFlows
                .from(s3InboundFileSynchronizingMessageSource,
                        c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1)))
                .transform(fileMessageToJobRequest())
                .enrichHeaders(Map.of("teste", "testandio"))
                .handle(jobLaunchingGateway())
                .get();
    }
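Solution
First of all, you have to remove the @ServiceActivator annotation (the one with inputChannel = "${receivable.integration.inChannel}") from the jobLaunchingGateway bean, because it points to the same s3FilesChannel that is also the outputChannel of that JobLaunchingGateway. With this configuration you are creating a loop. I am not sure how it works for you at all...
To add a header before sending to that JobLaunchingGateway, you just need to add enrichHeaders() before .handle(jobLaunchingGateway()) in that integrationFlow definition.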
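A header on the message does not by itself put the tenant into a ThreadLocal; something running on the thread that launches the job still has to copy it there. The following is only a minimal plain-Java sketch of that step, not part of the answer above. The TenantContext class, the callWithTenant method, and the header name "tenant" are all hypothetical, and the message headers are modeled as a plain Map so the sketch runs without Spring.

```java
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical holder for the current tenant; not a Spring or Spring Batch class.
final class TenantContext {

    // Assumed header name; use whatever name is passed to enrichHeaders().
    static final String TENANT_HEADER = "tenant";

    private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    private TenantContext() {
    }

    // Returns the tenant bound to the calling thread, or null if none is set.
    static String get() {
        return CURRENT.get();
    }

    // Binds the tenant found in the headers to the calling thread, runs the
    // work, and restores the previous state even if the work throws.
    static <T> T callWithTenant(Map<String, Object> headers, Supplier<T> work) {
        Object tenant = headers.get(TENANT_HEADER);
        if (tenant == null) {
            throw new IllegalArgumentException("Missing header: " + TENANT_HEADER);
        }
        String previous = CURRENT.get();
        CURRENT.set(tenant.toString());
        try {
            return work.get();
        } finally {
            if (previous == null) {
                CURRENT.remove();
            } else {
                CURRENT.set(previous);
            }
        }
    }
}
```

Since the SimpleJobLauncher in the question uses a SyncTaskExecutor, the job should run on the thread that calls the launcher, so wrapping the launch call in something like callWithTenant would likely make the tenant visible inside the job. With an asynchronous task executor the job runs on another thread and this approach would not carry over.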