python - 在 Pyspark 的 FPGrowth 中将 RDD 转换为 Dataframe
问题描述
当我DataFrame
从RDD
.
from pyspark.ml.fpm import FPGrowth
sogou = sc.textFile("SogouQ.sample.utf8", use_unicode = False)
def parse(line):
value = [ x for x in line.split(",") if x]
return list(set(value))
rdd = sogou.map(parse)
df = sogou.toDF('items')
我收到以下错误:
pyspark.sql.utils.ParseException: u"\nmismatched input '' 期待 {'SELECT', 'FROM', 'ADD', 'AS', 'ALL', 'DISTINCT', 'WHERE', 'GROUP', ' BY、'GROUPING'、'SETS'、'CUBE'、'ROLLUP'、'ORDER'、'HAVING'、'LIMIT'、'AT'、'OR'、'AND'、'IN'、NOT、' NO','EXISTS','BETWEEN','LIKE',RLIKE,'IS','NULL','TRUE','FALSE','NULLS','ASC','DESC','FOR',' INTERVAL','CASE','WHEN','THEN','ELSE','END','JOIN','CROSS','OUTER','INNER','LEFT','SEMI','RIGHT' , '完整的', '自然的', 'ON', 'LATERAL', 'WINDOW', 'OVER', 'PARTITION', 'RANGE', 'ROWS', 'UNBounded', 'PRECEDING', 'FOLLOWING', '当前', 'FIRST', '之后','LAST','ROW','WITH','VALUES','CREATE','TABLE','DIRECTORY','VIEW','REPLACE','INSERT','DELETE','INTO' , 'DESCRIBE', 'EXPLAIN', 'FORMAT', 'LOGICAL', 'CODEGEN', 'COST', 'CAST', 'SHOW', 'TABLES', 'COLUMNS', 'COLUMN', 'USE', '分区','函数','DROP','UNION','EXCEPT','MINUS','INTERSECT','TO','TABLESAMPLE','STRATIFY','ALTER'、'RENAME'、'ARRAY'、'MAP'、'STRUCT'、'COMMENT'、'SET'、'RESET'、'DATA'、'START'、'TRANSACTION'、'COMMIT'、'ROLLBACK' , 'MACRO', 'IGNORE', 'BOTH', 'Leading', 'TRAILING', 'IF', 'POSITION', 'DIV', 'PERCENT', 'BUCKET', 'OUT', 'OF', ' SORT'、'CLUSTER'、'DISTRIBUTE'、'OVERWRITE'、'TRANSFORM'、'REDUCE'、'SERDE'、'SERDEPROPERTIES'、'RECORDREADER'、'RECORDWRITER'、'DELIMITED'、'FIELDS'、'TERMINATED' , 'COLLECTION', 'ITEMS', 'KEYS', 'ESCAPED', 'LINES', 'SEPARATED', 'FUNCTION', 'EXTENDED', 'REFRESH'、'CLEAR'、'CACHE'、'UNCACHE'、'LAZY'、'FORMATTED'、'GLOBAL'、TEMPORARY、'OPTIONS'、'UNSET'、'TBLPROPERTIES'、'DBPROPERTIES'、'BUCKETS'、'偏斜”、“存储”、“目录”、“位置”、“交换”、“存档”、“取消存档”、“文件格式”、“触摸”、“紧凑”、“连接”、“更改”、“级联” ,“限制”,“集群”,“排序”,“清除”,“输入格式”,“输出格式”,数据库,数据库,“DFS”,“截断”,“分析”,“计算”,“列表”,“统计信息,“分区”,“外部”,“定义”,“撤销”,“授予”,“锁定”,“解锁”,“MSCK”,“修复”,“恢复”,“导出”,“导入”,“加载”,“角色”,“角色”,“压缩”,“本金”,“交易”,“ INDEX', 'INDEXES', 'LOCKS', 'OPTION', 'ANTI', 'LOCAL', 'INPATH', IDENTIFIER, BACKQUOTED_IDENTIFIER}(第 1 行,第 5 行)\n\n== SQL ==\nitems\ n-----^^^\n"位置 5)\n\n== SQL ==\nitems\n-----^^^\n"位置 5)\n\n== SQL ==\nitems\n-----^^^\n"
文本包含Chinese
. 有关系吗?文字是这样的:
360,安全卫士,
123,123,范冰冰,
当我使用pyspark.mllib.fpgrowth
时,rdd
工作正常。如何将其转换为数据框?
解决方案
这里有两个不同的问题:
toDF
称呼。RDD.toDF
有以下签名:Signature: rdd.toDF(schema=None, sampleRatio=None)
schema
应该在哪里参数模式:
pyspark.sql.types.StructType
或列名列表所以在你的情况下应该是:
sogou.toDF(["items"])
parse
方法:createDataFrame
调用的方法df
需要 aRDD[tuple]
或等效的,可以映射到structs
,除非提供了模式。如果你只想使用一个名字,它应该返回一个tuple
def parse(line): value = [ x for x in line.split(",") if x] return list(set(value)),
结合:
>>> def parse(line):
... value = [ x for x in line.split(",") if x]
... return list(set(value)),
...
...
>>> rdd = sc.parallelize(["360,安全卫士,", "123,123,范冰冰,"])
>>> rdd.map(parse).toDF(["items"]).show()
+--------------+
| items|
+--------------+
| [安全卫士, 360]|
|[123,123,范冰冰,]|
+--------------+
替代方案(保持您当前的解析实现)将是
>>> from pyspark.sql.types import ArrayType, StringType
>>> def parse(line):
... value = [ x for x in line.split(",") if x]
... return list(set(value))
>>> rdd.map(parse).toDF(ArrayType(StringType())).toDF("items").show()
+--------------+
| items|
+--------------+
| [安全卫士, 360]|
|[123,123,范冰冰,]|
+--------------+
推荐阅读
- javascript - 使用 PKCE 流集成测试 Okta 应用程序
- c# - 强制执行 OData 过滤器要求
- php - PayPal IPN 错误:不支持标题字段的行折叠
- c# - Yaml:日期时间天 - 1
- methods - 如何在 Dafny 中导致无效的 LogicalExpression?
- c - 由于某种原因,7/7 计算为 0 余数 0,并且所有后续计算都减一
- javascript - 对 50 000 个对象的数组进行排序 - Javascript
- javascript - 如何将动态 Javascript div 高度调整与页内锚超链接结合起来?
- c++ - 用提升精神解析固定宽度的数字
- mongodb - 如何使用 pymongo 在 mongodb 中批量插入文档,同时跳过重复项