apache-camel - 使用骆驼 pollEnrich 合并 xml 输出
问题描述
从需要为 XSLT 处理器组合的各种源创建 XML 输出,但在使用 pollEnrich 时得到“端点上没有可用的消费者”。pollEnrich 聚合器未通过 pollEnrich Exchange。
取出聚合器并使用默认聚合器。得到同样的问题。添加日志显示有 XML 输出来自先前路由到 pollEnrich 端点。
package com.hitrust.route;
import com.hitrust.aggregator.AddToOutput;
import com.hitrust.processor.ConvertResultToRecords;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.aggregate.AggregationStrategy;
public class AuthoritativeSourceDocument extends RouteBuilder {
public void configure() throws Exception {
AggregationStrategy addToOutput = new AddToOutput();
restConfiguration()
.component("restlet")
.host("localhost").port("18082");
rest("/authoritativesourcedocument/{authoritativesourcedocumentid}")
.consumes("application/json").produces("application/json")
.get()
.to("direct:Start");
from("direct:Start")
.multicast()
.to("direct:GetSections")
.to("direct:GetTransactions")
.to("direct:MergeSections");
from("direct:GetSections")
.setBody(simple("SELECT * " +
" FROM [dbo].[Section] AS S" +
" WHERE [Id] = ${header.id}"))
.to("jdbc:dataSource")
.setProperty("paramName", simple("Sections"))
.process(new ConvertResultToRecords())
.to("direct:GetSectionsOutput");
from("direct:GetTransactions")
.setBody(simple("SELECT * " +
" FROM [dbo].[SectionTransaction] AS ST" +
" WHERE [Id] = ${header.id}"))
.to("jdbc:dataSource")
.setProperty("paramName", simple("SectionTransactions"))
.process(new ConvertResultToRecords())
.to("direct:GetTransactionsOutput");
from("direct:MergeSections")
.setBody(simple("<param><id>${header.id}</id></param>"))
.convertBodyTo(org.w3c.dom.Document.class)
.pollEnrich("direct:GetSectionsOutput", 500, addToOutput)
.pollEnrich("direct:GetTransactionsOutput", 500, addToOutput)
.to("xslt:file:src/main/resources/xslts/MergeSections.xsl")
}
}
执行并结合最后一条路由的 xml 输出。
解决方案
请尝试使用“seda:”而不是“direct:”从您的 Get* 路由向 MergeSections 路由中的 pollEnrich 发送消息:
[...]
from("direct:Start")
.multicast()
.to("direct:GetSections")
.to("direct:GetTransactions")
.to("direct:MergeSections");
from("direct:GetSections")
.setBody(simple("SELECT * " +
" FROM [dbo].[Section] AS S" +
" WHERE [Id] = ${header.id}"))
.to("jdbc:dataSource")
.setProperty("paramName", simple("Sections"))
.process(new ConvertResultToRecords())
.to("seda:GetSectionsOutput");
from("direct:GetTransactions")
.setBody(simple("SELECT * " +
" FROM [dbo].[SectionTransaction] AS ST" +
" WHERE [Id] = ${header.id}"))
.to("jdbc:dataSource")
.setProperty("paramName", simple("SectionTransactions"))
.process(new ConvertResultToRecords())
.to("seda:GetTransactionsOutput");
from("direct:MergeSections")
.setBody(simple("<param><id>${header.id}</id></param>"))
.convertBodyTo(org.w3c.dom.Document.class)
.pollEnrich("seda:GetSectionsOutput", 500, addToOutput)
.pollEnrich("seda:GetTransactionsOutput", 500, addToOutput)
.to("xslt:file:src/main/resources/xslts/MergeSections.xsl")
您的代码也看起来像您假设多播默认情况下并行调度到端点 - 它没有。您需要为此添加 parallelProcessing() 选项。
推荐阅读
- mysql - 间歇性锁定等待超时 Laravel DB 事务(重试 5 次)
- pandas - 不平衡学习:导入错误:无法导入名称“MultiOutputMixin”
- python - 如何更新通过“OneToOneField”扩展的用户模型
- r - 在 rename_all() 中传递多个参数
- mongodb - 为什么我的 Docker 容器仍在尝试连接到 Flask 微服务中的 localhost MongoDB?
- ios - 在 iOS 12.4 上使用 CoreML Word Tagger
- vb.net - 根据两列条件选择一个 DataRow
- nlp - Gensim Mallet:输出没有针对少数主题的术语
- amazon-s3 - Mule - 从 AWS S3 存储桶中获取所有对象(超过 1000 个文件)
- python - 使用本地依赖项部署 GCP Cloud 功能