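scala - Distinct word counts per line
Problem description
This is slightly different from the usual word count program: I am trying to get the count of each distinct word per line.
Input: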
Line number one has six words
Line number two has two words
Expected output:
line1 => (Line,1),(number,1),(one,1),(has,1),(six,1),(words,1)
line2 => (Line,1),(number,1),(two,2),(has,1),(words,1)
Can anyone please guide me?
Solution
Use the built-in DataFrame functions explode, split, collect_set and groupBy:
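// Input data. spark-shell already imports spark.implicits._; in a compiled app, `spark` must be your SparkSession.
import org.apache.spark.sql.functions._
import spark.implicits._
val df = Seq("Line number one has six words", "Line number two has has two words").toDF("input")
scala> :paste
// Entering paste mode (ctrl-D to finish)
df.withColumn("words", explode(split($"input", "\\s+")))        // split on whitespace, one row per word
  .groupBy("input", "words")                                    // group by line and word
  .count()                                                      // occurrences of each word within its line
  .withColumn("line_word_count", struct($"words", $"count"))    // pack (word, count) into a struct
  .groupBy("input")                                             // regroup by the original line
  .agg(collect_set("line_word_count").alias("line_word_count")) // collect the structs for each line
  .show(false)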
Result:
+---------------------------------+------------------------------------------------------------------+
|input                            |line_word_count                                                   |
+---------------------------------+------------------------------------------------------------------+
|Line number one has six words    |[[one, 1], [has, 1], [six, 1], [number, 1], [words, 1], [Line, 1]]|
|Line number two has has two words|[[has, 2], [two, 2], [words, 1], [number, 1], [Line, 1]]          |
+---------------------------------+------------------------------------------------------------------+
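To make each DataFrame step easier to follow, here is a rough analogue of the same pipeline using plain Scala collections (no Spark), with every stage labelled by the Spark call it mirrors. The object name `PipelineMirror` is hypothetical, and this is an illustration of the logic only, not how Spark executes it.

```scala
// Plain Scala collections analogue of the DataFrame pipeline (no Spark involved).
// Each stage is labelled with the Spark call it roughly corresponds to.
object PipelineMirror {
  def perLineCounts(lines: Seq[String]): Map[String, Set[(String, Int)]] = {
    // withColumn("words", explode(split($"input", "\\s+"))):
    // one (input, word) row per word
    val exploded: Seq[(String, String)] =
      lines.flatMap(line => line.split("\\s+").toSeq.map(word => (line, word)))

    // groupBy("input", "words").count(): occurrences of each word per line
    val counted: Map[(String, String), Int] =
      exploded.groupBy(identity).map { case (key, rows) => key -> rows.size }

    // struct($"words", $"count") + groupBy("input") + collect_set(...)
    counted.toSeq
      .groupBy { case ((line, _), _) => line }
      .map { case (line, rows) =>
        line -> rows.map { case ((_, word), n) => (word, n) }.toSet
      }
  }
}

val lines = Seq("Line number one has six words", "Line number two has has two words")
PipelineMirror.perLineCounts(lines).foreach { case (line, counts) =>
  println(s"$line -> $counts")
}
```

Like the DataFrame version, this keys on the line text, so two identical input lines would be merged and their counts added together; grouping by a line number instead keeps them apart.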
If you also want line numbers, use the concat and monotonically_increasing_id functions:
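df.withColumn("line", concat(lit("line"), monotonically_increasing_id() + 1)) // "line1", "line2", ...; assigned before explode so all words of a line share it
  .withColumn("words", explode(split($"input", "\\s+")))        // one row per word
  .groupBy("input", "words", "line")                            // count each word per numbered line
  .count()
  .withColumn("line_word_count", struct($"words", $"count"))    // pack (word, count) into a struct
  .groupBy("line")                                              // regroup by line number
  .agg(collect_set("line_word_count").alias("line_word_count"))
  .show(false)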
Result:
+-----+------------------------------------------------------------------+
|line |line_word_count                                                   |
+-----+------------------------------------------------------------------+
|line1|[[one, 1], [has, 1], [six, 1], [words, 1], [number, 1], [Line, 1]]|
|line2|[[has, 2], [two, 2], [number, 1], [words, 1], [Line, 1]]          |
+-----+------------------------------------------------------------------+
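If the data is small enough to handle in memory, the exact `lineN => (word,count),...` strings from the question can be produced with plain Scala collections. This is a sketch without Spark, assuming a hypothetical `LineWordCounts` object; it numbers lines with `zipWithIndex`, which is always consecutive, and keeps words in the order they first appear in each line.

```scala
// Plain Scala (no Spark) sketch for small, in-memory input: number the lines
// consecutively and render each line's word counts in the question's format.
object LineWordCounts {
  // Count each word within one line; words are returned in first-seen order.
  def countWords(line: String): Seq[(String, Int)] = {
    val words = line.trim.split("\\s+").toSeq.filter(_.nonEmpty)
    val counts = words.groupBy(identity).map { case (w, ws) => w -> ws.size }
    words.distinct.map(w => w -> counts(w))
  }

  // zipWithIndex yields 0, 1, 2, ... so "line" + (index + 1) is always consecutive.
  def format(lines: Seq[String]): Seq[String] =
    lines.zipWithIndex.map { case (line, i) =>
      val pairs = countWords(line).map { case (w, c) => s"($w,$c)" }
      s"line${i + 1} => ${pairs.mkString(",")}"
    }
}

val input = Seq("Line number one has six words", "Line number two has two words")
LineWordCounts.format(input).foreach(println)
// line1 => (Line,1),(number,1),(one,1),(has,1),(six,1),(words,1)
// line2 => (Line,1),(number,1),(two,2),(has,1),(words,1)
```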
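Note that monotonically_increasing_id guarantees IDs that are unique and increasing, but not consecutive: they are consecutive only within a single partition. So on a larger dataset, which Spark splits into several partitions, you need to call .repartition(1) before adding the line column to get consecutive line numbers (at the cost of processing everything in one partition).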
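Why one partition matters: the Spark API documentation for monotonically_increasing_id says the current implementation puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The sketch below simulates that documented layout in plain Scala (the `IdLayout` object is hypothetical and is a model, not Spark's code) to show that IDs jump between partitions but run 0, 1, 2, ... when everything is in one partition.

```scala
// Plain Scala simulation (no Spark) of the ID layout documented for
// monotonically_increasing_id: partition index in the upper 31 bits,
// record number within the partition in the lower 33 bits.
object IdLayout {
  // Assign IDs to rows that are already split into partitions.
  def ids[A](partitions: Seq[Seq[A]]): Seq[Long] =
    partitions.zipWithIndex.flatMap { case (rows, partitionIndex) =>
      rows.indices.map(recordIndex => (partitionIndex.toLong << 33) + recordIndex)
    }
}

val twoPartitions = Seq(Seq("line A", "line B"), Seq("line C"))
val onePartition  = Seq(Seq("line A", "line B", "line C"))

println(IdLayout.ids(twoPartitions)) // List(0, 1, 8589934592)
println(IdLayout.ids(onePartition))  // List(0, 1, 2)
```

Since the docs describe this as the current implementation, treat the exact bit layout as an implementation detail; the only guarantee to rely on is "unique and increasing".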