python-3.x - 将列表转换为 pyspark 数据框
问题描述
我有一个从 mysql db 获取的元组列表。我需要将其转换为 pyspark 数据框。
我的代码如下所示:
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages mysql:mysql-connector-java:5.1.39 pyspark-shell"
spark = SparkSession.builder.appName('recommendation_clustering').getOrCreate()
sqlContext = SQLContext(spark)
final_result = sqlContext.read.format("jdbc").options(
url="jdbc:mysql://localhost:3306/mysqldb",
driver="com.mysql.jdbc.Driver",
dbtable="(select movie_id, genres from program) as rating",
user="user",
password="password",
properties={"driver": 'com.mysql.jdbc.Driver'}
).load().take(3)
final_result 的类型是列表,如下所示:
[行(movie_id='0-0-10',流派='[喜剧]'),行(movie_id='0-0-1113',流派='[音乐]'),行(movie_id='0- 0-1132',流派='[音乐]')])
我需要将其转换为 pyspark 数据框。
我试过
sqlDataFrame = sqlContext.createDataFrame(final_result,["movie_id","genres"])
但收到错误消息。
解决方案
您在评论中提到的错误,AttributeError: 'SparkSession' object has no attribute 'parallelize'
可能是因为您正在尝试创建SQLContext
using SparkSession
。SparkSession
是使用 Spark 操作数据的统一入口点,您无需SQLContext
单独创建。直接使用spark
变量读取数据。
阅读此答案以获取更多详细信息。
此外,final_result
如果您只是使用spark
而不是sqlContext
.
推荐阅读
- javascript - 使用 Jquery 和 AJAX 填充 Select 元素并且不使用 async:false 属性
- dynamics-crm - 无法将用户添加到其他团队
- visual-studio-code - VSCode 更改垂直引导线缩进
- c# - 为什么我的 tcplistener 听不到。当我构建我的应用程序的 setup.exe 文件时?
- c# - 如果不存在则更改表添加 Cassandra
- c++ - 为什么我的工人在错误的线程中工作?
- nginx - nginx中的proxy_read_timeout连续两次读操作是什么意思?
- xcode - 使用 xip 自定义标注视图
- firebase - 如何将来自firebase的名称显示到颤动屏幕中
- list - 初学者问题:在自定义类型的两个列表的每个组合上使用函数的问题