How to reduce code duplication with AWS Deequ

Problem Description

I have some 5 datasets (the number will grow in the future, so generalization is important) that run the same code base against files sharing a common header, but I am not sure how to handle, in a generic way,

  1. loading the dataset, and
  2. calling the code and writing the results to a different folder.

It would be great if you could help, as I am new to Scala. These are AWS Glue jobs; the only things that change are the input file and the location of the results.

For example, here are three of them; I would like to reduce the code duplication:

import com.amazon.deequ.{VerificationSuite, VerificationResult}
import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
import com.amazon.deequ.checks.{Check, CheckLevel}
import com.amazon.deequ.constraints.{ConstrainableDataTypes}
import org.apache.spark.sql.functions.{length, max}
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession




object Deequ {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("dq")
    val spark = SparkSession.builder().appName("dq").getOrCreate()

    val dataset = spark.read.option("header", true).option("delimiter", ",").csv("s3://ct-ire-fin-stg-data-dev-raw-gib/templates/Contract_Portfolio_Assignment/Contract_Portfolio_Assignement_Compass/contract-portfolio-assignment-compass - Sheet1.csv")

    val verificationResult: VerificationResult = VerificationSuite()
      // data to run the verification on
      .onData(dataset)
      // define a data quality check
      .addCheck(
        Check(CheckLevel.Error, "Template Validations")

          .hasDataType("* Contract Category", ConstrainableDataTypes.Integral)
          .hasMaxLength("* Contract Category", _ == 1)
          .isComplete("* Contract Category")

          .hasDataType("* Contract ID", ConstrainableDataTypes.String)
          .hasMaxLength("* Contract ID", _ <= 40)
          .isComplete("* Contract ID")

          .hasDataType("* Key Date", ConstrainableDataTypes.Integral)
          .hasMaxLength("* Key Date", _ <= 8)
          .isComplete("* Key Date")

          .hasDataType("* Portfolio Category", ConstrainableDataTypes.Integral)
          .hasMaxLength("* Portfolio Category", _ <= 4)
          .isComplete("* Portfolio Category")

          .hasDataType("* Tranche Start Date", ConstrainableDataTypes.Integral)
          .hasMaxLength("* Tranche Start Date", _ <= 8)
          .isComplete("* Tranche Start Date")
          // .isContainedIn("Portfolio Category", Array("2100"))

          .hasDataType("Portfolio", ConstrainableDataTypes.String)
          .hasMaxLength("Portfolio", _ <= 40)
          .isComplete("Portfolio")

          .hasDataType("Source System", ConstrainableDataTypes.String)
          .hasMaxLength("Source System", _ <= 10)
          .isComplete("Source System")
          .isContainedIn("Source System", Array("LFST", "CLPB", "CLCB", "CLHR", "CCLU"))

          .hasDataType("Delivery Package", ConstrainableDataTypes.String)
          .hasMaxLength("Delivery Package", _ <= 20)
          .isComplete("Delivery Package")
          // .isContainedIn("Legal Entity", Array("LP01", "LLAL"))
      )
      // compute metrics and verify check conditions
      .run()

    // val metrics1 = successMetricsAsDataFrame(spark, analysisResult1)
    val resultDataFrame = checkResultsAsDataFrame(spark, verificationResult)
    resultDataFrame.write.mode("overwrite").parquet("s3://ct-ire-fin-stg-data-dev-raw-gib/template_validations/Contract-Portfolio-Assignment-Validations/Contract-Portfolio-Assignment-Compass/")
  }
}

Here is the second code base:

import com.amazon.deequ.{VerificationSuite, VerificationResult}
import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
import com.amazon.deequ.checks.{Check, CheckLevel}
import com.amazon.deequ.constraints.{ConstrainableDataTypes}
import org.apache.spark.sql.functions.{length, max}
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession




object Deequ {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("dq")
    val spark = SparkSession.builder().appName("dq").getOrCreate()

    val dataset = spark.read.option("header", true).option("delimiter", ",").csv("s3://ct-ire-fin-stg-data-dev-raw-gib/templates/Contract_Portfolio_Assignment/Contract-Portfolio-Assignment-GIP/Portfolio-Assignment-GIP - Sheet1.csv")


  val verificationResult: VerificationResult = { VerificationSuite()
  // data to run the verification on
 .onData(dataset)
 // define a data quality check
 .addCheck(
   Check(CheckLevel.Error, "Template Validations") 



  .hasDataType("* Contract Category", ConstrainableDataTypes.Integral)
  .hasMaxLength("* Contract Category", _==1)
  .isComplete("* Contract Category")


  .hasDataType("* Contract ID",ConstrainableDataTypes.String )
   .hasMaxLength("* Contract ID", _ <= 40)
  .isComplete("* Contract ID")
  
  
  .hasDataType("* Key Date",ConstrainableDataTypes.Integral )
  .hasMaxLength("* Key Date", _ <= 8)
  .isComplete("* Key Date")
  
  
  .hasDataType("* Portfolio Category",ConstrainableDataTypes.Integral )
  .hasMaxLength("* Portfolio Category", _ <= 4)
  .isComplete("* Portfolio Category")

  
  .hasDataType("* Tranche Start Date",ConstrainableDataTypes.Integral)
  .hasMaxLength("* Tranche Start Date", _ <= 8)
  .isComplete("* Tranche Start Date")
 // .isContainedIn("Portfolio Category", Array("2100"))

  
  
  .hasDataType("Portfolio",ConstrainableDataTypes.String)
  .hasMaxLength("Portfolio", _ <= 40)
  .isComplete("Portfolio")
  
  
  
  
  .hasDataType("Source System",ConstrainableDataTypes.String )
  .hasMaxLength("Source System", _ <= 10)
  .isComplete("Source System")
  .isContainedIn("Source System", Array("LFST", "CLPB","CLCB","CLHR","CCLU"))

  
  .hasDataType("Delivery Package",ConstrainableDataTypes.String)
  .hasMaxLength("Delivery Package", _ <= 20)
  .isComplete("Delivery Package")
 // .isContainedIn("Legal Entity", Array("LP01", "LLAL"))
 

  )
  


    // compute metrics and verify check conditions
    .run()
    }
    // val metrics1 = successMetricsAsDataFrame(spark, analysisResult1)
    val resultDataFrame = checkResultsAsDataFrame(spark, verificationResult)
    resultDataFrame.write.mode("overwrite").parquet("s3://ct-ire-fin-stg-data-dev-raw-gib/template_validations/Contract-Portfolio-Assignment-Validations/Contract-Portfolio-Assignement-GIP-Validations/")
  }
}

And here is the third:

import com.amazon.deequ.{VerificationSuite, VerificationResult}
import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
import com.amazon.deequ.checks.{Check, CheckLevel}
import com.amazon.deequ.constraints.{ConstrainableDataTypes}
import org.apache.spark.sql.functions.{length, max}
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession




object Deequ {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("dq")
    val spark = SparkSession.builder().appName("dq").getOrCreate()

    val dataset = spark.read.option("header", true).option("delimiter", ",").csv("s3://ct-ire-fin-stg-data-dev-raw-gib/templates/Contract_Portfolio_Assignment/Portfolio-Assignment-Mobilife/Mobilife-Portforlio-Assessment - Sheet1.csv")

      val verificationResult: VerificationResult = { VerificationSuite()
       // data to run the verification on
    .onData(dataset)
    // define a data quality check
    .addCheck(
       Check(CheckLevel.Error, "Template Validations") 



  .hasDataType("* Contract Category", ConstrainableDataTypes.Integral)
  .hasMaxLength("* Contract Category", _==1)
  .isComplete("* Contract Category")


  .hasDataType("* Contract ID",ConstrainableDataTypes.String )
   .hasMaxLength("* Contract ID", _ <= 40)
  .isComplete("* Contract ID")
  
  
  .hasDataType("* Key Date",ConstrainableDataTypes.Integral )
  .hasMaxLength("* Key Date", _ <= 8)
  .isComplete("* Key Date")
  
  
  .hasDataType("* Portfolio Category",ConstrainableDataTypes.Integral )
  .hasMaxLength("* Portfolio Category", _ <= 4)
  .isComplete("* Portfolio Category")

  
  .hasDataType("* Tranche Start Date",ConstrainableDataTypes.Integral)
  .hasMaxLength("* Tranche Start Date", _ <= 8)
  .isComplete("* Tranche Start Date")
 // .isContainedIn("Portfolio Category", Array("2100"))

  
  
  .hasDataType("Portfolio",ConstrainableDataTypes.String)
  .hasMaxLength("Portfolio", _ <= 40)
  .isComplete("Portfolio")
  
  
  
  
  .hasDataType("Source System",ConstrainableDataTypes.String )
  .hasMaxLength("Source System", _ <= 10)
  .isComplete("Source System")
  .isContainedIn("Source System", Array("LFST", "CLPB","CLCB","CLHR","CCLU"))

  
  .hasDataType("Delivery Package",ConstrainableDataTypes.String)
  .hasMaxLength("Delivery Package", _ <= 20)
  .isComplete("Delivery Package")
   // .isContainedIn("Legal Entity", Array("LP01", "LLAL"))
 

    )
  


    // compute metrics and verify check conditions
    .run()
    }
    // val metrics1 = successMetricsAsDataFrame(spark, analysisResult1)
    val resultDataFrame = checkResultsAsDataFrame(spark, verificationResult)
    resultDataFrame.write.mode("overwrite").parquet("s3://ct-ire-fin-stg-data-dev-raw-gib/template_validations/Contract-Portfolio-Assignment-Validations/contract-portfolio-assessment_Mobilife-Validations/")
  }
}

Tags: scala, aws-glue

Solution


Based on my understanding of your question, you can create a function that performs the common logic and call that same function from the different places. The values that differ between workflows (here, the source and target paths) become the function's parameters.

import com.amazon.deequ.{VerificationResult, VerificationSuite}
import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
import com.amazon.deequ.checks.{Check, CheckLevel}
import com.amazon.deequ.constraints.ConstrainableDataTypes
import org.apache.spark.sql.SparkSession

// Common function, shown just for reference; you can modify it as you want.
object CommonHelper {

  def ProcessDataSet(spark: SparkSession, sourcePath: String, targetPath: String): Unit = {

    val dataset = spark.read.option("header", true).option("delimiter", ",").csv(sourcePath)

    val verificationResult: VerificationResult = VerificationSuite()
      // data to run the verification on
      .onData(dataset)
      // define a data quality check
      .addCheck(
        Check(CheckLevel.Error, "Template Validations")

          .hasDataType("* Contract Category", ConstrainableDataTypes.Integral)
          .hasMaxLength("* Contract Category", _ == 1)
          .isComplete("* Contract Category")

          .hasDataType("* Contract ID", ConstrainableDataTypes.String)
          .hasMaxLength("* Contract ID", _ <= 40)
          .isComplete("* Contract ID")

          .hasDataType("* Key Date", ConstrainableDataTypes.Integral)
          .hasMaxLength("* Key Date", _ <= 8)
          .isComplete("* Key Date")

          .hasDataType("* Portfolio Category", ConstrainableDataTypes.Integral)
          .hasMaxLength("* Portfolio Category", _ <= 4)
          .isComplete("* Portfolio Category")

          .hasDataType("* Tranche Start Date", ConstrainableDataTypes.Integral)
          .hasMaxLength("* Tranche Start Date", _ <= 8)
          .isComplete("* Tranche Start Date")
          // .isContainedIn("Portfolio Category", Array("2100"))

          .hasDataType("Portfolio", ConstrainableDataTypes.String)
          .hasMaxLength("Portfolio", _ <= 40)
          .isComplete("Portfolio")

          .hasDataType("Source System", ConstrainableDataTypes.String)
          .hasMaxLength("Source System", _ <= 10)
          .isComplete("Source System")
          .isContainedIn("Source System", Array("LFST", "CLPB", "CLCB", "CLHR", "CCLU"))

          .hasDataType("Delivery Package", ConstrainableDataTypes.String)
          .hasMaxLength("Delivery Package", _ <= 20)
          .isComplete("Delivery Package")
          // .isContainedIn("Legal Entity", Array("LP01", "LLAL"))
      )
      // compute metrics and verify check conditions
      .run()

    // val metrics1 = successMetricsAsDataFrame(spark, analysisResult1)
    val resultDataFrame = checkResultsAsDataFrame(spark, verificationResult)
    resultDataFrame.write.mode("overwrite").parquet(targetPath)
  }
}

Now you can call it from the main function of your entry-point object, as shown below. I have shown it for one dataset; you can reuse it for the others.

object Deequ {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("dq")
    val spark = SparkSession.builder().appName("dq").getOrCreate()

    val sourcePath1 = "s3://ct-ire-fin-stg-data-dev-raw-gib/templates/Contract_Portfolio_Assignment/Contract-Portfolio-Assignment-GIP/Portfolio-Assignment-GIP - Sheet1.csv"
    val targetPath1 = "s3://ct-ire-fin-stg-data-dev-raw-gib/template_validations/Contract-Portfolio-Assignment-Validations/Contract-Portfolio-Assignement-GIP-Validations/"

    // you can call this function from multiple places, depending on how you want to use it
    CommonHelper.ProcessDataSet(spark, sourcePath1, targetPath1)
  }
}
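
Since the only things that change are the input file and the output location, one way to reuse the helper for all five (and future) datasets in a single job is to drive it from a small list of (source, target) path pairs. This is a minimal sketch, not part of the original answer: the object name DeequRunner is only illustrative, and the two path pairs are copied from the examples above.

import org.apache.spark.sql.SparkSession

object DeequRunner {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("dq").getOrCreate()

    // (sourcePath, targetPath) for each template; extend this list as new datasets arrive
    val datasets: Seq[(String, String)] = Seq(
      ("s3://ct-ire-fin-stg-data-dev-raw-gib/templates/Contract_Portfolio_Assignment/Contract_Portfolio_Assignement_Compass/contract-portfolio-assignment-compass - Sheet1.csv",
       "s3://ct-ire-fin-stg-data-dev-raw-gib/template_validations/Contract-Portfolio-Assignment-Validations/Contract-Portfolio-Assignment-Compass/"),
      ("s3://ct-ire-fin-stg-data-dev-raw-gib/templates/Contract_Portfolio_Assignment/Contract-Portfolio-Assignment-GIP/Portfolio-Assignment-GIP - Sheet1.csv",
       "s3://ct-ire-fin-stg-data-dev-raw-gib/template_validations/Contract-Portfolio-Assignment-Validations/Contract-Portfolio-Assignement-GIP-Validations/")
    )

    // run the same validation logic once per (source, target) pair
    datasets.foreach { case (sourcePath, targetPath) =>
      CommonHelper.ProcessDataSet(spark, sourcePath, targetPath)
    }
  }
}

With this shape, onboarding a new template is a one-entry change to the list. Alternatively, if you prefer to keep one AWS Glue job per dataset, you could pass the two paths in as job parameters and read them inside main, so the job code itself never changes.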
