google-cloud-platform - 使用来自 Google Pubsub 的消息并将其发布到 Kafka
问题描述
我正在尝试使用同步 PULL API 使用 Google PubSub 消息。这在 Apache Beam Google PubSub IO 连接器库中可用。我想使用 KafkaIO 将消费的消息写入 Kafka。我想使用 FlinkRunner 来执行作业,因为我们在 GCP 之外运行这个应用程序。
我面临的问题是消费的消息没有在 GCP PubSub 中得到确认。我已经确认本地 Kafka 实例具有来自 GCP PubSub 的消息。GCP DataFlow 中的文档表明,当管道使用数据接收器(在我的情况下为 Kafka)终止时,数据包已完成。
但是由于代码是在 Apache Flink 而不是 GCP DataFlow 中运行的,我认为某种回调不会被触发与确认提交的消息相关。
我在这里做错了什么?
pipeline
.apply("Read GCP PubSub Messages", PubsubIO.readStrings()
.fromSubscription(subscription)
)
.apply(ParseJsons.of(User.class))
.setCoder(SerializableCoder.of(User.class))
.apply("Filter-1", ParDo.of(new FilterTextFn()))
.apply(AsJsons.of(User.class).withMapper(new ObjectMapper()))
.apply("Write to Local Kafka",
KafkaIO.<Void,String>write()
.withBootstrapServers("127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094")
.withTopic("test-topic")
.withValueSerializer((StringSerializer.class))
.values()
);
解决方案
在PubSub IO 类的 Beam文档中提到了这一点:
检查点既用于向 Pubsub 确认收到的消息(以便它们可能在 Pubsub 端停用),也用于在需要恢复检查点时对已使用的消息进行 NACK(以便 Pubsub 将及时重新发送这些消息)。
ACK 未链接到数据流,您应该在数据流上具有相同的行为。ack 在检查点上发送。通常,检查点是您在流中设置的窗口。
但是,你没有设置窗口!默认情况下,窗口是全局的,并且仅在最后关闭,如果你优雅地停止你的工作(甚至,我不确定这一点)。无论如何,更好的解决方案是使用固定窗口(例如 5 分钟)来确认每个窗口上的消息。
推荐阅读
- java - MigLayout,为什么这个单元格使用了额外的空间?
- flutter - 我可以检索流的倒数第二个值吗?
- angular - 无论如何使用带有 express-ws 的 websockets 和带有 Angular Universal 9 应用程序的 WebSocket 客户端 api?
- can-bus - CAN 总线中的帧确认
- python - python - 如何在python中的matplotlib条形图中为某些条形着色?
- android - Android 导航抽屉模板无法工作,除非它在 Emulator AS4.0 上通过制表符和光标键导航
- networking - istio 主机匹配规则重定向问题
- reactjs - react-table 在渲染时重新初始化
- java - 如何修复缓慢的 gradle clean build
- laravel - Laravel Routes - 可以在不同表单上重用来自一个表单的 ajax 路由调用吗?