首页 > 解决方案 > 如何正确使用 Scatter-Gather 模式?

问题描述

当谈到 Spring Integration 时,我仍然很新,我正在尝试使用IntetrationFlowDefinition.scatterGather()但无济于事。总体思路是:

我使用消息网关作为我的输入/输出:

@MessagingGateway(name = "gateway")
public interface TestGateway {

  @Gateway(requestChannel = "input")
  String process(String input);

}

并通过以下方式调用它:

final TestGateway gateway = (TestGateway) ctx.getBean("gateway");
System.out.println(gateway.process("this is a test"));

IntegrationFlow的是:

@Bean
public IntegrationFlow soutFlow() {
  return f -> f.handle(m -> System.out.println(m.toString()));
}

@Bean
public IntegrationFlow flow() {
  return IntegrationFlows.from("input")
    .split(s -> s.delimiters(" "))
    .wireTap(soutFlow())
    .scatterGather(
      sc -> sc.recipientFlow(m -> true, f1 -> f1.handle((p, h) -> p + " - flow 1").wireTap(soutFlow()))
        .recipientFlow(m -> true, f2 -> f2.handle((p, h) -> p + " - flow 2").wireTap(soutFlow()))
        .applySequence(true),
      ga -> ga.outputProcessor(mg -> mg.getMessages()
        .stream()
        .map(m -> m.getPayload().toString())
        .collect(Collectors.joining(", "))),
      sg -> sg.gatherTimeout(1_000))
    .wireTap(soutFlow())
    .aggregate()
    .wireTap(soutFlow())
    .transform((List<String> source) -> source.stream()
      .map(s -> "- " + s)
      .collect(Collectors.joining("\n")))
    .get();
}

我希望看到输出格式如下:

- this - flow 1, this - flow 2
- is - flow 1, is - flow 2
- a - flow 1, a - flow 2
- test - flow 1, test - flow 2

不幸的是,这不起作用,收集器总是在 1 秒后超时。窃听调试输出显示以下消息:

GenericMessage [payload=this, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@649725e3, sequenceNumber=1, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@649725e3, sequenceSize=4, correlationId=7c9653a7-55eb-72e4-ff74-62b10782cc3b, id=6c81a1ce-9a7c-83c3-3b69-7fb2f8b3112c, timestamp=1538668173435}]
GenericMessage [payload=this - flow 1, headers={replyChannel=org.springframework.integration.channel.FixedSubscriberChannel@78aea4b9, sequenceNumber=1, gatherResultChannel=ac418949-6ade-4db5-bd4c-3cbbbb7e9a49:1, sequenceDetails=[[7c9653a7-55eb-72e4-ff74-62b10782cc3b, 1, 4]], errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@649725e3, sequenceSize=2, correlationId=8d29aea7-3ec7-2c15-5b92-fbb0af002a3f, id=7ca4de30-f801-ad14-6452-249c85e9ab36, timestamp=1538668173446}]
GenericMessage [payload=this - flow 2, headers={replyChannel=org.springframework.integration.channel.FixedSubscriberChannel@78aea4b9, sequenceNumber=2, gatherResultChannel=ac418949-6ade-4db5-bd4c-3cbbbb7e9a49:1, sequenceDetails=[[7c9653a7-55eb-72e4-ff74-62b10782cc3b, 1, 4]], errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@649725e3, sequenceSize=2, correlationId=8d29aea7-3ec7-2c15-5b92-fbb0af002a3f, id=71b22112-8e7b-c2c5-057f-2a91c49af8c8, timestamp=1538668173446}]
GenericMessage [payload=is, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@649725e3, sequenceNumber=2, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@649725e3, sequenceSize=4, correlationId=7c9653a7-55eb-72e4-ff74-62b10782cc3b, id=9da2f181-82de-d50e-51bb-229a3e32b998, timestamp=1538668174448}]
GenericMessage [payload=is - flow 1, headers={replyChannel=org.springframework.integration.channel.FixedSubscriberChannel@78aea4b9, sequenceNumber=1, gatherResultChannel=ac418949-6ade-4db5-bd4c-3cbbbb7e9a49:2, sequenceDetails=[[7c9653a7-55eb-72e4-ff74-62b10782cc3b, 2, 4]], errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@649725e3, sequenceSize=2, correlationId=0abfd6a8-713d-e0cb-bc59-a111a85429df, id=d5102417-f6c2-cbac-197a-b9c4f3f51a74, timestamp=1538668174448}]
GenericMessage [payload=is - flow 2, headers={replyChannel=org.springframework.integration.channel.FixedSubscriberChannel@78aea4b9, sequenceNumber=2, gatherResultChannel=ac418949-6ade-4db5-bd4c-3cbbbb7e9a49:2, sequenceDetails=[[7c9653a7-55eb-72e4-ff74-62b10782cc3b, 2, 4]], errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@649725e3, sequenceSize=2, correlationId=0abfd6a8-713d-e0cb-bc59-a111a85429df, id=d9b31390-d3d1-9b61-43fc-45c06ac86931, timestamp=1538668174449}]
GenericMessage [payload=a, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@649725e3, sequenceNumber=3, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@649725e3, sequenceSize=4, correlationId=7c9653a7-55eb-72e4-ff74-62b10782cc3b, id=f109f368-6859-ac8d-0e0c-3beb563f9594, timestamp=1538668175451}]
GenericMessage [payload=a - flow 1, headers={replyChannel=org.springframework.integration.channel.FixedSubscriberChannel@78aea4b9, sequenceNumber=1, gatherResultChannel=ac418949-6ade-4db5-bd4c-3cbbbb7e9a49:3, sequenceDetails=[[7c9653a7-55eb-72e4-ff74-62b10782cc3b, 3, 4]], errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@649725e3, sequenceSize=2, correlationId=0e347381-a6e2-891a-72ff-c862565d6a1e, id=e0270063-6e19-5827-7674-21fa047ab11d, timestamp=1538668175451}]
GenericMessage [payload=a - flow 2, headers={replyChannel=org.springframework.integration.channel.FixedSubscriberChannel@78aea4b9, sequenceNumber=2, gatherResultChannel=ac418949-6ade-4db5-bd4c-3cbbbb7e9a49:3, sequenceDetails=[[7c9653a7-55eb-72e4-ff74-62b10782cc3b, 3, 4]], errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@649725e3, sequenceSize=2, correlationId=0e347381-a6e2-891a-72ff-c862565d6a1e, id=b08aab99-10ce-4dac-2f3c-b48b4514fd35, timestamp=1538668175451}]
GenericMessage [payload=test, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@649725e3, sequenceNumber=4, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@649725e3, sequenceSize=4, correlationId=7c9653a7-55eb-72e4-ff74-62b10782cc3b, id=52feeb27-8da4-70c4-889e-f482d2922e40, timestamp=1538668176453}]
GenericMessage [payload=test - flow 1, headers={replyChannel=org.springframework.integration.channel.FixedSubscriberChannel@78aea4b9, sequenceNumber=1, gatherResultChannel=ac418949-6ade-4db5-bd4c-3cbbbb7e9a49:4, sequenceDetails=[[7c9653a7-55eb-72e4-ff74-62b10782cc3b, 4, 4]], errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@649725e3, sequenceSize=2, correlationId=53e7fd2c-d3a4-0c0a-1296-2a14ef5e90ab, id=9152e603-e436-c0a4-abde-1f7e5ae654bf, timestamp=1538668176454}]
GenericMessage [payload=test - flow 2, headers={replyChannel=org.springframework.integration.channel.FixedSubscriberChannel@78aea4b9, sequenceNumber=2, gatherResultChannel=ac418949-6ade-4db5-bd4c-3cbbbb7e9a49:4, sequenceDetails=[[7c9653a7-55eb-72e4-ff74-62b10782cc3b, 4, 4]], errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@649725e3, sequenceSize=2, correlationId=53e7fd2c-d3a4-0c0a-1296-2a14ef5e90ab, id=a7357efd-d34a-6a05-159e-5860e3bc5281, timestamp=1538668176454}]

