首页 > 解决方案 > Spring Integration Aggregator 问题或错误配置

问题描述

我正在使用 Spring Integration 来管理一些计划任务,其中一个任务我选择通过 HttpRequestHandlingMessagingGateway 触发,并在流程末尾使用聚合器来收集任务结果,配置如下所示。到达最后一个通道“responseHttpChannel”的消息是否缺少了什么?这不是我第一次使用 Spring Integration,也不是第一次使用 HTTP 网关,但在这种情况下,我始终无法让聚合器工作。在这个请求-响应过程中还会执行一些其他任务。哪怕只有超时能对我起作用,也已经算是胜利了。

/**
 * Spring Integration wiring for the Sysmat indexing tasks.
 *
 * <p>Declares the thread factories, executors and schedulers, the message
 * channels, the two inbound HTTP gateways and the integration flows that
 * connect them. Messages can enter through {@code indexChannel},
 * {@code tarefaControllerChannel}, {@code tarefaIndexSchedulerChannel} or the
 * HTTP gateways; results published on {@code sumarizaTarefaChannel} are routed
 * by their {@code MESSAGE_ORIGIN} header either back to the HTTP reply channel
 * or to the scheduler reply channel.
 */
@Configuration
@EnableIntegration
@EnableConfigurationProperties
@EnableAsync
@EnableScheduling
@EnableIntegrationGraphController(path = "/integration", allowedOrigins="*")
public class SysmatIntegrationConfiguration {

    /** Header naming the entry point ("HTTP" or "SCHEDULER") a message came from. */
    private static final String MESSAGE_ORIGIN = "MESSAGE_ORIGIN";

    @Autowired
    IndexSchedulerService<IndexScheduler<?>> indexSchedulerService;

    @Autowired
    ApplicationContext applicationContext;

    @Autowired
    ListaTarefaItemHistorySplitter tarefaItemHistorySplitter;

    @Autowired
    ListTarefaItemSpliter tarefaItemSplitter;

    @Autowired
    SelecionadorItem selecionadorItem;

    @Autowired
    TarefaItemHistoryLog tarefaItemHistoryLog;

    @Autowired
    TarefaIndexarItemMaster tarefaIndexarItemMaster;

    public SysmatIntegrationConfiguration() {
    }

    // ------------------------------------------------------------------
    // General purpose beans
    // ------------------------------------------------------------------

