spring-integration - Spring 集成负载平衡到 JMS 队列
问题描述
我想从单个输入队列中获取 JMS 消息并将它们扇出到 N 个输出队列中。
我有一个简单的流程,它将消息转发到单个目的地,但无法弄清楚如何应用 LoadBalancer 以允许以循环方式处理多个目的地。
任何想法如何做到这一点?
@Configuration
public class TestLoadBalance {
public static final String INPUT_QUEUE = "_dev.lb.input";
public static final String OUTPUT_QUEUE_PREFIX = "_dev.lb.output-";
@Bean
public IntegrationFlow testLoadBalanceFlow(
ConnectionFactory jmsConnectionFactory) {
IntegrationFlow flow = IntegrationFlows.from(
Jms.messageDrivenChannelAdapter(jmsConnectionFactory)
.destination(INPUT_QUEUE)
)
.handle(buildOutput(jmsConnectionFactory, 1))
// cant have 2nd handle. gets warn & flow end:
// The 'currentComponent' (org.springframework.integration.jms.JmsSendingMessageHandler@516462cc)
// is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'
//.handle(buildOutput(jmsConnectionFactory, 2))
.get();
return flow;
}
private JmsSendingMessageHandler buildOutput(ConnectionFactory jmsConnectionFactory, int i){
return Jms.outboundAdapter(jmsConnectionFactory)
.destination(OUTPUT_QUEUE_PREFIX + i).get();
}
}
解决方案
基于 Gary 的示例,我使用了 destinationExpression 方法:
@Configuration
public class TestLoadBalance {
public static final String INPUT_QUEUE = "_dev.lb.input";
public static final String OUTPUT_QUEUE_PREFIX = "_dev.lb.output-";
@Bean
public JmsDestinationPartitioner partitioner() {
return new JmsDestinationPartitioner(OUTPUT_QUEUE_PREFIX,1,3);
}
@Bean
public IntegrationFlow testLoadBalanceFlow(
ConnectionFactory jmsConnectionFactory) {
IntegrationFlow flow = IntegrationFlows.from(
Jms.messageDrivenChannelAdapter(jmsConnectionFactory)
.destination(INPUT_QUEUE)
)
.handle(Jms.outboundAdapter((jmsConnectionFactory))
.destinationExpression("@partitioner.nextDestination()"))
.get();
return flow;
}
}
使用 AtomicInt 的包装器来处理带有前缀的命名:
public class JmsDestinationPartitioner {
private int min;
private int max;
private String prefix;
private AtomicInteger current;
public JmsDestinationPartitioner(String prefix, int min, int max){
this.prefix = prefix;
this.min = min;
this.max = max;
current = new AtomicInteger(min);
}
public int getAndIncrement(){
int i = current.get();
current.getAndIncrement();
if (current.get() > max){
current.set(min);
}
return i;
}
public String nextDestination(){
return prefix + getAndIncrement();
}
}
推荐阅读
- swift - 如何正确格式化字符串以在 Swift 中填充标签
- apache-kafka - 连接到集群时出错。无法连接到 xxx.com kafka 工具
- node.js - 如何使用 SAM CLI 部署 API 网关、Lambda(Node.js) 和 DynamoDB?
- medium.com - 插入带有 Back Tics 的内联代码在 Medium.com 中不起作用
- python - 程序在没有警告的情况下终止 [PyQt5]
- android - 通知观察者 LiveData Firebase 实时数据库元素被删除
- java - 带有弹簧集成的 Struts 2.5 Junit 测试设置
- c - C 编程参数
- java - USB 串行驱动程序 Java 插件在 Unity Android 版本中不起作用
- sql - 在过程 pl/sql 中的循环内更新表