首页 > 解决方案 > 如何在 pyspark 结构化流中使用 maxOffsetsPerTrigger?

问题描述

我想限制从 kafka 获取数据时的速率。我的代码如下所示:

df = spark.read.format('kafka') \
        .option("kafka.bootstrap.servers",'...')\
        .option("subscribe",'A') \
        .option("startingOffsets",'''{"A":{"0":200,"1":200,"2":200}}''') \
        .option("endingOffsets",'''{"A":{"0":400,"1":400,"2":400}}''') \
        .option("maxOffsetsPerTrigger",20) \
        .load() \
        .cache()

但是,当我打电话时df.count(),结果是 600。我期望的是 20。有谁知道为什么“maxOffsetsPerTrigger”不起作用。

标签: pysparkapache-kafka

解决方案


您为每个分区(0、1、2)带来 200 条记录,总数为 600 条记录。

正如你在这里看到的:

使用 maxOffsetsPerTrigger 选项来限制每个触发器获取的记录数。

这意味着对于每个触发器或获取过程,Kafka 将获取 20 条记录,但总的来说,您仍将获取配置中设置的总记录(每个分区 200 条)。


推荐阅读