首页 > 解决方案 > Camel RoutingSlip 需要在 Split 中使用 end()

问题描述

当使用拆分器和路由单将主体部分路由到不同的端点时,我发现.end()需要一个以避免包含拆分块之外的任何内容。

所需的行为是拆分主体,使用路由表将每个部分路由到不同的端点。拆分块完成后,继续处理交换(和正文),就像拆分前一样。

测试代码有两条相同的路线,.end()除了.routingSlip(). 当测试运行时,您可以看到带有.end()3 个内部处理器消息和一个外部处理器消息的那个。拆分块完成后,它还将具有正确的有效负载类型。而另一个使用没有 after 的第二条路由的测试.end()产生3routingSlip()个交错的内部外部处理器消息。

虽然我可能错过了文档中的某些内容,但我找不到任何以这种方式使用拆分器和 routingSlip 的示例,这会警告我我需要.end()让它按照我的意图运行。如果这不是一个错误,我会建议这个问题的更明显的文档。我可能会更早找到它,但我的原始代码涉及一个自定义拆分器,这并不明显是问题所在,而不是我的代码。

我也不知道这个问题是否也适用于收件人列表或动态路由器。

package org.apache.camel.processor.routingslip;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.apache.commons.io.FileUtils;
import org.junit.BeforeClass;
import org.junit.Test;

public class SpliterRoutingSlipTest extends CamelTestSupport {

    private static final String TEST_DIR = "target/test";
    private static final String TEST_OUT_ENDPOINT_WEND = "file:"+TEST_DIR+"/Wend";
    private static final String TEST_OUT_ENDPOINT_WOEND = "file:"+TEST_DIR+"/WOend";
    private static final String TEST_ROUTE_ID_WEND = "splitBodyTestWEnd";
    private static final String TEST_ROUTE_ID_WOEND = "splitBodyTestWOEnd";
    private static final String TEST_IN_ENDPOINT_WEND = "direct:"+TEST_ROUTE_ID_WEND;
    private static final String TEST_IN_ENDPOINT_WOEND = "direct:"+TEST_ROUTE_ID_WOEND;
    private static final String TEST_ROUTING_SLIP_HEADER = "toEndpoint";

    private static final List<String> TEST_BODY = Arrays.asList(new String[]  {
        "This is line 1",
        "This is line 2",
        "This is line 3",
    });

    @BeforeClass
    public static void init() throws IOException {
        File dirToRemove = new File(TEST_DIR);
        if (dirToRemove.exists())
            FileUtils.forceDelete(dirToRemove);
    }

    /**
     * Test split and routing slip WITH an '.end()' after the routing slip.
     * 
     * The result is that the Inner processor gets called for EACH iteration within the split
     * but the Outer process only gets called after the split is complete AND the exchange
     * is the one from before being split.
     * 
     * This IS the desired behavior.
     * 
     * @throws Exception
     */
    @Test
    public void testSplitByBodyAndRouteWithOuterPostProcessing() throws Exception {
        MockEndpoint end = getMockEndpoint("mock:end");
        end.expectedMessageCount(1);

        template.sendBodyAndHeader(TEST_IN_ENDPOINT_WEND, TEST_BODY, TEST_ROUTING_SLIP_HEADER, TEST_OUT_ENDPOINT_WEND);

        assertMockEndpointsSatisfied();
    }

    /**
     * Test split and routing slip WITH OUT an '.end()' after the routing slip.
     * 
     * The result is that the inner and outer processors BOTH get called for EACH iteration within the split.
     * 
     * This is NOT the desired effect.
     * 
     * @throws Exception
     */
    @Test
    public void testSplitByBodyAndRouteWithIncorrectOuterPostProcessing() throws Exception {
        MockEndpoint end = getMockEndpoint("mock:end");
        end.expectedMessageCount(3);

        template.sendBodyAndHeader(TEST_IN_ENDPOINT_WOEND, TEST_BODY, TEST_ROUTING_SLIP_HEADER, TEST_OUT_ENDPOINT_WOEND);

        assertMockEndpointsSatisfied();
    }

