python - 将 Spark DataFrame 写入 Redis 时如何提高速度?
问题描述
我正在开发基于 Flask 的图书推荐 API,发现要管理多个请求,我需要预先计算相似度矩阵并将其存储在某个地方以供将来查询。这个矩阵是使用 PySpark 基于约 150 万个具有书籍 ID、名称和元数据的数据库条目创建的,结果可以用这个模式来描述(i
并且j
用于书籍索引,dot
是为了它们的元数据的相似性):
StructType(List(StructField(i,IntegerType,true),StructField(j,IntegerType,true),StructField(dot,DoubleType,true)))
最初,我打算使用 spark-redis 连接器将其存储在 Redis 上。但是,以下命令的运行速度似乎很慢(即使初始图书数据库查询大小限制为非常适中的 40k 批次):
similarities.write.format("org.apache.spark.sql.redis").option("table", "similarities").option("key.column", "i").save()
Spark 将初始任务分为 9 个阶段中的 3 个,大约需要 6 个小时。奇怪的是,Spark 执行器的存储内存使用量非常低,大约 20kb。Spark 应用程序 UI 描述了一个典型的阶段活动阶段:
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:282)
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
py4j.commands.CallCommand.execute(CallCommand.java:79)
py4j.GatewayConnection.run(GatewayConnection.java:238)
java.lang.Thread.run(Thread.java:748)
是否有可能以某种方式加快这个过程?我的 Spark 会话是这样设置的:
SUBMIT_ARGS = " --driver-memory 2G --executor-memory 2G --executor-cores 4 --packages mysql:mysql-connector-java:5.1.39 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
conf = SparkConf().set("spark.jars", "spark-redis/target/spark-redis_2.11-2.4.3-SNAPSHOT-jar-with-dependencies.jar").set("spark.executor.memory", "4g")
sc = SparkContext('local','example', conf=conf)
sql_sc = SQLContext(sc)
解决方案
您可以尝试使用Append
保存模式来避免检查表中是否已存在数据:
similarities.write.format("org.apache.spark.sql.redis").option("table", "similarities").mode('append').option("key.column", "i").save()
此外,您可能想要更改
sc = SparkContext('local','example', conf=conf)
至
sc = SparkContext('local[*]','example', conf=conf)
利用您机器上的所有内核。
i
顺便说一句,在 Redis 中用作键是否正确?它不应该是两者的组合i
吗j
?
推荐阅读
- javascript - 不能使用变量来更改 CSS 属性
- paw-app - PAW:将响应中的时间戳替换为日期时间或任何 API?
- three.js - 我有一个正在尝试导入的搅拌机模型,但我看不到它
- dataset - DeepMinds Kinetics-400 数据集标签时间不匹配
- html - 使用 :nth-child(x) CSS 隐藏定价框
- laravel - 运行作业时转换失败(API Convertapi : docx to pdf)
- if-statement - If 和 If 语句 / 使用 { }
- sql - 使用子查询和/或与
- c# - 如何使用反射获取另一个对象内的对象列表
- php - 在 Woocommerce 上购买 3 种不同价格的运动衫