首页 > 解决方案 > 为什么消息卡在聚合器上?

问题描述

消息从拆分器到聚合需要 20 分钟到一个小时。

我有聚合器将电子邮件消息批量发送到 smtp。有时,消息需要很长时间才能从 OmniAlertsEmailSplitter 到达 OmniAlertsEmailAdapter。

<!-- Split message into separate email messages for delivery -->
<splitter id="omniAlertsEmailSplitter"
        input-channel="omniAlertsEmailDeliveryChannel"
        output-channel="omniAlertsEmailDeliveryProcessChannel" method="split">
        <beans:bean class="com.omnialerts.splitter.OmniAlertsEmailSplitter"> </beans:bean>
</splitter>

<!-- 
    the poller will process 1000 messages every second 
    if the size of the group is 1000 (the poll reached the max messages) or 1 seconds time out (poll has less than 1000 messages) then the payload with the list of messages is passed to defined output channel
-->
<aggregator input-channel="omniAlertsEmailDeliveryProcessChannel" output-channel="omniAlertsEmailDispatchChannel"
    discard-channel="omniAlertsEmailDispatchChannel"

    send-partial-result-on-expiry="true"
    group-timeout="1000"
    correlation-strategy-expression="T(Thread).currentThread().id"
    release-strategy-expression="size() == 1000">
    <poller max-messages-per-poll="1000" fixed-rate="1000"/>
</aggregator>
<channel id="omniAlertsEmailDeliveryProcessChannel">
     <queue/>
</channel>
<service-activator
        input-channel="omniAlertsEmailDispatchChannel"
        output-channel="omniAlertsEmailDeliveryStatusChannel" method="send">

        <beans:bean class="com.omnialerts.adapters.OmniAlertsEmailAdapter"> </beans:bean>
</service-activator>
<channel id="omniAlertsEmailDeliveryStatusChannel" />

它大部分工作正常,但在某些 JVM 上我注意到这种延迟。可能是由于这些 JVM 上的一些额外负载,但它不应该超过几秒钟。我在配置中遗漏了什么吗?

标签: spring-integration

解决方案


您尝试在其投票中依赖TaskSchedulerwith线程。10这些线程可以在其他进程中使用,因此您的聚合器可能没有那么大的吸引力。

我不确定立即拆分和聚合有什么意义,但您可以考虑为此使用专用task-executor<poller>,因此所有聚合器任务都将在其自己的线程池中执行。

还考虑使用:

 <xsd:attribute name="expire-groups-upon-completion" default="false">
                <xsd:annotation>
                    <xsd:documentation>
                        Boolean flag specifying if MessageGroup should be removed once completed. Useful for
                        handling late arrival use cases where messages arriving with the correlationKey that
                        is the same as the completed MessageGroup will be discarded. Default is 'false'
                    </xsd:documentation>
                </xsd:annotation>
                <xsd:simpleType>
                    <xsd:union memberTypes="xsd:boolean xsd:string" />
                </xsd:simpleType>
            </xsd:attribute>

而不是discard-channel.


推荐阅读