    /** Correlation strategy keyed on the standard correlation-id message header. */
    @Bean
    public HeaderAttributeCorrelationStrategy headerAttributeCorrelationStrategy() {
        return new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID);
    }

    /** SpEL parser used by the HTTP gateways to build payload expressions. */
    @Bean
    @Qualifier(value="expressionParser")
    ExpressionParser expressionParser() {
        return new SpelExpressionParser();
    }

    // ------------------------------------------------------------------
    // Messaging templates
    // ------------------------------------------------------------------

    /** Template whose default destination is {@code tarefaControllerChannel}. */
    @Bean
    @Qualifier(value="indexTarefaTemplate")
    public MessagingTemplate indexTarefaTemplate(@Autowired @Qualifier("tarefaControllerChannel") MessageChannel indexChannel) {
        return new MessagingTemplate(indexChannel);
    }

    /**
     * Template whose default destination is {@code indexChannel}. This is the
     * single {@code @Primary} {@link MessagingTemplate}, i.e. the one chosen
     * for unqualified injection points.
     */
    @Bean
    @Primary
    @Qualifier(value="jobMessageTemplate")
    public MessagingTemplate messagingTemplate(@Autowired @Qualifier("indexChannel") MessageChannel indexChannel) {
        return new MessagingTemplate(indexChannel);
    }

    /**
     * Template whose default destination is {@code tarefaIndexSchedulerChannel}.
     *
     * <p>Deliberately not {@code @Primary}: only one bean of a given type may
     * be primary, otherwise unqualified injection of {@link MessagingTemplate}
     * is ambiguous. Inject this one through its qualifier.
     */
    @Bean
    @Qualifier(value="schedulerMessageTemplate")
    public MessagingTemplate schedulerMessageTemplate(@Autowired @Qualifier("tarefaIndexSchedulerChannel") MessageChannel tarefaIndexSchedulerChannel) {
        return new MessagingTemplate(tarefaIndexSchedulerChannel);
    }

    // ------------------------------------------------------------------
    // Threading
    // ------------------------------------------------------------------

    /**
     * Thread factory backing {@code threadPoolExecutor}.
     *
     * <p>NOTE(review): despite the method name this uses
     * {@code Thread.NORM_PRIORITY}, exactly like
     * {@link #normalPriorityThreadFactory()} - confirm whether a higher
     * priority was intended.
     */
    @Bean(name="threadFactory")
    @Qualifier("threadFactory")
    public ThreadFactory highPriorityThreadFactory() {
        return constructThread(Thread.NORM_PRIORITY , false);
    }

    /** Thread factory producing non-daemon threads at normal priority. */
    @Bean(name="normalPriorityThreadFactory")
    @Qualifier("normalPriorityThreadFactory")
    public ThreadFactory normalPriorityThreadFactory() {
        return constructThread(Thread.NORM_PRIORITY , false);
    }

    /**
     * Builds a factory whose threads carry the given priority and daemon flag.
     *
     * @param priority priority applied to every created thread
     * @param daemon   daemon flag; {@code null} is treated as {@code false}
     * @return a factory creating one new thread per runnable
     */
    private ThreadFactory constructThread(final int priority , final Boolean daemon) {
        final boolean daemonFlag = Optional.ofNullable(daemon).orElse(false);
        return runnable -> {
            Thread thread = new Thread(runnable);
            thread.setDaemon(daemonFlag);
            thread.setPriority(priority);
            return thread;
        };
    }

    /**
     * Executor shared by every publish-subscribe channel in this configuration.
     *
     * <p>NOTE(review): no queue capacity is set here; if the executor's
     * default queue is unbounded the pool will stay at the core size and the
     * max pool size is never reached - confirm this is intended.
     */
    @Bean(name="threadPoolExecutor")
    @Qualifier(value="threadPoolExecutor")
    public ThreadPoolTaskExecutor threadPoolExecutor(@Autowired @Qualifier("threadFactory") ThreadFactory threadFactory) {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(100);
        executor.setMaxPoolSize(Integer.MAX_VALUE);
        executor.setThreadFactory(threadFactory);
        return executor;
    }

    /** Scheduler registered under the name {@code taskScheduler} with 20 threads. */
    @Bean(name="taskScheduler")
    @Qualifier(value="taskScheduler")
    public ThreadPoolTaskScheduler taskScheduler(@Autowired @Qualifier("normalPriorityThreadFactory") ThreadFactory threadFactory) {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setThreadNamePrefix("IntegrationPoolTaskScheduler");
        scheduler.setThreadFactory(threadFactory);
        scheduler.setPoolSize(20);
        return scheduler;
    }

    /** Second 20-thread scheduler, used by {@link #myTaskRegistar(TaskScheduler)}. */
    @Bean(name="threadPoolTaskScheduler")
    @Qualifier(value="threadPoolTaskScheduler")
    public ThreadPoolTaskScheduler threadPoolTaskScheduler(@Autowired @Qualifier("normalPriorityThreadFactory") ThreadFactory threadFactory) {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setThreadNamePrefix("ThreadPoolTaskScheduler");
        scheduler.setThreadFactory(threadFactory);
        scheduler.setPoolSize(20);
        return scheduler;
    }

    /** Task registrar bound to the {@code threadPoolTaskScheduler} bean. */
    @Bean
    @Qualifier(value="myTaskRegistar")
    public ScheduledTaskRegistrar myTaskRegistar(@Autowired @Qualifier("threadPoolTaskScheduler")TaskScheduler taskScheduler) {
        ScheduledTaskRegistrar registrar = new ScheduledTaskRegistrar();
        registrar.setTaskScheduler(taskScheduler);
        return registrar;
    }

    // ------------------------------------------------------------------
    // HTTP support beans
    // ------------------------------------------------------------------

    /** Converters (JSON, form, plain string) handed to both HTTP gateways. */
    @Primary
    @Bean("httpMessageConverters")
    public List<HttpMessageConverter<?>> httpMessageConverters(){
        List<HttpMessageConverter<?>> converters = new ArrayList<>();
        converters.add(new MappingJackson2HttpMessageConverter(new ObjectMapper()));
        converters.add(new FormHttpMessageConverter());
        converters.add(new StringHttpMessageConverter());
        return converters;
    }

    /** Header mapper configured with the "*" pattern in both directions. */
    @Bean(name="defaultHttpHeaderMapper")
    @Qualifier(value="defaultHttpHeaderMapper")
    public DefaultHttpHeaderMapper defaultHeaderMapper() {
        DefaultHttpHeaderMapper mapper = new DefaultHttpHeaderMapper();
        mapper.setInboundHeaderNames(new String[] {"*"});
        mapper.setOutboundHeaderNames(new String[] {"*"});
        return mapper;
    }

    // ------------------------------------------------------------------
    // Channels
    // ------------------------------------------------------------------

    /**
     * Creates a publish-subscribe channel with the given id that dispatches on
     * the supplied executor.
     */
    private PublishSubscribeChannel executorPubSub(String id, ThreadPoolTaskExecutor executor) {
        return MessageChannels
                .publishSubscribe(id , executor)
                .get();
    }

    @Bean
    @Qualifier("indexChannel")
    public MessageChannel indexChannel(@Autowired @Qualifier("threadPoolExecutor")ThreadPoolTaskExecutor threadPoolExecutor) {
        return executorPubSub("indexChannel", threadPoolExecutor);
    }

    @Bean(name="tarefaIndexChannel")
    @Qualifier(value="tarefaIndexChannel")
    public MessageChannel tarefaIndexChannel(@Autowired @Qualifier("threadPoolExecutor")ThreadPoolTaskExecutor threadPoolExecutor) {
        return executorPubSub("tarefaIndexChannel", threadPoolExecutor);
    }

    @Bean(name="tarefaIndexSchedulerChannel")
    @Qualifier(value="tarefaIndexSchedulerChannel")
    public MessageChannel tarefaIndexSchedulerChannel(@Autowired @Qualifier("threadPoolExecutor")ThreadPoolTaskExecutor threadPoolExecutor) {
        return executorPubSub("tarefaIndexSchedulerChannel", threadPoolExecutor);
    }

    @Bean(name="tarefaItemMasterIndexChannel")
    @Qualifier(value="tarefaItemMasterIndexChannel")
    public MessageChannel tarefaItemMasterIndexChannel(@Autowired @Qualifier("threadPoolExecutor")ThreadPoolTaskExecutor threadPoolExecutor) {
        // The channel id matches the bean name (it used to be a copy of "tarefaIndexChannel").
        return executorPubSub("tarefaItemMasterIndexChannel", threadPoolExecutor);
    }

    /** Direct channel consumed by {@code loggingFlow}; other flows send or wire-tap into it. */
    @Bean
    @Qualifier("loggingChannel")
    public DirectChannel loggingChannel() {
        return new DirectChannel();
    }

    @Bean(name="tarefaPartitionChannel")
    @Qualifier(value="tarefaPartitionChannel")
    public MessageChannel tarefaPartitionChannel(@Autowired @Qualifier("threadPoolExecutor")ThreadPoolTaskExecutor threadPoolExecutor) {
        return executorPubSub("tarefaPartitionChannel", threadPoolExecutor);
    }

    @Bean(name="tarefaControllerChannel")
    @Qualifier(value="tarefaControllerChannel")
    public MessageChannel tarefaControllerChannel() {
        return new DirectChannel();
    }

    /** Request channel of {@code httpRequestGateway}. */
    @Bean(name="tarefaHttpIntegrationChannel")
    @Qualifier(value="tarefaHttpIntegrationChannel")
    public MessageChannel tarefaHttpIntegrationChannel(@Autowired @Qualifier("threadPoolExecutor")ThreadPoolTaskExecutor threadPoolExecutor) {
        return executorPubSub("tarefaHttpIntegrationChannel", threadPoolExecutor);
    }

    /** Request channel of {@code httpSysmatIntegrationRequestGateway}. */
    @Bean(name="tarefaHttpIndexIntegrationChannel")
    @Qualifier(value="tarefaHttpIndexIntegrationChannel")
    public MessageChannel tarefaHttpIndexIntegrationChannel(@Autowired @Qualifier("threadPoolExecutor")ThreadPoolTaskExecutor threadPoolExecutor) {
        return executorPubSub("tarefaHttpIndexIntegrationChannel", threadPoolExecutor);
    }

    @Bean(name="sumarizaTarefaChannel")
    @Qualifier(value="sumarizaTarefaChannel")
    public MessageChannel sumarizaTarefaChannel(@Autowired @Qualifier("threadPoolExecutor")ThreadPoolTaskExecutor threadPoolExecutor) {
        return executorPubSub("sumarizaTarefaChannel", threadPoolExecutor);
    }

    /** Reply channel shared by both HTTP gateways. */
    @Bean
    @Qualifier(value="responseHttpChannel")
    public MessageChannel responseHttpChannel() {
        return new DirectChannel();
    }

    // ------------------------------------------------------------------
    // Flows
    // ------------------------------------------------------------------

    /** Prints every message arriving on {@code loggingChannel}. */
    @Bean
    public IntegrationFlow loggingFlow(@Autowired
            @Qualifier("loggingChannel") MessageChannel logChannel) {
        return  IntegrationFlows
                .from(logChannel)
                .handle(m -> logMessage(m))
                .get();
    }

    /**
     * {@code indexChannel} -> {@code selecionadorItem} transformer ->
     * {@code tarefaPartitionChannel}.
     *
     * @param indexChannel       flow input
     * @param tarefaIndexChannel not used by the flow; kept so the bean
     *                           signature stays unchanged
     */
    @Bean
    public IntegrationFlow indexPrepareFlow(@Autowired  @Qualifier("indexChannel") MessageChannel indexChannel ,
            @Autowired  @Qualifier("tarefaIndexChannel") MessageChannel tarefaIndexChannel) {
        return  IntegrationFlows
                .from(indexChannel)
                .transform(selecionadorItem)
                .channel("tarefaPartitionChannel")
                .get();
    }

    /**
     * Tags messages from {@code tarefaHttpIntegrationChannel} with
     * {@code MESSAGE_ORIGIN=HTTP} and passes them to the injected handler.
     *
     * <p>NOTE(review): the handler is injected by type only; verify that
     * exactly one {@code HttpRequestHandler} bean exists in the context.
     */
    @Bean
    public IntegrationFlow itemRouteToPartition(@Autowired HttpRequestHandler httpRequestHandler) {
        return IntegrationFlows
                .from("tarefaHttpIntegrationChannel")
                .enrichHeaders(h -> h.header(MESSAGE_ORIGIN, "HTTP"))
                .handle(httpRequestHandler)
                .get();
    }

    /** Bridges {@code tarefaControllerChannel} to {@code tarefaPartitionChannel}. */
    @Bean
    public IntegrationFlow indexRouteToPartition() {
        return IntegrationFlows
                .from("tarefaControllerChannel")
                .channel("tarefaPartitionChannel")
                .get();
    }

    /**
     * {@code tarefaIndexSchedulerChannel} -> scheduler transformer ->
     * {@code tarefaPartitionChannel}.
     */
    @Bean
    public IntegrationFlow indexSchedulerRouteToPartition(@Autowired TarefaIndexScheduler tarefaIndexScheduler) {
        return IntegrationFlows
                .from("tarefaIndexSchedulerChannel")
                .transform(tarefaIndexScheduler)
                .channel("tarefaPartitionChannel")
                .get();
    }

    /**
     * Reports each payload on {@code tarefaPartitionChannel} to the task
     * monitor (via a wire tap), splits the message and forwards the parts to
     * {@code tarefaIndexChannel}.
     */
    @Bean
    public IntegrationFlow indexSplitHistory(@Autowired IndexTaskMonitor indexTaskMonitor) {
        return IntegrationFlows
                .from("tarefaPartitionChannel")
                .wireTap(flow -> flow.handle(m -> indexTaskMonitor.processIndexRequest(m.getPayload())))
                .split(tarefaItemHistorySplitter)
                .channel("tarefaIndexChannel")
                .get();
    }

    /**
     * Routes {@code tarefaIndexChannel} messages through the history-log
     * transformer and the item splitter into the item-master channel.
     */
    @Bean
    public IntegrationFlow indexFlow(@Autowired  @Qualifier("tarefaIndexChannel") MessageChannel tarefaIndexChannel
            ,@Autowired @Qualifier("tarefaItemMasterIndexChannel") MessageChannel indexMasterChannel) {
        return IntegrationFlows
                .from(tarefaIndexChannel)
                .routeToRecipients(route ->
                        route.recipientFlow(subFlow ->
                                subFlow.transform(tarefaItemHistoryLog)
                                        .split(tarefaItemSplitter)
                                        .channel(indexMasterChannel)))
                .get();
    }

    /**
     * Transforms item-master messages, logs them and sends the payload over JMS.
     */
    @Bean
    public IntegrationFlow indexItemMasterFlow(@Autowired @Qualifier("tarefaItemMasterIndexChannel") MessageChannel indexMasterChannel,
            @Autowired @Qualifier("loggingChannel") MessageChannel logChannel ,
            @Autowired JmsMessageSender jmsSender) {
        return IntegrationFlows
                .from(indexMasterChannel)
                .transform(tarefaIndexarItemMaster)
                // Wire-tap instead of routing through loggingChannel: that channel is a
                // DirectChannel already consumed by loggingFlow, so subscribing the JMS
                // handler to it as well would make the two consumers take turns
                // (each message reaching only one of them).
                .wireTap(logChannel)
                .handle(m -> jmsSender.send(m.getPayload()))
                .get();
    }

    /**
     * Fans messages from {@code sumarizaTarefaChannel} out by origin: "HTTP"
     * messages go to {@code prehttpChannel}, "SCHEDULER" messages go to
     * {@code schedulerReplyChannel}.
     */
    @Bean
    public IntegrationFlow sumarizaIndexItemFlow() {
        return IntegrationFlows
                .from("sumarizaTarefaChannel")
                .wireTap(subflow -> subflow
                        .filter(originSelector("HTTP"))
                        .channel(c -> c.direct("prehttpChannel")))
                .wireTap(subflow -> subflow
                        .filter(originSelector("SCHEDULER"))
                        .channel(c -> c.direct("schedulerReplyChannel")))
                .get();
    }

    /**
     * Selector accepting messages whose {@code MESSAGE_ORIGIN} header equals
     * {@code origin}; a missing header is rejected.
     *
     * <p>Kept as an anonymous class (not a lambda) so the explicit
     * {@code Message<?>} type argument stays visible to the framework; the
     * selector needs the whole message to read its headers.
     */
    private GenericSelector<Message<?>> originSelector(final String origin) {
        return new GenericSelector<Message<?>>() {
            @Override
            public boolean accept(Message<?> source) {
                return origin.equals(source.getHeaders().get(MESSAGE_ORIGIN));
            }
        };
    }

    /** Forwards scheduler replies to {@code loggingChannel}. */
    @Bean
    public IntegrationFlow httpSchedulerRequestResponse() {
        return IntegrationFlows
                .from("schedulerReplyChannel")
                .channel("loggingChannel")
                .get();
    }

    /**
     * Logs HTTP-origin results and delivers them to {@code responseHttpChannel},
     * the reply channel of the HTTP gateways.
     */
    @Bean
    public IntegrationFlow httpResponse() {
        return IntegrationFlows
                .from("prehttpChannel")
                // Log through a wire tap and end the flow on responseHttpChannel.
                // Chaining ".channel(responseHttpChannel).channel(loggingChannel)"
                // would subscribe a bridge to the gateways' reply channel and
                // compete with the gateways for the replies.
                .wireTap("loggingChannel")
                .channel("responseHttpChannel")
                .get();
    }

    // ------------------------------------------------------------------
    // HTTP gateways
    // ------------------------------------------------------------------

    /** CORS settings shared by both gateways: any origin, GET and POST. */
    @Bean
    public CrossOrigin crossOrigin() {
        CrossOrigin co = new CrossOrigin();
        co.setOrigin("*");
        co.setMethod(RequestMethod.GET , RequestMethod.POST);
        return co;
    }

    /** JSON GET/POST mapping for {@code /int_sysmat/{id}}. */
    @Bean(name="itemRequestMapping")
    @Qualifier(value="itemRequestMapping")
    public RequestMapping itemRequestMapping() {
        RequestMapping mapping = new RequestMapping();
        mapping.setMethods(HttpMethod.GET , HttpMethod.POST);
        mapping.setConsumes(MediaType.APPLICATION_JSON_VALUE);
        mapping.setProduces(MediaType.APPLICATION_JSON_VALUE);
        mapping.setPathPatterns("/int_sysmat/{id}");
        return mapping;
    }

    /**
     * Gateway for {@code /int_sysmat/{id}}: the {@code id} path variable
     * becomes the payload sent to {@code tarefaHttpIntegrationChannel}; the
     * reply is expected on {@code responseHttpChannel} within the configured
     * reply timeout (180000).
     */
    @Bean
    public HttpRequestHandlingMessagingGateway httpRequestGateway
    (@Autowired @Qualifier("tarefaHttpIntegrationChannel")
    MessageChannel tarefaHttpIntegrationChannel ,
    @Autowired @Qualifier("responseHttpChannel") MessageChannel responseHttpChannel,
    @Autowired @Qualifier("expressionParser") ExpressionParser parser,
    @Autowired @Qualifier("itemRequestMapping") RequestMapping requestMapping,
    @Autowired CrossOrigin crossOrigin,
    @Autowired List<HttpMessageConverter<?>> httpMessageConverters) {
        HttpRequestHandlingMessagingGateway gateway = new HttpRequestHandlingMessagingGateway();
        gateway.setRequestChannel(tarefaHttpIntegrationChannel);
        gateway.setReplyChannel(responseHttpChannel);
        gateway.setPayloadExpression(parser.parseExpression("#pathVariables.id"));
        gateway.setRequestMapping(requestMapping);
        gateway.setCrossOrigin(crossOrigin);
        gateway.setMessageConverters(httpMessageConverters);
        gateway.setReplyTimeout(180000);
        return gateway;
    }

    /** JSON GET/POST mapping for {@code /sysmat_index_integration}. */
    @Bean(name="indexIntegrationRepresentationRequestMapping")
    @Qualifier(value="indexIntegrationRepresentationRequestMapping")
    public RequestMapping indexRepresentationRequestMapping() {
        RequestMapping mapping = new RequestMapping();
        mapping.setMethods(HttpMethod.GET , HttpMethod.POST);
        mapping.setConsumes(MediaType.APPLICATION_JSON_VALUE);
        mapping.setProduces(MediaType.APPLICATION_JSON_VALUE);
        mapping.setPathPatterns("/sysmat_index_integration");
        return mapping;
    }

    /**
     * Gateway for {@code /sysmat_index_integration}: the request body is
     * converted to {@code IndexIntegrationRepresentation} and sent to
     * {@code tarefaHttpIndexIntegrationChannel}; the reply is expected on
     * {@code responseHttpChannel} and its payload is not extracted
     * ({@code extractReplyPayload=false}).
     */
    @Bean("httpSysmatIntegrationRequestGateway")
    @Qualifier(value="httpSysmatIntegrationRequestGateway")
    public HttpRequestHandlingMessagingGateway httpSysmatIntegrationRequestGateway
    (@Autowired @Qualifier("tarefaHttpIndexIntegrationChannel") MessageChannel tarefaHttpIndexIntegrationChannel ,
    @Autowired @Qualifier("expressionParser") ExpressionParser parser,
    @Autowired @Qualifier("indexIntegrationRepresentationRequestMapping") RequestMapping requestMapping,
    @Autowired CrossOrigin crossOrigin,
    @Autowired List<HttpMessageConverter<?>> httpMessageConverters ,
    @Autowired @Qualifier("responseHttpChannel")  MessageChannel httpResponseChannel) {
        HttpRequestHandlingMessagingGateway gateway = new HttpRequestHandlingMessagingGateway();
        gateway.setRequestChannel(tarefaHttpIndexIntegrationChannel);
        gateway.setReplyChannel(httpResponseChannel);
        gateway.setExtractReplyPayload(false);
        gateway.setRequestPayloadTypeClass(IndexIntegrationRepresentation.class);
        gateway.setRequestMapping(requestMapping);
        gateway.setCrossOrigin(crossOrigin);
        gateway.setMessageConverters(httpMessageConverters);
        gateway.setReplyTimeout(180000);
        return gateway;
    }

    /** Hands messages from {@code tarefaHttpIndexIntegrationChannel} to the given handler. */
    @Bean
    public IntegrationFlow tarefaHttpIndexIntegrationPartitionFlow(@Autowired TarefaIndexIntegrationHistory transformer) {
        return IntegrationFlows
                .from("tarefaHttpIndexIntegrationChannel")
                .handle(transformer)
                .get();
    }

    /**
     * Writes the message payload to standard error.
     *
     * @param m message to log
     * @return the same message, unchanged
     */
    private Message<?> logMessage(Message<?> m) {
        System.err.println(m.getPayload());
        return m;
    }

}

如有任何线索,我将不胜感激。

问候。

标签: java, spring, spring-integration, spring-integration-dsl

解决方案


推荐阅读