    @Override
    protected RoutesBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from(TEST_IN_ENDPOINT_WEND).id(TEST_ROUTE_ID_WEND)
                    .split(body())
                        .process(new Processor() {
                            @Override
                            public void process(Exchange exchange) throws Exception {
                                System.out.println("This is the INNER processor w/ end().");
                                Message in = exchange.getIn();
                                System.out.println("\tin="+in);
                                Object body = in.getBody();
                                System.out.println("\tbody="+body);
                                System.out.println("\tbody.class="+body.getClass());
                            }
                        })
                        .setHeader(TEST_ROUTING_SLIP_HEADER, simple(TEST_OUT_ENDPOINT_WEND))
                        .setHeader("tempFileName", simple("${file:name}.tmp"))
                        .log(LoggingLevel.INFO, "Destination endpoint for filename ${file:name} is ${header.toEndpoint}")
                        .routingSlip(header(TEST_ROUTING_SLIP_HEADER))
                        .end()
                        .log(LoggingLevel.INFO, "Sent body to ${header.toEndpoint}/${file:name}")
                    .end()
                    .process(new Processor() {
                        @Override
                        public void process(Exchange exchange) throws Exception {
                            System.out.println("This is the OUTER processor w/ end().");
                            Message in = exchange.getIn();
                            System.out.println("in="+in);
                            Object body = in.getBody();
                            System.out.println("body="+body);
                            System.out.println("body.class="+body.getClass());
                        }
                    })
                    .to("mock:end")
                .end()
                ;

                from(TEST_IN_ENDPOINT_WOEND).id(TEST_ROUTE_ID_WOEND)
                    .split(body())
                        .process(new Processor() {
                            @Override
                            public void process(Exchange exchange) throws Exception {
                                System.out.println("This is the INNER processor W/O end().");
                                Message in = exchange.getIn();
                                System.out.println("\tin="+in);
                                Object body = in.getBody();
                                System.out.println("\tbody="+body);
                                System.out.println("\tbody.class="+body.getClass());
                            }
                        })
                        .setHeader(TEST_ROUTING_SLIP_HEADER, simple(TEST_OUT_ENDPOINT_WOEND))
                        .setHeader("tempFileName", simple("${file:name}.tmp"))
                        .log(LoggingLevel.INFO, "Destination endpoint for filename ${file:name} is ${header.toEndpoint}")
                        .routingSlip(header(TEST_ROUTING_SLIP_HEADER))
//                      .end()
                        .log(LoggingLevel.INFO, "Sent body to ${header.toEndpoint}/${file:name}")
                    .end()
                    .process(new Processor() {
                        @Override
                        public void process(Exchange exchange) throws Exception {
                            System.out.println("This is the OUTER processor W/O end().");
                            Message in = exchange.getIn();
                            System.out.println("in="+in);
                            Object body = in.getBody();
                            System.out.println("body="+body);
                            System.out.println("body.class="+body.getClass());
                        }
                    })
                    .to("mock:end")
                .end()
                ;
            }
        };
    }
}

标签: splitapache-camelrouting-slip

解决方案


我同意你的意见。我已经在 Camel 中开发超过 5 年了,并且.end()与 . endChoice()仍然是最令人困惑的东西之一:-(

我的建议是:

1)想想你的路由将如何在 Spring DSL 中表达。在这个基于 xml 的 DSL 中,您总是必须分隔您的块(带有结束标记)

<from uri="direct:a"/>
    <routingSlip ignoreInvalidEndpoints="true"/> <!-- START OF BLOCK -->
        <header>myHeader</header>
    </routingSlip> <!-- END OF BLOCK -->
</route>

在 Java 中做同样的事情!

2)事实(和令人困惑的部分)是,对于琐碎的处理(=您在教程/非现实生活中的 Camel 示例中总是看到的),Java-DSL 允许省略结束块:

from("direct:a")
    .routingSlip(header("myHeader")) 
    .ignoreInvalidEndpoints();

但正确的做法是:

from("direct:a")
    .routingSlip(header("myHeader")) 
        .ignoreInvalidEndpoints()
    .end();

3)我的recipentList问题和你一样,也需要关闭!

.split(simple("${body}"))
        .streaming()
        .aggregate(simple("${body.blockId}"), new PutInBlockStrategy())
            .ignoreInvalidCorrelationKeys()
            .completionTimeout(5*1000)                                                          
            .log(TRACE, LOGNAME, "Next block:\n${body}")
            .recipientList( method(this, "getRecipents") ).end()
            .parallelProcessing()
        .end()  
    .end()
    .log(INFO, LOGNAME, "File: ${headers.CamelFileName} successfully processed");

4)如有疑问,请查看EIP模式定义的source或javadoc ,看看它是否有明确的end()方法:

https://camel.apache.org/maven/camel-2.15.0/camel-core/apidocs/org/apache/camel/model/RoutingSlipDefinition.html#end() https://camel.apache.org/maven /camel-2.15.0/camel-core/apidocs/org/apache/camel/model/RecipientListDefinition.html#end()

如果是这样,请始终 end() 您的块!

5) 有趣的帖子: https ://www.3riverdev.com/apache-camel-tips-caveats-from-the-trenches/


推荐阅读