首页 > 解决方案 > 如何将 Dataflow 的 PubSubIO 与 Pub/Sub 模拟器一起使用?

问题描述

我希望使用带有 TestPipeline 的 Pub/Sub 模拟器来运行数据流的端到端和集成测试。

这是我的以下设置:

  1. 运行模拟器:gcloud beta emulators pubsub start
  2. 在测试中,创建主题并向其发布消息。主机地址和端口是 127.0.0.1:8085

我在 managedChannel 中手动设置了我的 hostPort:

channel = ManagedChannelBuilder.forTarget("127.0.0.1:8085").usePlaintext().build();
channelProvider = FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));

这是在此处指定的:https ://cloud.google.com/pubsub/docs/emulator#pubsub-emulator-java 。然而,这不是有问题的部分。按照我刚刚提到的链接中的说明,我可以很好地推送和拉取 Pub/Sub 模拟器。问题是当我尝试使用 Dataflow 的 PubSubIO 从给定订阅中读取消息时。

  1. 在我的流式传输管道中,将选项 setPubsubRootUrl 设置为上述地址和端口(也尝试过“http://localhost:8085”和“http://127.0.0.1:8085”)
pipeline.options.setPubsubRootUrl("http://127.0.0.1:8085");
pipeline
            .apply(
                PubsubIO.readMessagesWithAttributesAndMessageId().fromSubscription(subscription)
            );

我已确认订阅正确,并且消息已发布到 Pub/Sub 模拟器中的正确主题。

当我将测试消息发布到主题时,我得到:

[pubsub] INFO: Adding handler(s) to newly registered Channel.
[pubsub] Jul 01, 2020 12:45:38 PM io.gapi.emulators.netty.HttpVersionRoutingHandler channelRead
[pubsub] INFO: Detected HTTP/2 connection

但是在尝试使用 PubSubIO 读取数据流管道中的消息时,我反复得到以下信息:

[pubsub] INFO: Adding handler(s) to newly registered Channel.
[pubsub] Jul 01, 2020 12:45:46 PM io.gapi.emulators.netty.HttpVersionRoutingHandler channelRead
[pubsub] INFO: Detected non-HTTP/2 connection.

我在网上找过这个,看来我采取的步骤应该已经解决了这个问题(例如Local Pubsub Emulator will not work with Dataflow

有没有我可能缺少的设置?似乎无法在网上找到任何其他信息

标签: google-cloud-dataflowgoogle-cloud-pubsub

解决方案


推荐阅读