首页 > 解决方案 > bean之间的spring集成发布订阅

问题描述

感谢您提前阅读。在我的主要方法中,我有一个 PublishSubscribeChannel

@Bean(name = "feeSchedule")
public SubscribableChannel getMessageChannel() {
    return new PublishSubscribeChannel();
}

在执行长时间运行过程的服务中,它会创建一个费用表,我将通道注入

@Service
public class FeeScheduleCompareServiceImpl implements FeeScheduleCompareService {

    @Autowired
    MessageChannel outChannel;

    public List<FeeScheduleUpdate> compareFeeSchedules(String oldStudyId) {
    List<FeeScheduleUpdate> sortedResultList = longMethod(oldStudyId);
    outChannel.send(MessageBuilder.withPayload(sortedResultList).build());
        return sortedResultList;
    } 
}

现在这是我正在努力的部分。我想使用可完成的未来并在另一个spring bean中获取未来A中事件的有效负载。我需要未来的 A 从消息中返回有效负载。我想创建一个 ServiceActivator 作为消息端点,但就像我说的,我需要它来为未来的 A 返回有效负载。

@org.springframework.stereotype.Service
public class SFCCCompareServiceImpl implements SFCCCompareService {
     @Autowired
    private SubscribableChannel outChannel;

     @Override
    public List<SFCCCompareDTO> compareSFCC(String state, int service){
    ArrayList<SFCCCompareDTO> returnList = new ArrayList<SFCCCompareDTO>();
    CompletableFuture<List<FeeScheduleUpdate>> fa =  CompletableFuture.supplyAsync( () ->
            {  //block A   WHAT GOES HERE?!?!
                    outChannel.subscribe()
            }
    );
    CompletableFuture<List<StateFeeCodeClassification>> fb =  CompletableFuture.supplyAsync( () ->
            {  
                  return this.stateFeeCodeClassificationRepository.findAll();       
            }
    );
    CompletableFuture<List<SFCCCompareDTO>> fc = fa.thenCombine(fb,(a,b) ->{
        //block C
        //get in this block when both A & B are complete
        Object theList = b.stream().forEach(new Consumer<StateFeeCodeClassification>() {
            @Override
            public void accept(StateFeeCodeClassification stateFeeCodeClassification) {
                a.stream().forEach(new Consumer<FeeScheduleUpdate>() {
                    @Override
                    public void accept(FeeScheduleUpdate feeScheduleUpdate) {
                        returnList new SFCCCompareDTO();
                    }
                });
            }
        }).collect(Collectors.toList());
        return theList;
    });
    fc.join();
    return returnList;
}

}

正在考虑会有一个服务激活器,例如:

@MessageEndpoint
public class UpdatesHandler implements MessageHandler{

    @ServiceActivator(requiresReply = "true")
    public List<FeeScheduleUpdate> getUpdates(Message m){
        return (List<FeeScheduleUpdate>) m.getPayload();
    }
}

标签: springspring-integration

解决方案


您的问题不清楚,但我会尽力为您提供一些信息。

Spring Integration 不提供CompletableFuture支持,但确实提供了异步处理和回复。

有关详细信息,请参阅异步网关。另请参阅Asynchronous Service Activator

outChannel.subscribe()顺便说一句,应该带有MessageHandler回调。


推荐阅读