首页 > 解决方案 > 使用 PySpark 批处理 MongoDB 记录

问题描述

我正在尝试找到一种方法,以批处理方式将 MongoDB 数据库中的记录导入 pyspark 数据帧(即仅接收添加到集合中且尚未被摄取的最新/最新记录)以进行一些数据清理& 分析。我遵循了 MongoDB 示例https://www.mongodb.com/blog/post/getting-started-with-mongodb-pyspark-and-jupyter-notebook并且能够阅读我感兴趣的整个集合。

不同之处在于,我的集合将充当“数据湖”,作为 Kafka 流的消费者,并且每小时/每天都会随着越来越多的数据增长(我打算为大约 30 天的记录设置 TOL )。正如您可以推断的那样,每当这个 spark 作业在当前设置下运行时,数据帧将变得越来越大,因为它摄取了整个集合,即使摄取的记录已经被清理/分析过。

PySpark 和/或 MongoDB 中是否有一种方法可以一次摄取 100 条最旧的记录,进行清理/分析,清除数据框,然后获取下 100 条最旧的记录,依此类推,直到我得到最新记录?

我从中提取数据的推文集合确实具有创建推文时的时间戳的日期字段,因此该时间戳是否可以用作过滤器或排序?

或者删除 MongoDB 并使用 Spark Structured Streaming 从 MongoDB 当前正在使用的 Kafka 主题中读取会更容易/明智吗?

这是我将当前状态/大小的 MongoDB 集合放入数据框的初始代码:

from pyspark.sql import SparkSession

spark = SparkSession.\
        builder.\
        appName("twitter-analysis-notebook").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "1g").\
        config("spark.mongodb.input.uri","mongodb://mongo1:27017/Twitter_Database.Tweets?replicaSet=rs0").\
        config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.0").\
        getOrCreate()

df = spark.read.format("mongo").load()

标签: mongodbapache-sparkpyspark

解决方案


推荐阅读