mongodb - 使用 PySpark 批处理 MongoDB 记录
问题描述
我正在尝试找到一种方法,以批处理方式将 MongoDB 数据库中的记录导入 pyspark 数据帧(即仅接收添加到集合中且尚未被摄取的最新/最新记录)以进行一些数据清理& 分析。我遵循了 MongoDB 示例https://www.mongodb.com/blog/post/getting-started-with-mongodb-pyspark-and-jupyter-notebook并且能够阅读我感兴趣的整个集合。
不同之处在于,我的集合将充当“数据湖”,作为 Kafka 流的消费者,并且每小时/每天都会随着越来越多的数据增长(我打算为大约 30 天的记录设置 TOL )。正如您可以推断的那样,每当这个 spark 作业在当前设置下运行时,数据帧将变得越来越大,因为它摄取了整个集合,即使摄取的记录已经被清理/分析过。
PySpark 和/或 MongoDB 中是否有一种方法可以一次摄取 100 条最旧的记录,进行清理/分析,清除数据框,然后获取下 100 条最旧的记录,依此类推,直到我得到最新记录?
我从中提取数据的推文集合确实具有创建推文时的时间戳的日期字段,因此该时间戳是否可以用作过滤器或排序?
或者删除 MongoDB 并使用 Spark Structured Streaming 从 MongoDB 当前正在使用的 Kafka 主题中读取会更容易/明智吗?
这是我将当前状态/大小的 MongoDB 集合放入数据框的初始代码:
from pyspark.sql import SparkSession
spark = SparkSession.\
builder.\
appName("twitter-analysis-notebook").\
master("spark://spark-master:7077").\
config("spark.executor.memory", "1g").\
config("spark.mongodb.input.uri","mongodb://mongo1:27017/Twitter_Database.Tweets?replicaSet=rs0").\
config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.0").\
getOrCreate()
df = spark.read.format("mongo").load()
解决方案
推荐阅读
- vba - 在excel项目中使用pdf打印机打印word文件
- typescript - 无法分配 FormArray
- c# - EF 数据库事务可以循环使用吗?
- python - 订阅后条带收费,从订阅中获取元数据
- python - 使用弹性搜索python包时读取超时错误
- reactjs - 快照测试的工作原理以及 toMatchSnapshot() 函数在 React 组件的 Jest 快照测试中的作用是什么?
- javascript - 使用java脚本更改iframe中的链接
- python - "source" 将 PATH 设置为 bitbake,其中 shell=True 在 Python 中无效
- json - 在 Cosmos DB 中,如何在创建文档之前检测文档大小并做出反应
- react-native - 如何将反应原生组件发布到 NPM