首页 > 解决方案 > 如何在 Kafka 和 spark-sql 中管理流式应用程序中的审计?

问题描述

在我们的项目中,我们正在考虑使用带有 spark 流的 Kakfa,对于 PoC,我使用的是 spark 2.4.1 版本的 Kafka 和 Java8。

我有一些疑问:

标签: apache-sparkapache-kafkaspark-streaming

解决方案


如何将丢失的数据处理到 Kafka 主题摄取中?

我不明白这一点。这是否意味着缺少 Kafka 主题中的数据或缺少从 Kafka 主题到 Spark 流的数据?

第一个不能处理,除非你是数据的生产者,你可以根据原因进行更改。如果数据在 Kafka 集群上由保留期管理的 Kafka 主题中仍然可用,则第二种是可能的。

如何保持相同的审计?

你可以做几件事。您可以要求 Kafka 通过提交这些偏移量来管理这些偏移量。或者,您可以将偏移量写入任何其他位置,例如 HBase,然后您可以从那里获取已成功处理的消息偏移量。使用最新的结构化流,您不需要管理这些低级细节,Spark 将在检查点目录中进行管理。

应该遵循什么样的恢复机制?

这取决于您使用的选择。如果您在 HBase 中有偏移量,则可以从 HBase 读取并使用 KafkaUtils 类从给定的偏移量获取消息,方法是:

KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
        )

有关https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html的更多详细信息


推荐阅读