Performance issue with a UDF .. is there a better way to handle the transformation? The database write gets stuck

Problem description

Table

userid    data
123234    {"type":1,"actionData":{"actionType":"Category","id":"1233232","title":"BOSS","config":{"type":"X"}}}

I need an output table like this..

userid    action
123234    {"type":"Category","template":{"data":"{\"title\":\"BOSS\"}"},"additionalInfo":{"type":1,"config":{"type":"X"}}}

Scala Spark .. it gets stuck while writing to the database with the UDF. Running: bin/spark-shell --master local[*] --packages com.datastax.spark:spark-cassandra-connector_2.11:2.5.0 --driver-memory 200g

Need a better way to solve it..

import java.io.{File, PrintWriter}

import scala.collection.mutable.HashMap

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.{col, expr, lit, udf}
import org.apache.spark.sql.types.StringType

import org.json4s.jackson.JsonMethods.parse

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

object testDataMigration extends Serializable {

  def main(args: Array[String]): Unit = {
    val cassandra = args(0) // Cassandra contact host, passed as the first argument
    implicit val spark: SparkSession =
      SparkSession
        .builder()
        .appName("UserLookupMigration")
        .config("spark.master", "local[*]")
        .config("spark.cassandra.connection.host",cassandra)
        .config("spark.cassandra.output.batch.size.rows", "10000")
        .config("spark.cassandra.read.timeoutMS","60000")
        .getOrCreate()

    val res = time(migrateData())

    println(s"Time taken to execute script: ${res._1} ms")
    spark.stop()
  }

  // Timing helper (assumed; not shown in the original post): returns (elapsed millis, result).
  def time[R](block: => R): (Long, R) = {
    val start = System.currentTimeMillis()
    val result = block
    (System.currentTimeMillis() - start, result)
  }

  def migrateData()(implicit spark: SparkSession): Unit = {
    val file = new File("validation_count.txt")
    val print_Writer = new PrintWriter(file)

    //Reading data from user_feed table
    val userFeedData = spark.read.format("org.apache.spark.sql.cassandra")
      .option("keyspace", "sunbird").option("table", "TABLE1").load()
    print_Writer.write("User Feed Table records:" + userFeedData.count()) // count() forces a full scan of TABLE1

    //Persisting user feed data into memory

    userFeedData.persist()


    // Generate a new UUID per row and build the v2 `action` JSON via the UDF below
    val userFeedWithNewUUID = userFeedData
      .withColumn("newId", expr("uuid()"))
      .withColumn("action", myColUpdate(userFeedData("data"),
        userFeedData("createdby"), userFeedData("category")))

    userFeedWithNewUUID.persist()

    val userFeedV2Format = userFeedWithNewUUID.select(
        col("newId"), col("category"), col("createdby"),
        col("createdon"), col("action"), col("expireon"),
        col("priority"), col("status"), col("updatedby"),
        col("updatedon"), col("userid"))
      .withColumnRenamed("newId", "id")
      .withColumn("version", lit("v2").cast(StringType))

    //Persist v2 format data to memory

    userFeedV2Format.persist()


    print_Writer.write("User Feed V2 Format records:"+ userFeedV2Format.count() );

    userFeedV2Format.write.format("org.apache.spark.sql.cassandra")
     .option("keyspace", "sunbird_notifications")
     .option("table", "TABLE2")
     .mode(SaveMode.Append).save();

    //Remove from memory

    userFeedV2Format.unpersist()
    userFeedData.unpersist()

    print_Writer.close()
  }
  
  // UDF: reshapes the legacy `data` JSON into the v2 `action` document.
  // Note: the Jackson mapper is constructed inside the lambda, i.e. once per row.
  def myColUpdate = udf((data: String, createdby: String, category: String) => {
    val jsonMap = parse(data).values.asInstanceOf[Map[String, Object]]
    val actionDataMap = new HashMap[String, Object]
    val additionalInfo = new HashMap[String, Object]
    val dataTemplate = new HashMap[String, String]

    val templateMap = new HashMap[String, Object]
    val createdByMap = new HashMap[String, Object]
    createdByMap("type") = "System"
    createdByMap("id") = createdby

    var actionType: String = null
    for ((k, v) <- jsonMap) {
      if (k == "actionData") {
        val actionMap = v.asInstanceOf[Map[String, Object]]
        if (actionMap.contains("actionType")) {
          actionType = actionMap("actionType").asInstanceOf[String]
        }
        // "title"/"description" go into the template; everything else into additionalInfo
        for ((k1, v1) <- actionMap) {
          if (k1 == "title" || k1 == "description") {
            dataTemplate(k1) = v1.asInstanceOf[String]
          } else {
            additionalInfo(k1) = v1
          }
        }
      } else {
        additionalInfo(k) = v
      }
    }

    val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)

    templateMap("data") = mapper.writeValueAsString(dataTemplate)
    templateMap("ver") = "4.4.0"
    templateMap("type") = "JSON"
    actionDataMap("type") = actionType
    actionDataMap("category") = category
    actionDataMap("createdBy") = createdByMap
    actionDataMap("template") = templateMap
    actionDataMap("additionalInfo") = additionalInfo

    mapper.writeValueAsString(actionDataMap)
  })
}

It gets stuck. TABLE1 has 40 million records.
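
One per-row cost visible in the code above: the Jackson ObjectMapper is constructed inside the UDF body, so it is rebuilt for every one of the 40 million rows. A sketch of hoisting it into a shared holder (the name JsonMapper is illustrative):

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule

    // Scala objects are instantiated once per JVM, so each executor
    // builds the mapper a single time instead of once per row.
    object JsonMapper {
      lazy val mapper: ObjectMapper = {
        val m = new ObjectMapper()
        m.registerModule(DefaultScalaModule)
        m
      }
    }

Inside myColUpdate, `val mapper = new ObjectMapper()` would then become `val mapper = JsonMapper.mapper`.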

Tags: scala apache-spark cassandra

Solution
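
Since every field of the output `action` document comes from a fixed path inside `data`, one direction worth trying is to express the transformation with Spark's built-in JSON functions instead of a row-at-a-time UDF, so the parsing stays inside Catalyst. A minimal sketch, not a drop-in replacement: the schema is assumed from the sample row, and `description` handling plus the remaining columns are elided.

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._

    // Schema for the parts of `data` used by the output (assumed from the sample row)
    val dataSchema = new StructType()
      .add("type", IntegerType)
      .add("actionData", new StructType()
        .add("actionType", StringType)
        .add("id", StringType)
        .add("title", StringType)
        .add("config", new StructType().add("type", StringType)))

    val withAction = userFeedData
      .withColumn("d", from_json(col("data"), dataSchema))
      .withColumn("action", to_json(struct(
        col("d.actionData.actionType").as("type"),
        struct(
          to_json(struct(col("d.actionData.title").as("title"))).as("data"),
          lit("4.4.0").as("ver"),
          lit("JSON").as("type")
        ).as("template"),
        struct(
          col("d.type").as("type"),
          col("d.actionData.id").as("id"),
          col("d.actionData.config").as("config")
        ).as("additionalInfo")
      )))
      .drop("d")

It may also be worth checking the writer settings: `spark.cassandra.output.batch.size.rows` is forced to 10000 here, far above the connector's default automatic batching, and oversized batches are a common reason Cassandra writes appear to hang.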

