scala - Spark Streaming DF 并行处理
问题描述
我在并行处理 Spark Streaming Data Frame 中的记录时遇到问题。流程为:Kafka Topic--> Spark Streaming (Data Frame)--->Implement BusinessRule-->Send result to Kafka topic。当前代码正在处理序列中的数据,Spark Streaming(DataFrame) 作业运行良好但很耗时(由于 foreach() 中每个记录的序列处理)。每分钟 Kafka 产生 300 条消息,Spark 需要 30 分钟来处理它。业务规则并不复杂。
问题:
我试图在 Foreach() 之上调用 Parallel 函数,但是一旦我调用它,Spark 就会多次处理相同的记录。如果您现在可以帮助实现并行性,我将不胜感激。
环境细节:
1)数据格式:Json
2)数据详情
客户、部门、等级、详细信息
Cust1,22,C1,maf
Cust2,23,C2,绘画
Cust3,24,C3,运输
Cust4,22,C4,通用
3)Spark 版本:CDH 6.3 上的 2.4.0
4)斯卡拉:2.11.11
5)Straming:Spark Streaming DataFrame
6)分区:3
7)经纪人:3
这是代码片段。
val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers",get_details("kafka_bootstrap_server")).option("subscribe",get_details("topic_ora")).option("startingOffsets", get_details("startingOffsets")).load()
val input_data = stream_action_function_data.select($"customer",$"deptid",$"grade",$"details")
val check_User=input_data.writeStream.foreachBatch ((batchDF: DataFrame, batchId: Long) => {
batchDF.persist()
val funcn = batchDF.collect.par.foreach( col => {
import org.apache.spark.sql.Row
import spark.implicits._
val data=col
println(data)
val descri= data.getString(4)
import org.apache.spark.sql.types.{StructType, StructField, StringType,TimestampType,IntegerType};
println("Description "+ descri)
val sc=spark.sparkContext
val rdd = sc.parallelize(Seq(data))
val schema = StructType(Array(StructField("customer", StringType, true),StructField("deptid", StringType, true),StructField("grade", StringType, true),StructField("details", StringType, true)
val df_record=spark.createDataFrame(rdd, schema)
if ( df_record.getString(4)== "general")
{
Send KafkaTopicA()//Send Schema contents to Kafka Topic A in json format }
else
{
Send KafkaTopicB()//Send Schema contents to Kafka Topic B in json format
}
})
batchDF.unpersist()
}).option("checkpointLocation",s"$functions_checkpointLocation").outputMode("append").start()*
输入数据:
Cust4,22,C4,通用
Cust1,22,C1,maf
Cust2,23,C2,绘画
Cust3,24,C3,运输
预期输出:
话题一:
Cust4,22,C4,通用
主题B:
Cust1,22,C1,maf
Cust2,23,C2,绘画
Cust3,24,C3,运输
实际输出:
话题一:
Cust4,22,C4,通用
Cust4,22,C4,通用
Cust4,22,C4,通用
主题B:
Cust1,22,C1,maf
Cust2,23,C2,绘画
Cust3,24,C3,运输
Cust1,22,C1,maf
Cust2,23,C2,绘画
Cust3,24,C3,运输
谢谢!!
解决方案
请尝试实现下的业务逻辑foreachPartitions()
,以实现跨执行器的数据分布,最终促进并行性。collect()
另一方面,操作在驱动程序级别执行,因此在实现并行性方面没有任何作用。
推荐阅读
- dart - 查询多个引用。Firestore - 颤振
- sql - 如果 table2 中不存在整行,则插入 table2 从 table1 中选择
- node.js - sendgrid 使用@sendgrid/mail 设置
- excel - 获取 Windows 显示缩放值
- visual-studio-code - 导航代码时,Visual Studio 代码会自动更改文件夹树
- tensorflow - 我需要 TensorFlow 来进行 Keras 预测吗
- swift - 为我的 AFNetworking 获得 400 电话
- java - 将域日期转换为 java util 日期
- javascript - 另一个函数完成后如何执行javascript函数
- java - 如何检查数组是否不为空并且它的某些特定值是否为数字?