java - 在 apache camel 中创建发送到动态 .to() 的动态路由
问题描述
我想创建一个应用程序,该应用程序将根据 from() 和 to() 两端的运行时间决定路由,并且两者之间的处理保持不变
我的例子:我想在我的应用程序中有一个配置文件,在那里我会告诉源和目标,而中间工作保持不变。
我将始终在我的所有解决方案中作为 process() 执行工作
棘手的部分是让两端都充满活力......
我读过收件人列表、toD()、choice().when()
我似乎无法完成这项工作。
我的目标是拥有某种 RouteFactory 来相应地设置 camelContext(假设我的选项是 kafka 或 rabbitMq)
一旦我得到我的 kafka/rabbitMQ 路由,现在我的处理器()将在消息中设置一个标题,告诉我我的 to()是否需要去 kafka/RabbitMq
我设法只解决了这个问题,要么我的 from() 是动态的,要么我的 to() 是动态的......
但我似乎无法让它们都充满活力
我像这样创建了一个工厂:
public RouteBuilder RouteFactory(String type, Processor processor)
{
switch (type)
{
case "kafka":
return new RouteBuilder() {
@Override
public void configure() throws Exception {
// setup kafka component with the brokers
KafkaComponent kafka = new KafkaComponent();
kafka.setBrokers("{{kafka.host}}:{{kafka.port}}");
getContext().addComponent("kafka", kafka);
log.info("About to start route: Kafka Server -> Log ");
from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
+ "&maxPollRecords={{consumer.maxPollRecords}}"
+ "&consumersCount={{consumer.consumersCount}}"
+ "&seekTo={{consumer.seekTo}}"
+ "&groupId={{consumer.group}}"
+ "&valueDeserializer=" + BytesDeserializer.class.getName())
.routeId("FromKafka")
.process(processor)
.choice()
.when(header("kafka"))
.recipientList(simple("kafka:${header.kafka[topicName]}"))
.end()
.when(header("rabbit"))
.recipientList(simple("rabbit:${header.rabbit[queueName]}"))
.end();
}
};
case "rabbit":
return new RouteBuilder() {
@Override
public void configure() throws Exception {
// set rabbit component
from("rabbit")
.routeId("rabbit")
.process(processor)
.choice()
.when(header("kafka"))
.recipientList(simple("kafka:${header.kafka[topicName]}"))
.end()
.when(header("rabbit"))
.recipientList(simple("rabbit:${header.rabbit[queueName]}"))
.end();
}
};
}
return null;
}
正如你所看到的,两条路线都有相同的结局,我添加的消息越多,我需要做的复制粘贴就越多......
我似乎找不到使 to() 动态化的方法(也许是另一个工厂?但是 to() 方法不接受路由)
如果我可以在我的所有工厂路线中执行相同的通用 to(),然后在一个地方管理我拥有的所有 to() 选项,那将是完美的,我尝试使用“direct:outputManager”但是然后“ outputManager" 必须从 from() 开始,我只需要解析 to()
PS - 我是骆驼的新手,所以我接受我可能完全离开并且有一种更简单的解决方案,我很高兴听到它 PS 2 - 我只检查了 kafka 路线和兔子路线,我只是对它进行了 psodo 编码以适应这个问题,所以我知道它不适用于兔子部分
编辑:正如 Paizo 在评论中建议的那样,我探索了端点方向,它看起来很有希望我唯一想念的是实际上能够访问端点中的消息以决定返回哪个生产者
代码:
.to(new DefaultEndpoint() {
@Override
public Producer createProducer() throws Exception {
// get message header and decide which producer to return
Producer producer;
//case kafka
producer = new KafkaProducer(new KafkaEndpoint("kafka:blaTopic",new KafkaComponent(getContext())));
//case rabbit - psodo code
producer = new RabbitProducer();
return producer;
}
@Override
public Consumer createConsumer(Processor processor) throws Exception {
throw new UnsupportedOperationException("You cannot receive messages from this endpoint");
}
@Override
public boolean isSingleton() {
return false;
}
})
解决方案
推荐阅读
- python - 不正确嵌套的括号正则表达式
- reactjs - 为什么 getStaticProps 数据为空白?
- bash - osascript 调用的对话框文本中不需要的引号
- flutter - 将点击事件传递给 Stack 后面的孩子
- apache-spark - 接收器是否将其流媒体内容存储在执行器运行的地方?
- python - 如何在 Django admin 中更改用户更改页面的导航栏内容?
- javascript - 调用 Stripe 时出现 Firebase 函数错误
- highcharts - csvURL:我的一些系列名称说“未定义”
- python - 在 Tensorflow 中使用 CNN 进行预测
- c# - 当 DropDownList 中的选定项目更改时,标签不会更新