spring - bean之间的spring集成发布订阅
问题描述
感谢您提前阅读。在我的主要方法中,我有一个 PublishSubscribeChannel
@Bean(name = "feeSchedule")
public SubscribableChannel getMessageChannel() {
return new PublishSubscribeChannel();
}
在执行长时间运行过程的服务中,它会创建一个费用表,我将通道注入
@Service
public class FeeScheduleCompareServiceImpl implements FeeScheduleCompareService {
@Autowired
MessageChannel outChannel;
public List<FeeScheduleUpdate> compareFeeSchedules(String oldStudyId) {
List<FeeScheduleUpdate> sortedResultList = longMethod(oldStudyId);
outChannel.send(MessageBuilder.withPayload(sortedResultList).build());
return sortedResultList;
}
}
现在这是我正在努力的部分。我想使用可完成的未来并在另一个spring bean中获取未来A中事件的有效负载。我需要未来的 A 从消息中返回有效负载。我想创建一个 ServiceActivator 作为消息端点,但就像我说的,我需要它来为未来的 A 返回有效负载。
@org.springframework.stereotype.Service
public class SFCCCompareServiceImpl implements SFCCCompareService {
@Autowired
private SubscribableChannel outChannel;
@Override
public List<SFCCCompareDTO> compareSFCC(String state, int service){
ArrayList<SFCCCompareDTO> returnList = new ArrayList<SFCCCompareDTO>();
CompletableFuture<List<FeeScheduleUpdate>> fa = CompletableFuture.supplyAsync( () ->
{ //block A WHAT GOES HERE?!?!
outChannel.subscribe()
}
);
CompletableFuture<List<StateFeeCodeClassification>> fb = CompletableFuture.supplyAsync( () ->
{
return this.stateFeeCodeClassificationRepository.findAll();
}
);
CompletableFuture<List<SFCCCompareDTO>> fc = fa.thenCombine(fb,(a,b) ->{
//block C
//get in this block when both A & B are complete
Object theList = b.stream().forEach(new Consumer<StateFeeCodeClassification>() {
@Override
public void accept(StateFeeCodeClassification stateFeeCodeClassification) {
a.stream().forEach(new Consumer<FeeScheduleUpdate>() {
@Override
public void accept(FeeScheduleUpdate feeScheduleUpdate) {
returnList new SFCCCompareDTO();
}
});
}
}).collect(Collectors.toList());
return theList;
});
fc.join();
return returnList;
}
}
正在考虑会有一个服务激活器,例如:
@MessageEndpoint
public class UpdatesHandler implements MessageHandler{
@ServiceActivator(requiresReply = "true")
public List<FeeScheduleUpdate> getUpdates(Message m){
return (List<FeeScheduleUpdate>) m.getPayload();
}
}
解决方案
您的问题不清楚,但我会尽力为您提供一些信息。
Spring Integration 不提供CompletableFuture
支持,但确实提供了异步处理和回复。
有关详细信息,请参阅异步网关。另请参阅Asynchronous Service Activator。
outChannel.subscribe()
顺便说一句,应该带有MessageHandler
回调。
推荐阅读
- python - 如何在 Python 中从 CSV 文件创建字典?
- flutter - 如何在另一个代码生成器的顶部运行代码生成器?
- ruby-on-rails - 在 Rails 中按日期获取下一个/上一个记录(并在同一天处理多个日期)
- php - 如何在laravel中的if返回变量中使用isset
- python - 无法将符号张量 (truediv_17:0) 转换为 numpy 数组
- authentication - Nightwatch.js:运行 .locateMultipleElements() 协议操作时出错:无效选择器:指定了无效或非法的选择器
- c# - 我可以在 VS 中构建这个项目,但不能使用 msbuild
- c# - 如何在 Windows 服务中使用谷歌日历 API?
- c - 光线追踪器光漫射的问题
- django - 当我展开我的 UserAdmin 时,我在管理面板中收到一个错误(__init__() got an unexpected keyword argument 'region')