scala - UDF 的性能问题.. 是否有更好的方法来解决转换。数据库写入卡住了
问题描述
桌子
用户身份 | 数据 |
---|---|
123234 | {"type":1,"actionData":{"actionType":"Category","id":"1233232","title":"BOSS","config":{"type":"X"}} } |
我需要一个这样的输出表..
用户身份 | 行动 |
---|---|
123234 | {"type":"Category","template":{"data":"{"title":"BOSS"}" },"additionalInfo":{"type":1,"config":{"type" :“X”} } } |
Scala spark.. 使用 UDF 写入数据库时卡住了。运行 bin/spark-shell --master local[*] --packages com.datastax.spark:spark-cassandra-connector_2.11:2.5.0 --driver-memory 200g
需要一个更好的方法来解决它..
object testDataMigration extends Serializable {
def main(cassandra: String): Unit = {
implicit val spark: SparkSession =
SparkSession
.builder()
.appName("UserLookupMigration")
.config("spark.master", "local[*]")
.config("spark.cassandra.connection.host",cassandra)
.config("spark.cassandra.output.batch.size.rows", "10000")
.config("spark.cassandra.read.timeoutMS","60000")
.getOrCreate()
val res = time(migrateData());
Console.println("Time taken to execute script", res._1);
spark.stop();
}
def migrateData()(implicit spark: SparkSession) {
)
val file = new File("validation_count.txt" )
val print_Writer = new PrintWriter(file)
//Reading data from user_feed table
val userFeedData = spark.read.format("org.apache.spark.sql.cassandra")
.option("keyspace", "sunbird").option("table", "TABLE1").load();
print_Writer.write("User Feed Table records:"+ userFeedData.count() );
//Persisting user feed data into memory
userFeedData.persist()
val userFeedWithNewUUID = userFeedData
.withColumn("newId",expr("uuid()"))
.withColumn("action", myColUpdate(userFeedData("data"),
userFeedData("createdby"), userFeedData("category")))
userFeedWithNewUUID.persist()
val userFeedV2Format = userFeedWithNewUUID.select(
col("newId"),col("category"),col("createdby"),
col("createdon"),col("action"),col("expireon"),
col("priority"),col("status"),col("updatedby"),
col("updatedon"),col("userid"))
.withColumnRenamed("newId","id")
.withColumn("version",lit("v2").cast(StringType))
//Persist v2 format data to memory
userFeedV2Format.persist()
print_Writer.write("User Feed V2 Format records:"+ userFeedV2Format.count() );
userFeedV2Format.write.format("org.apache.spark.sql.cassandra")
.option("keyspace", "sunbird_notifications")
.option("table", "TABLE2")
.mode(SaveMode.Append).save();
//Remove from memory
userFeedV2Format.unpersist()
userFeedData.unpersist()
print_Writer.close()
}
def myColUpdate= udf((data: String, createdby: String, category: String)=> {
val jsonMap = parse(data).values.asInstanceOf[Map[String, Object]]
val actionDataMap = new HashMap[String, Object]
val additionalInfo = new HashMap[String,Object]
val dataTemplate = new HashMap[String,String]
val templateMap = new HashMap[String,Object]
val createdByMap = new HashMap[String,Object]
createdByMap("type")="System"
createdByMap("id")=createdby
var actionType: String = null
for((k,v)<-jsonMap){
if(k == "actionData"){
val actionMap = v.asInstanceOf[Map[String,Object]]
if(actionMap.contains("actionType")){
actionType = actionMap("actionType").asInstanceOf[String]
}
for((k1,v1)<-actionMap){
if(k1 == "title" || k1 == "description"){
dataTemplate(k1)=v1.asInstanceOf[String]
}else{
additionalInfo(k1)=v1
}
}
}else{
additionalInfo(k)=v
}
}
val mapper = new ObjectMapper()
mapper.registerModule(DefaultScalaModule)
templateMap("data")=mapper.writeValueAsString(dataTemplate)
templateMap("ver")="4.4.0"
templateMap("type")="JSON"
actionDataMap("type")=actionType
actionDataMap("category")=category.asInstanceOf[String]
actionDataMap("createdBy")=createdByMap;
actionDataMap("template") =templateMap;
actionDataMap("additionalInfo")=additionalInfo
mapper.writeValueAsString(actionDataMap)
})
}
卡住 表 1 有 4000 万条数据。
解决方案
推荐阅读
- reactjs - 我想为子 Button 组件设置回调(在 App.js 中)
- python - 我只是为了获取更新的状态
- java - 允许重复记录时,如何检查 CSV 的内容是否已上传到数据库?
- bash - SSH 断开连接后在远程路径重新打开终端
- angular - 为什么我的服务中的构造函数只调用一次?
- python-3.x - 从导入的 python 模块获取脚本文件路径
- sql - 根据 SQL Server 中每个客户的事务日期时间序列的累计净流出流量
- python - 使用行和列绘制唯一项目 - Matplotlib
- jquery - 通过前缀触发复选框和 div
- python - 循环遍历 pandas 列以获取 wmd 相似性