split - Camel RoutingSlip 需要在 Split 中使用 end()
问题描述
当使用拆分器和路由单将主体部分路由到不同的端点时,我发现.end()
需要一个以避免包含拆分块之外的任何内容。
所需的行为是拆分主体,使用路由表将每个部分路由到不同的端点。拆分块完成后,继续处理交换(和正文),就像拆分前一样。
测试代码有两条相同的路线,.end()
除了.routingSlip()
. 当测试运行时,您可以看到带有.end()
3 个内部处理器消息和一个外部处理器消息的那个。拆分块完成后,它还将具有正确的有效负载类型。而另一个使用没有 after 的第二条路由的测试将.end()
产生3routingSlip()
个交错的内部和外部处理器消息。
虽然我可能错过了文档中的某些内容,但我找不到任何以这种方式使用拆分器和 routingSlip 的示例,这会警告我我需要.end()
让它按照我的意图运行。如果这不是一个错误,我会建议这个问题的更明显的文档。我可能会更早找到它,但我的原始代码涉及一个自定义拆分器,这并不明显是问题所在,而不是我的代码。
我也不知道这个问题是否也适用于收件人列表或动态路由器。
package org.apache.camel.processor.routingslip;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.apache.commons.io.FileUtils;
import org.junit.BeforeClass;
import org.junit.Test;
public class SpliterRoutingSlipTest extends CamelTestSupport {
private static final String TEST_DIR = "target/test";
private static final String TEST_OUT_ENDPOINT_WEND = "file:"+TEST_DIR+"/Wend";
private static final String TEST_OUT_ENDPOINT_WOEND = "file:"+TEST_DIR+"/WOend";
private static final String TEST_ROUTE_ID_WEND = "splitBodyTestWEnd";
private static final String TEST_ROUTE_ID_WOEND = "splitBodyTestWOEnd";
private static final String TEST_IN_ENDPOINT_WEND = "direct:"+TEST_ROUTE_ID_WEND;
private static final String TEST_IN_ENDPOINT_WOEND = "direct:"+TEST_ROUTE_ID_WOEND;
private static final String TEST_ROUTING_SLIP_HEADER = "toEndpoint";
private static final List<String> TEST_BODY = Arrays.asList(new String[] {
"This is line 1",
"This is line 2",
"This is line 3",
});
@BeforeClass
public static void init() throws IOException {
File dirToRemove = new File(TEST_DIR);
if (dirToRemove.exists())
FileUtils.forceDelete(dirToRemove);
}
/**
* Test split and routing slip WITH an '.end()' after the routing slip.
*
* The result is that the Inner processor gets called for EACH iteration within the split
* but the Outer process only gets called after the split is complete AND the exchange
* is the one from before being split.
*
* This IS the desired behavior.
*
* @throws Exception
*/
@Test
public void testSplitByBodyAndRouteWithOuterPostProcessing() throws Exception {
MockEndpoint end = getMockEndpoint("mock:end");
end.expectedMessageCount(1);
template.sendBodyAndHeader(TEST_IN_ENDPOINT_WEND, TEST_BODY, TEST_ROUTING_SLIP_HEADER, TEST_OUT_ENDPOINT_WEND);
assertMockEndpointsSatisfied();
}
/**
* Test split and routing slip WITH OUT an '.end()' after the routing slip.
*
* The result is that the inner and outer processors BOTH get called for EACH iteration within the split.
*
* This is NOT the desired effect.
*
* @throws Exception
*/
@Test
public void testSplitByBodyAndRouteWithIncorrectOuterPostProcessing() throws Exception {
MockEndpoint end = getMockEndpoint("mock:end");
end.expectedMessageCount(3);
template.sendBodyAndHeader(TEST_IN_ENDPOINT_WOEND, TEST_BODY, TEST_ROUTING_SLIP_HEADER, TEST_OUT_ENDPOINT_WOEND);
assertMockEndpointsSatisfied();
}
@Override
protected RoutesBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
from(TEST_IN_ENDPOINT_WEND).id(TEST_ROUTE_ID_WEND)
.split(body())
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
System.out.println("This is the INNER processor w/ end().");
Message in = exchange.getIn();
System.out.println("\tin="+in);
Object body = in.getBody();
System.out.println("\tbody="+body);
System.out.println("\tbody.class="+body.getClass());
}
})
.setHeader(TEST_ROUTING_SLIP_HEADER, simple(TEST_OUT_ENDPOINT_WEND))
.setHeader("tempFileName", simple("${file:name}.tmp"))
.log(LoggingLevel.INFO, "Destination endpoint for filename ${file:name} is ${header.toEndpoint}")
.routingSlip(header(TEST_ROUTING_SLIP_HEADER))
.end()
.log(LoggingLevel.INFO, "Sent body to ${header.toEndpoint}/${file:name}")
.end()
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
System.out.println("This is the OUTER processor w/ end().");
Message in = exchange.getIn();
System.out.println("in="+in);
Object body = in.getBody();
System.out.println("body="+body);
System.out.println("body.class="+body.getClass());
}
})
.to("mock:end")
.end()
;
from(TEST_IN_ENDPOINT_WOEND).id(TEST_ROUTE_ID_WOEND)
.split(body())
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
System.out.println("This is the INNER processor W/O end().");
Message in = exchange.getIn();
System.out.println("\tin="+in);
Object body = in.getBody();
System.out.println("\tbody="+body);
System.out.println("\tbody.class="+body.getClass());
}
})
.setHeader(TEST_ROUTING_SLIP_HEADER, simple(TEST_OUT_ENDPOINT_WOEND))
.setHeader("tempFileName", simple("${file:name}.tmp"))
.log(LoggingLevel.INFO, "Destination endpoint for filename ${file:name} is ${header.toEndpoint}")
.routingSlip(header(TEST_ROUTING_SLIP_HEADER))
// .end()
.log(LoggingLevel.INFO, "Sent body to ${header.toEndpoint}/${file:name}")
.end()
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
System.out.println("This is the OUTER processor W/O end().");
Message in = exchange.getIn();
System.out.println("in="+in);
Object body = in.getBody();
System.out.println("body="+body);
System.out.println("body.class="+body.getClass());
}
})
.to("mock:end")
.end()
;
}
};
}
}
解决方案
我同意你的意见。我已经在 Camel 中开发超过 5 年了,并且.end()
与 . endChoice()
仍然是最令人困惑的东西之一:-(
我的建议是:
1)想想你的路由将如何在 Spring DSL 中表达。在这个基于 xml 的 DSL 中,您总是必须分隔您的块(带有结束标记)
<from uri="direct:a"/>
<routingSlip ignoreInvalidEndpoints="true"/> <!-- START OF BLOCK -->
<header>myHeader</header>
</routingSlip> <!-- END OF BLOCK -->
</route>
在 Java 中做同样的事情!
2)事实(和令人困惑的部分)是,对于琐碎的处理(=您在教程/非现实生活中的 Camel 示例中总是看到的),Java-DSL 允许省略结束块:
from("direct:a")
.routingSlip(header("myHeader"))
.ignoreInvalidEndpoints();
但正确的做法是:
from("direct:a")
.routingSlip(header("myHeader"))
.ignoreInvalidEndpoints()
.end();
3)我的recipentList问题和你一样,也需要关闭!
.split(simple("${body}"))
.streaming()
.aggregate(simple("${body.blockId}"), new PutInBlockStrategy())
.ignoreInvalidCorrelationKeys()
.completionTimeout(5*1000)
.log(TRACE, LOGNAME, "Next block:\n${body}")
.recipientList( method(this, "getRecipents") ).end()
.parallelProcessing()
.end()
.end()
.log(INFO, LOGNAME, "File: ${headers.CamelFileName} successfully processed");
4)如有疑问,请查看EIP模式定义的source或javadoc ,看看它是否有明确的end()方法:
https://camel.apache.org/maven/camel-2.15.0/camel-core/apidocs/org/apache/camel/model/RoutingSlipDefinition.html#end() https://camel.apache.org/maven /camel-2.15.0/camel-core/apidocs/org/apache/camel/model/RecipientListDefinition.html#end()
如果是这样,请始终 end() 您的块!
5) 有趣的帖子: https ://www.3riverdev.com/apache-camel-tips-caveats-from-the-trenches/
推荐阅读
- html - Bootstrap4.1 导航栏垂直项目而不是水平相册示例
- hyperledger-fabric - 在将 Hyperledger composer 部署到多个组织的结构中时,我需要粘贴 org1 和 org2 的 CA 证书
- web-scraping - Scrapy 空 xpath 响应
- laravel - 带有特殊符号 URL 的 HTTP 500 内部服务器错误
- ocr - 如何使用 CreateML 正确训练 OCR 模型?我的不仅不好,而且毫无价值
- javascript - 使用 Jquery 或 Vanilla Js 自定义 CSS 步进器的问题
- gephi - gephi中2个节点之间的所有可能路径
- python - 通过 DRF 视图和序列化程序传递时,ManyToManyField 被过滤排除
- java - Dijkstra 的算法可以适用于旅行商问题吗?
- javascript - React-redux:无法使用连接将商店作为道具获取,无法读取未定义错误的属性“道具”