首页 > 解决方案 > Spring 集成:拆分器-聚合器流中错误处理的意外行为

问题描述

我有以下失败的测试。问题是为什么在拆分的“子消息”之一出现错误时回复只是错误,而另一个成功处理的子消息没有结果(如测试中预期的那样)?是否对此代码进行了修改以实现测试中的预期结果?

@RunWith(SpringRunner.class)
public class ErrorHandlingTests {

    @Autowired
    StringsService stringsService;

    interface StringsService {
        @Nonnull
        List<String> process(@Nonnull List<String> data);
    }

    @EnableIntegration
    @Configuration
    static class Config {

        @Bean
        IntegrationFlow errorHandler() {
            return IntegrationFlows.from("errorChannel")
                    .<MessagingException>handle((ex, h) -> "Failure for " + ex.getFailedMessage().getPayload())
                    .get();
        }

        @Bean
        IntegrationFlow errorsHandlingFlow2() {
            AtomicInteger incomingCorrelationId = new AtomicInteger();

            return IntegrationFlows.from(StringsService.class, gws -> gws.errorChannel("errorChannel"))
                    .split(new AbstractMessageSplitter() {
                        @Override
                        protected Object splitMessage(Message<?> message) {
                            List<String> strings = (List<String>) message.getPayload();
                            int id = incomingCorrelationId.get();
                            return strings
                                    .stream()
                                    .map(r -> MessageBuilder
                                            .withPayload(r)
                                            .setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, id)
                                            .setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, strings.size())
                                            .build())
                                    .collect(Collectors.toList());
                        }
                    })
                    .transform(new AbstractPayloadTransformer<String, String>() {
                        @Override
                        protected String transformPayload(String s) {
                            if (s.contains("oops"))
                                throw new IllegalArgumentException("Bad value");

                            return "R: " + s;
                        }
                    })
                    .aggregate(a -> a.outputProcessor(new AbstractAggregatingMessageGroupProcessor() {
                        @Override
                        protected Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders) {
                            return group.getMessages()
                                    .stream()
                                    .map(m -> (String) m.getPayload())
                                    .collect(Collectors.toList());
                        }
                    }))
                    .get();
        }
    }

    @Test
    public void testErrorHandlingInFlow2() {
        assertEquals(Arrays.asList("R: a", "R: b"), stringsService.process(Arrays.asList("a", "b")));
        assertEquals(Arrays.asList("R: a", "Failure for oops"), stringsService.process(Arrays.asList("a", "oops")));
    }
}

更新版本,工作,应用建议。

@RunWith(SpringRunner.class)
public class ErrorHandlingTests2 {

    interface StringsService {
        @Nonnull
        List<String> process(@Nonnull List<String> data);
    }

    @Autowired
    StringsService stringsService;

    @EnableIntegration
    @Configuration
    static class Config {

        @Bean
        IntegrationFlow errorHandler() {
            return IntegrationFlows.from("errorChannel")
                    .<MessagingException>handle((ex, h) -> "Failure for " + ex.getFailedMessage().getPayload())
                    .get();
        }

        @Bean
        IntegrationFlow errorsHandlingFlow2() {
            return IntegrationFlows.from(StringsService.class, gws -> gws.errorChannel("errorChannel"))
                    .split(new AbstractMessageSplitter() {
                        @Override
                        protected Object splitMessage(Message<?> message) {
                            List<String> strings = (List<String>) message.getPayload();
                            return strings
                                    .stream()
                                    .map(r -> MessageBuilder
                                            .withPayload(r)
                                            .build())
                                    .collect(Collectors.toList());
                        }
                    })
                    .transform(new AbstractPayloadTransformer<String, String>() {
                        @Override
                        protected String transformPayload(String s) {
                            if (s.contains("oops"))
                                throw new IllegalArgumentException("Bad value");

                            return "R: " + s;
                        }
                    }, c -> c.advice(advice()))
                    .aggregate(a -> a.outputProcessor(new AbstractAggregatingMessageGroupProcessor() {
                        @Override
                        protected Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders) {
                            return group.getMessages()
                                    .stream()
                                    .map(m -> (String) m.getPayload())
                                    .collect(Collectors.toList());
                        }
                    }))
                    .get();
        }

        @Bean
        ExpressionEvaluatingRequestHandlerAdvice advice() {
            ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
            advice.setReturnFailureExpressionResult(true);
            advice.setOnFailureExpression(
                    new FunctionExpression<Message<?>>(s ->
                            MessageBuilder
                                    .withPayload("Failure for " + s.getPayload())
                                    .copyHeaders(s.getHeaders()).build())
            );
            return advice;
        }
    }

    @Test
    public void testErrorHandlingInFlow2() {
        assertEquals(Arrays.asList("R: a", "R: b"), stringsService.process(Arrays.asList("a", "b")));
        assertEquals(Arrays.asList("R: a", "Failure for oops", "R: b"), stringsService.process(Arrays.asList("a", "oops", "b")));
    }

}

标签: spring-integrationspring-integration-dsl

解决方案


  1. 拆分器确实支持 Java Stream
  2. 默认情况下,拆分器填充了这些标题。不知道为什么需要自定义IntegrationMessageHeaderAccessor.CORRELATION_ID。在拆分器中是这样的:final Object correlationId = message.getHeaders().getId();
  3. transform(). 因此,当 transform 抛出异常时,实际上是它没有到​​达聚合器。事实上,两人只是彼此不了解。这是 Spring Integration 中的一等公民特性之一,其中端点与中间的消息通道松散耦合。你可以从不同的地方向同一个端点发送消息。无论如何,我想即使使用普通的 Java,它的行为也会相同:你有一个循环,并在循环结束时将数据收集到一个列表中。现在成像你在收集逻辑之前失败了,所以异常被抛出给被调用者,并且确实没有任何关于它是如何收集的信息。

现在关于您预期逻辑的可能修复。请看一下ExpressionEvaluatingRequestHandlerAdvice要应用于您的变压器的。因此,当您遇到异常时,它会在failureChannel子流程中进行处理,并作为常规回复返回补偿消息,以便与其他人汇总。有关更多信息,请参阅文档:https ://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#message-handler-advice-chain


推荐阅读