spring - 使用Java lambda parallelStream时RabbitMQ Spring“无法确定查找键的目标ConnectionFactory”
问题描述
我们有一个使用 RabbitMQ 的 Spring Java 应用程序,下面是场景:
- 有一个消费者从一个队列接收消息并将它们发送到另一个队列。我们使用“SimpleRabbitListenerContainerFactory”作为容器工厂,但是当将消息发送到“parallelStream”中的另一个队列时,我们有一个 IllegalStateException “无法确定查找键的目标 ConnectionFactory”异常
- 当我们删除“parallelStream”时,它可以完美运行。
public void sendMessage(final StagingMessage stagingMessage, final Long timestamp, final String country) {
final List<TransformedMessage> messages = processMessageList(stagingMessage);
messages.parallelStream().forEach(message -> {
final TransformedMessage transformedMessage = buildMessage(timestamp, ApiConstants.POST_METHOD, country);
myMessageSender.sendQueue(country, transformedMessage);
});
}
Connectio Facotory,其中设置了查找键:
@Configuration
@EnableRabbit
public class RabbitBaseConfig {
@Autowired
private QueueProperties queueProperties;
@Bean
@Primary
public ConnectionFactory connectionFactory(final ConnectionFactory connectionFactoryA, final ConnectionFactory connectionFactoryB) {
final SimpleRoutingConnectionFactory simpleRoutingConnectionFactory = new SimpleRoutingConnectionFactory();
final Map<Object, ConnectionFactory> map = new HashMap<>();
for (final String queue : queueProperties.getAQueueMap().values()) {
map.put("[" + queue + "]", connectionFactoryA);
}
for (final String queue : queueProperties.getBQueueMap().values()) {
map.put("[" + queue + "]", connectionFactoryB);
}
simpleRoutingConnectionFactory.setTargetConnectionFactories(map);
return simpleRoutingConnectionFactory;
}
@Bean
public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
解决方案
欢迎来到堆栈溢出!
在提出这样的问题时,您应该始终显示相关的代码和配置 bean。
我假设您正在使用RoutingConnectionFactory
.
它使用 aThreadLocal
来存储查找键,因此发送必须发生在设置键的同一线程上。
无论如何,您通常永远不应该在侦听器中异步;你冒着信息丢失的风险。要增加并发性,请使用容器上的并发属性。
编辑
一种技术是在消息头中传达查找键:
@Bean
public RabbitTemplate template(ConnectionFactory rcf) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(rcf);
Expression expression = new SpelExpressionParser().parseExpression("messageProperties.headers['cfSelector']");
rabbitTemplate.setSendConnectionFactorySelectorExpression(expression);
return rabbitTemplate;
}
@RabbitListener(queues = "foo")
public void listen1(String in) {
IntStream.range(0, 10)
.parallel()
.mapToObj(i -> in + i)
.forEach(val -> {
this.template.convertAndSend("bar", val.toUpperCase(), msg -> {
msg.getMessageProperties().setHeader("cfSelector", "[bar]");
return msg;
});
});
}
推荐阅读
- python - 如何在 Matplotlib 中绘制给定 2 个点(纬度和经度)的有向线段(矢量)
- scala - Play JSON 中有没有办法为非对象(/数组)定义阅读器?
- apache-kafka - 是否可以将 debezium/kafka 配置为从单个主题中的多个表接收消息?
- javascript - ForbiddenError:无效的csrf令牌NodeJS
- lua - Lua罗技鼠标脚本错误必须替换math.pow
- c# - 将 JSON 包反序列化为具有自定义属性名称的类
- python - 从两个目录中的文件导入函数
- asp.net-core - 如何在 ASP.Net 核心中使用 Outlook 邮件 API?
- flutter - Flutter 电话身份验证 FirebaseAuthInvalidCredentialsException
- powershell - Powershell 将 f5 键发送到边缘