首页 > 解决方案 > 使用来自 Google Pubsub 的消息并将其发布到 Kafka

问题描述

我正在尝试使用同步 PULL API 使用 Google PubSub 消息。这在 Apache Beam Google PubSub IO 连接器库中可用。我想使用 KafkaIO 将消费的消息写入 Kafka。我想使用 FlinkRunner 来执行作业,因为我们在 GCP 之外运行这个应用程序。

我面临的问题是消费的消息没有在 GCP PubSub 中得到确认。我已经确认本地 Kafka 实例具有来自 GCP PubSub 的消息。GCP DataFlow 中的文档表明,当管道使用数据接收器(在我的情况下为 Kafka)终止时,数据包已完成。

但是由于代码是在 Apache Flink 而不是 GCP DataFlow 中运行的,我认为某种回调不会被触发与确认提交的消息相关。
我在这里做错了什么?

                   pipeline
                    .apply("Read  GCP PubSub Messages", PubsubIO.readStrings()
                            .fromSubscription(subscription)
                    )
                    .apply(ParseJsons.of(User.class))
                    .setCoder(SerializableCoder.of(User.class))
                    .apply("Filter-1", ParDo.of(new FilterTextFn()))
                    .apply(AsJsons.of(User.class).withMapper(new ObjectMapper()))
                    .apply("Write to Local Kafka",
                            KafkaIO.<Void,String>write()
                                    .withBootstrapServers("127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094")
                                    .withTopic("test-topic")
                                    .withValueSerializer((StringSerializer.class))
                                    .values()
                    );

标签: google-cloud-platformapache-flinkapache-beamgoogle-cloud-pubsubapache-beam-kafkaio

解决方案


在PubSub IO 类的 Beam文档中提到了这一点:

检查点既用于向 Pubsub 确认收到的消息(以便它们可能在 Pubsub 端停用),也用于在需要恢复检查点时对已使用的消息进行 NACK(以便 Pubsub 将及时重新发送这些消息)。

ACK 未链接到数据流,您应该在数据流上具有相同的行为。ack 在检查点上发送。通常,检查点是您在流中设置的窗口。

但是,你没有设置窗口!默认情况下,窗口是全局的,并且仅在最后关闭,如果你优雅地停止你的工作(甚至,我不确定这一点)。无论如何,更好的解决方案是使用固定窗口(例如 5 分钟)来确认每个窗口上的消息。


推荐阅读