pyspark - 如何在 pyspark 结构化流中使用 maxOffsetsPerTrigger?
问题描述
我想限制从 kafka 获取数据时的速率。我的代码如下所示:
df = spark.read.format('kafka') \
.option("kafka.bootstrap.servers",'...')\
.option("subscribe",'A') \
.option("startingOffsets",'''{"A":{"0":200,"1":200,"2":200}}''') \
.option("endingOffsets",'''{"A":{"0":400,"1":400,"2":400}}''') \
.option("maxOffsetsPerTrigger",20) \
.load() \
.cache()
但是,当我打电话时df.count()
,结果是 600。我期望的是 20。有谁知道为什么“maxOffsetsPerTrigger”不起作用。
解决方案
您为每个分区(0、1、2)带来 200 条记录,总数为 600 条记录。
正如你在这里看到的:
使用 maxOffsetsPerTrigger 选项来限制每个触发器获取的记录数。
这意味着对于每个触发器或获取过程,Kafka 将获取 20 条记录,但总的来说,您仍将获取配置中设置的总记录(每个分区 200 条)。
推荐阅读
- python - 类型不正确。预期的 pk 值,收到的字典。Django 休息框架 - Vue
- supervisord - 在运行 supervisorctl reread 时出现错误
- javascript - 图片上传:Angular 6
- javascript - OneNote online JavaScript API 可以接收本地程序的信息吗?
- go - 如何解码恒星 XDR
- java - 使用 geomesa-accumulo 摄取 GeoTIFF
- python - 将对象与 asyncio 和线程相结合
- swift - 嵌套字典中的按键快速分组
- python - 用 Python 解决旅行商问题的 2-opt 算法
- corda - 可能的网络引导错误,带有不适当的错误消息