首页 > 解决方案 > Spark Streaming DF 并行处理

问题描述

我在并行处理 Spark Streaming Data Frame 中的记录时遇到问题。流程为:Kafka Topic--> Spark Streaming (Data Frame)--->Implement BusinessRule-->Send result to Kafka topic。当前代码正在处理序列中的数据,Spark Streaming(DataFrame) 作业运行良好但很耗时(由于 foreach() 中每个记录的序列处理)。每分钟 Kafka 产生 300 条消息,Spark 需要 30 分钟来处理它。业务规则并不复杂。

问题:

我试图在 Foreach() 之上调用 Parallel 函数,但是一旦我调用它,Spark 就会多次处理相同的记录。如果您现在可以帮助实现并行性,我将不胜感激。

环境细节:

1)数据格式:Json

2)数据详情

客户、部门、等级、详细信息

Cust1,22,C1,maf

Cust2,23,C2,绘画

Cust3,24,C3,运输

Cust4,22,C4,通用

3)Spark 版本:CDH 6.3 上的 2.4.0

4)斯卡拉:2.11.11

5)Straming:Spark Streaming DataFrame

6)分区:3

7)经纪人:3

这是代码片段。

val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers",get_details("kafka_bootstrap_server")).option("subscribe",get_details("topic_ora")).option("startingOffsets", get_details("startingOffsets")).load()

val input_data = stream_action_function_data.select($"customer",$"deptid",$"grade",$"details")

val check_User=input_data.writeStream.foreachBatch ((batchDF: DataFrame, batchId: Long) => {
batchDF.persist()

val funcn = batchDF.collect.par.foreach( col =>  {  
import org.apache.spark.sql.Row
import spark.implicits._  
val data=col
println(data)
val descri= data.getString(4)

import org.apache.spark.sql.types.{StructType, StructField, StringType,TimestampType,IntegerType};
println("Description  "+ descri)  

val sc=spark.sparkContext

val rdd = sc.parallelize(Seq(data))

val schema = StructType(Array(StructField("customer", StringType, true),StructField("deptid", StringType, true),StructField("grade", StringType, true),StructField("details", StringType, true)

val df_record=spark.createDataFrame(rdd, schema) 

if ( df_record.getString(4)== "general")
{ 
Send KafkaTopicA()//Send Schema contents to Kafka Topic A in json format                                }  
else 
{ 
Send KafkaTopicB()//Send Schema contents to Kafka Topic B in json format    
}               
})

batchDF.unpersist() 

}).option("checkpointLocation",s"$functions_checkpointLocation").outputMode("append").start()*

输入数据:

Cust4,22,C4,通用

Cust1,22,C1,maf

Cust2,23,C2,绘画

Cust3,24,C3,运输

预期输出:

话题一:

Cust4,22,C4,通用

主题B:

Cust1,22,C1,maf

Cust2,23,C2,绘画

Cust3,24,C3,运输

实际输出:

话题一:

Cust4,22,C4,通用

Cust4,22,C4,通用

Cust4,22,C4,通用

主题B:

Cust1,22,C1,maf

Cust2,23,C2,绘画

Cust3,24,C3,运输

Cust1,22,C1,maf

Cust2,23,C2,绘画

Cust3,24,C3,运输

谢谢!!

标签: scalaapache-sparkapache-spark-sqlspark-streaming

解决方案


请尝试实现下的业务逻辑foreachPartitions(),以实现跨执行器的数据分布,最终促进并行性。collect()另一方面,操作在驱动程序级别执行,因此在实现并行性方面没有任何作用。


推荐阅读