Spring 集成 MessageGroupStoreReaper 全局 errorChannel

发布时间:2021-03-05 04:39

我正在使用 spring 集成聚合器和 MessageGroupStoreReaper,但不知何故错误没有到达全局 errorChannel。

    <int:aggregator id="agg"
                    ref="MyMsgsAggregator"
                    input-channel="myAggInputChannel"
                    output-channel="processInputChannel"
                    discard-channel="errorChannel"
                    method="aggMessages"
                    message-store="messageGroupStore"
                    send-partial-result-on-expiry="true"
                    expire-groups-upon-completion="true" />
                    
<bean id="messageGroupStore" class="org.springframework.integration.store.SimpleMessageStore" />

<task:scheduled-tasks>
        <task:scheduled ref="multipartAggregatorReaper" method="run" fixed-rate="5000" />
</task:scheduled-tasks>

如果“processInputChannel”后有任何异常(例如到期时的部分结果),则异常不会到达全局“errorChannel”。

即使我尝试用轮询器将任务调度作业替换为入站通道适配器(如 @Gary 所建议的),但仍然没有奏效:

<int:inbound-channel-adapter channel="reaperChannel" ref="MyMsgsAggregator" method="triggerReaper"> 
<int:poller error-channel="**errorChannel**" fixed-rate="5000">         
</int:poller>

请建议

谢谢

回答1

您的问题在这里:send-partial-result-on-expiry="true"。此选项确实与 discard-channel 互斥:

protected void expireGroup(Object correlationKey, MessageGroup group, Lock lock) {
    ...
    if (this.sendPartialResultOnExpiry) {
        ...
        completeGroup(correlationKey, group, lock);
    }
    else {
        ...
        group.getMessages()
                .forEach(this::discardMessage);
    }
    ...
}

因此,您的消息在收割后转到 processInputChannel 而不是 errorChannel 也就不足为奇了。

此外,discardChannel 与错误无关。在我们收获时丢弃的情况下不会抛出异常,并且没有从那里发送 ErrorMessage。来自已收割组的所有单个消息都作为常规消息发送到配置的 discardChannel