scala - 在 Spark Dataframe Rows 上并行操作
问题描述
环境:Scala,spark,结构化流,kafka
我有一个来自 kafka 流的 DF,具有以下模式
东风:
BATCH ID: 0
+-----------------------+-----+---------+------+
| value|topic|partition|offset|
+-----------------------+-----+---------+------+
|{"big and nested json"}| A | 0| 0|
|{"big and nested json"}| B | 0| 0|
+-----------------------+-----+---------+------+
我想通过使用火花并行处理每一行,我设法将它们拆分给我的执行者使用
DF.repartition(Number).foreach(row=> processRow(row))
我需要将值列中的值提取到它自己的数据框中来处理它。我在使用 Dataframe 通用 Row 对象时遇到了困难。
有没有办法将每个执行程序中的单行转换为自己的数据框(使用固定模式?)并在固定位置写入?有没有更好的方法来解决我的问题?
编辑+澄清:
forEachBatch
DF im 接收将使用writeStream 功能的功能作为批处理来 ,该功能自存在以来spark2.4
目前将 DF 拆分为 ROWS 使得行将被平均拆分为我的所有执行程序,我想将单个 GenericRow 对象转换为 DataFrame 以便我可以使用我制作的函数进行处理
例如,我会将行发送到函数
processRow(row:row)
取值和主题并将其转回单行 DF
+-----------------------+-----+
| value|topic|
+-----------------------+-----+
|{"big and nested json"}| A |
+-----------------------+-----+
用于进一步处理
解决方案
我猜您一次使用多个 kafka 数据。
首先你需要schema
为所有 kafka 主题做准备,例如我在 value 列中使用了两个不同的 JSON。
scala> val df = Seq(("""{"name":"Srinivas"}""","A"),("""{"age":20}""","B")).toDF("value","topic")
scala> df.show(false)
+-------------------+-----+
|value |topic|
+-------------------+-----+
|{"name":"Srinivas"}|A |
|{"age":20} |B |
+-------------------+-----+
scala> import org.apache.spark.sql.types._
主题 A 的架构
scala> val topicASchema = DataType.fromJson("""{"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}}]}""").asInstanceOf[StructType]
主题 B 的架构
scala> val topicBSchema = DataType.fromJson("""{"type":"struct","fields":[{"name":"age","type":"long","nullable":true,"metadata":{}}]}""").asInstanceOf[StructType]
结合主题及其模式。
scala> val topicSchema = Seq(("A",topicASchema),("B",topicBSchema)) // Adding Topic & Its Schema.
处理数据帧
scala> topicSchema
.par
.map(d => df.filter($"topic" === d._1).withColumn("value",from_json($"value",d._2)))
.foreach(_.show(false)) // Using .par & filtering dataframe based on topic & then applying schema to value column.
+----------+-----+
|value |topic|
+----------+-----+
|[Srinivas]|A |
+----------+-----+
+-----+-----+
|value|topic|
+-----+-----+
|[20] |B |
+-----+-----+
写入 hdfs
scala> topicSchema
.par
.map(d => df.filter($"topic" === d._1).withColumn("value",from_json($"value",d._2)).write.format("json").save(s"/tmp/kafka_data/${d._1}"))
存储在 hdfs 中的最终数据
scala> import sys.process._
import sys.process._
scala> "tree /tmp/kafka_data".!
/tmp/kafka_data
├── A
│ ├── part-00000-1e854106-49de-44b3-ab18-6c98a126c8ca-c000.json
│ └── _SUCCESS
└── B
├── part-00000-1bd51ad7-cfb6-4187-a374-4e2d4ce9cc50-c000.json
└── _SUCCESS
2 directories, 4 files
推荐阅读
- angular - 以打字稿角度4将传单地图导出为JPG
- azure - Notification Hub 活动设备是否会变为非活动设备?
- git - 使用 SourceTree 等常用工具在 Azure 上运行 Git
- python - Python - IBM Watson Speech to Text 'NoneType' 对象没有属性 'get_result'
- android - 我希望我的按钮被单击一次,我想禁用双击
- php - 我想显示当前时间不在(column)start_time和end_time(column)之间的所有人员记录
- dart - 无法在 onDismissible 中删除
- xml - 将 XML 模式元素引用到与值相同的 XML 模式元素中
- nim-lang - 如何以编程方式获取 nim 编译器版本?
- csv - 当分隔符是列值的一部分时,在 Unix 中计算文件的列数