java - | 警告 | 传输连接到:tcp://ip:port 失败:org.apache.activemq.transport.InactivityIOException:
问题描述
| WARN | Transport Connection to: tcp://ip:port failed: org.apache.activemq.transport.InactivityIOException: Channel was inactive for too long | org.apache.activemq.broker.TransportConnection.Transport | AmqpInactivityMonitor Async Task: java.util.concurrent.ThreadPoolExecutor$Worker@7e641927[State = -1, empty queue]
我正在尝试从 java[publisher] 向活动 mq 发送消息,该消息正在活动 mq 中排队。订阅者代码是用 python[使用这个库 python-qpid-proton 0.31.0] 编写的。当没有请求发送到活动 mq 时,订阅者保持活动状态。但是,当请求发送到活动 mq 时,“活动持久主题订阅者”中的客户端在处理请求的一段时间后进入“离线持久主题订阅者”。当在 pycharm 中运行相同的代码时可以正常工作,但在 exe 上可以看到这个问题 python 接收器代码如下所示:-
P1 = Receiver(url, subscriptionname)
from proton.reactor import Container
Container(P1).run()
class Receiver(MessagingHandler):
super(Receiver, self).__init__()
def __init__(self, url, subscriptionname):
self.url = Url(url)
self.stopping = False
self.messages_actually_received = 0
self.subscriptionName = subscriptionname
def on_start(self, event):
durable = DurableSubscription()
event.container.container_id = "client"
connection = event.container.connect(self.url)
event.container.create_receiver(connection , self.url.path, name=self.subscriptionName, options=durable)
def on_message(self, event):
if self.stopping:
return
self.messages_actually_received += 1
if event.message.body == 'message':
pass
解决方案
在上述情况和您刚刚在 ActiveMQ 邮件列表上发布的消息之间,您说您的“巨大请求处理”实际上需要 >30 秒,这听起来很像您正在利用客户端容器线程在 on_message 中完成 > 30 秒的工作,你真的不应该这样做。它是一个单线程客户端,因此这样做意味着在您为其他事情独占线程期间,它根本无法处理连接。这意味着如果需要满足代理(默认 30 秒)不活动超时,它不能发送心跳帧,同时没有其他消息传递流量被视为活动。由于客户端在 30 秒内未向代理发送任何数据,因此连接将被视为已死并被终止,因为日志消息清楚地显示发生了。
您通常应该避免长时间阻塞容器线程,例如禁用接收器上的自动接受,并在需要时将长时间运行的消息处理卸载到工作线程,然后再传递回容器线程进行接受。或者,您将需要为代理配置更高的连接超时,这样它就不会杀死您明显的死连接。
请注意,如果您卸载消息处理,将任务传递回容器线程以例如接受/其他消息需要安全完成,因为连接再次是单线程的,并且只能直接从容器线程使用,直接从另一个使用它,而容器线程也可能在使用它是不安全的。
我相信 EventInjector 是这样的事情的预期路线,激发了与容器线程之外的连接工作:http: //qpid.apache.org/releases/qpid-proton-0.36.0/proton/python/docs/ proton.reactor.html#proton.reactor.EventInjector
它在这些示例中使用:http: //qpid.apache.org/releases/qpid-proton-0.36.0/proton/python/examples/db_recv.py.html http://qpid.apache.org/releases/qpid -proton-0.36.0/proton/python/examples/db_send.py.html http://qpid.apache.org/releases/qpid-proton-0.36.0/proton/python/examples/tx_recv_interactive.py.html
您还可以在容器上触发您自己的计划回调,因此更简单的替代方案可能只是计划(同时仍然使用容器线程,而不是您自己的)您自己的任务以定期运行,例如获取已完成的“已处理”消息从线程安全的工作队列中接受/其他。定时器示例:http: //qpid.apache.org/releases/qpid-proton-0.36.0/proton/python/examples/recurring_timer.py.html
推荐阅读
- javascript - 如何使用来自 div 输入标签的输入进行计算
- postgresql - Postgres - 函数
- node.js - Javascript 承诺 console.logs 不会在 AWS Lambda 周围首次打印
- ajax - ajax/jsp 会话变量的问题
- javascript - 如何通过javascript将隐藏字段数据从url传递到嵌入式typeform?
- phoenix-live-view - Phoenix Live View 使用 phx-update="ignore" 不断重建 DOM 元素
- javascript - JavaScript 对象:为什么 value1.a[x] 和 value2.b[x] 返回未定义,因为 a[x] = a 和 b[x] = b 是 value1 和 value2 的属性?
- ruby - 从Ruby中的字符串转换日期
- git - 无法从 WSL 中的 Github 更新子模块
- git - 如何从先前的提交访问第二个现有的提交?