spring-integration - 为什么消息卡在聚合器上?
问题描述
消息从拆分器到聚合需要 20 分钟到一个小时。
我有聚合器将电子邮件消息批量发送到 smtp。有时,消息需要很长时间才能从 OmniAlertsEmailSplitter 到达 OmniAlertsEmailAdapter。
<!-- Split message into separate email messages for delivery -->
<splitter id="omniAlertsEmailSplitter"
input-channel="omniAlertsEmailDeliveryChannel"
output-channel="omniAlertsEmailDeliveryProcessChannel" method="split">
<beans:bean class="com.omnialerts.splitter.OmniAlertsEmailSplitter"> </beans:bean>
</splitter>
<!--
the poller will process 1000 messages every second
if the size of the group is 1000 (the poll reached the max messages) or 1 seconds time out (poll has less than 1000 messages) then the payload with the list of messages is passed to defined output channel
-->
<aggregator input-channel="omniAlertsEmailDeliveryProcessChannel" output-channel="omniAlertsEmailDispatchChannel"
discard-channel="omniAlertsEmailDispatchChannel"
send-partial-result-on-expiry="true"
group-timeout="1000"
correlation-strategy-expression="T(Thread).currentThread().id"
release-strategy-expression="size() == 1000">
<poller max-messages-per-poll="1000" fixed-rate="1000"/>
</aggregator>
<channel id="omniAlertsEmailDeliveryProcessChannel">
<queue/>
</channel>
<service-activator
input-channel="omniAlertsEmailDispatchChannel"
output-channel="omniAlertsEmailDeliveryStatusChannel" method="send">
<beans:bean class="com.omnialerts.adapters.OmniAlertsEmailAdapter"> </beans:bean>
</service-activator>
<channel id="omniAlertsEmailDeliveryStatusChannel" />
它大部分工作正常,但在某些 JVM 上我注意到这种延迟。可能是由于这些 JVM 上的一些额外负载,但它不应该超过几秒钟。我在配置中遗漏了什么吗?
解决方案
您尝试在其投票中依赖TaskScheduler
with线程。10
这些线程可以在其他进程中使用,因此您的聚合器可能没有那么大的吸引力。
我不确定立即拆分和聚合有什么意义,但您可以考虑为此使用专用task-executor
的<poller>
,因此所有聚合器任务都将在其自己的线程池中执行。
还考虑使用:
<xsd:attribute name="expire-groups-upon-completion" default="false">
<xsd:annotation>
<xsd:documentation>
Boolean flag specifying if MessageGroup should be removed once completed. Useful for
handling late arrival use cases where messages arriving with the correlationKey that
is the same as the completed MessageGroup will be discarded. Default is 'false'
</xsd:documentation>
</xsd:annotation>
<xsd:simpleType>
<xsd:union memberTypes="xsd:boolean xsd:string" />
</xsd:simpleType>
</xsd:attribute>
而不是discard-channel
.
推荐阅读
- javascript - 如何删除或简单地用 0 替换 Nan?
- authentication - 向事件处理程序(Azure 事件网格)验证事件传递
- google-sheets - 我可以创建浏览器书签以自动创建 Google 表格文档的新副本以用作模板吗?
- swift - 大标题看起来不大
- javascript - 如何在html中显示JS字符串值(动态数据)?
- jquery - 如何在dataTable现有列中添加静态数据列
- vb.net - 我如何从 Button1_Click 调用具有“ByVal 控件作为 IRibbonControl”的子类?
- nginx - NGINX 在访问定义的位置时返回 404
- javascript - JS | jQuery - 每个项目的滚动事件
- c# - 使用 LINQ 从列表中删除重复项,获取 System.Linq.Enumerable+WhereSelectEnumerableIterator`2