我一定遗漏了一些明显的东西,但是当涉及到好的 Java DSL 示例时,Spring 文档非常稀缺。

更新:

wireTap()事实上,使用确实会影响流程。我已将其删除并设置了一些断点。似乎收集器正在正确处理分散和处理的消息,问题似乎在于将它们进一步发送到流的下游。例外是:

org.springframework.messaging.MessagingException: Failed to handle Message; nested exception is org.springframework.messaging.core.DestinationResolutionException:
 failed to look up MessageChannel with name '27cf1bec-7e06-424e-80f8-9f2f67ebaf87:2' in the BeanFactory.;
 nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No bean named '27cf1bec-7e06-424e-80f8-9f2f67ebaf87:2' available,
 failedMessage=GenericMessage [payload=is - flow 1, is - flow 2, headers={replyChannel=org.springframework.integration.channel.FixedSubscriberChannel@4bafe935, sequenceNumber=2, gatherResultChannel=27cf1bec-7e06-424e-80f8-9f2f67ebaf87:2, sequenceDetails=[[2baa7135-5c02-932f-6b5d-cfd4052a965b, 2, 4]], errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@87b5b49, sequenceSize=2, correlationId=c04f280d-83b7-74e9-a0a6-0f89efdf795c, id=ffec13cf-531e-0721-1484-e8b26b705330, timestamp=1538735548924}]

我想这些中间通道会在使用时由 Spring 自动创建IntegrationFlow。不是这样吗?

标签: javaspringspring-integrationspring-integration-dsl

解决方案


问题是收集器设置不正确/不正确。我的假设是,如果 scatter-gather 的输入是单个String(标记化的词),则输出也应该是单个String. 不幸的是,以这种方式创建的聚合器没有正确恢复序列/相关标头,导致下一个排队的聚合器无限期地等待完成组。提供null收集器可恢复默认行为。缺点是 scatter-gather 输出现在是List. 这可以通过在下一个聚合器之前放置一个扁平化列表的转换器来轻松缓解。下面的工作代码:

@Bean
public IntegrationFlow flow() {
  return IntegrationFlows.from("input")
    .split(s -> s.delimiters(" "))
    .scatterGather(
      sc -> sc
        .applySequence(true)
        .recipientFlow(m -> true, f1 -> f1.handle((p, h) -> p + " - flow 1"))
        .recipientFlow(m -> true, f2 -> f2.handle((p, h) -> p + " - flow 2")),
      null
    )
    .transform((List<String> l) -> l.stream().collect(Collectors.joining(", ")))
    .aggregate()
    .transform((List<String> source) -> source.stream()
      .map(s -> "- " + s)
      .collect(Collectors.joining("\n")))
    .get();
}

现在的输出正是预期的样子:

- this - flow 1, this - flow 2
- is - flow 1, is - flow 2
- a - flow 1, a - flow 2
- test - flow 1, test - flow 2

推荐阅读