scala - 如何减少 AWS Deequ 上的代码重复
问题描述
我有 5 个数据集(数量将来还会增长,因此泛化很重要),它们运行的代码几乎完全相同,我不知道如何对以下步骤进行泛化:
- 加载数据集
- 调用代码并将结果写入不同的文件夹。如果您能提供帮助,那就太好了,因为我是 Scala 的新手。这些都是 AWS Glue 上的作业,唯一变化的只是输入文件和结果的存放位置。
例如,这里有三个示例——我想减少其中的代码重复:
import com.amazon.deequ.{VerificationSuite, VerificationResult}
import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
import com.amazon.deequ.checks.{Check, CheckLevel}
import com.amazon.deequ.constraints.{ConstrainableDataTypes}
import org.apache.spark.sql.functions.{length, max}
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object Deequ {

  /**
   * Glue job entry point: reads the Contract-Portfolio-Assignment (Compass) template
   * CSV from S3, runs the Deequ template validations against it, and writes the
   * check results as Parquet to the validations folder.
   *
   * Fixes vs. original: the unused `SparkConf` local was removed (it was never passed
   * to the builder), `main` uses an explicit `: Unit =` instead of deprecated
   * procedure syntax, and the S3 paths — mangled across lines in the paste — are
   * reconstructed as single string literals.
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("dq").getOrCreate()

    // Input template CSV: header row present, comma-delimited.
    val dataset = spark.read
      .option("header", true)
      .option("delimiter", ",")
      .csv("s3://ct-ire-fin-stg-data-dev-raw-gib/templates/Contract_Portfolio_Assignment/Contract_Portfolio_Assignement_Compass/contract-portfolio-assignment-compass - Sheet1.csv")

    // Run the whole suite of column-level checks in a single pass over the data.
    val verificationResult: VerificationResult =
      VerificationSuite()
        .onData(dataset)
        .addCheck(
          Check(CheckLevel.Error, "Template Validations")
            .hasDataType("* Contract Category", ConstrainableDataTypes.Integral)
            .hasMaxLength("* Contract Category", _ == 1)
            .isComplete("* Contract Category")
            .hasDataType("* Contract ID", ConstrainableDataTypes.String)
            .hasMaxLength("* Contract ID", _ <= 40)
            .isComplete("* Contract ID")
            .hasDataType("* Key Date", ConstrainableDataTypes.Integral)
            .hasMaxLength("* Key Date", _ <= 8)
            .isComplete("* Key Date")
            .hasDataType("* Portfolio Category", ConstrainableDataTypes.Integral)
            .hasMaxLength("* Portfolio Category", _ <= 4)
            .isComplete("* Portfolio Category")
            .hasDataType("* Tranche Start Date", ConstrainableDataTypes.Integral)
            .hasMaxLength("* Tranche Start Date", _ <= 8)
            .isComplete("* Tranche Start Date")
            // .isContainedIn("Portfolio Category", Array("2100"))
            .hasDataType("Portfolio", ConstrainableDataTypes.String)
            .hasMaxLength("Portfolio", _ <= 40)
            .isComplete("Portfolio")
            .hasDataType("Source System", ConstrainableDataTypes.String)
            .hasMaxLength("Source System", _ <= 10)
            .isComplete("Source System")
            .isContainedIn("Source System", Array("LFST", "CLPB", "CLCB", "CLHR", "CCLU"))
            .hasDataType("Delivery Package", ConstrainableDataTypes.String)
            .hasMaxLength("Delivery Package", _ <= 20)
            .isComplete("Delivery Package")
            // .isContainedIn("Legal Entity", Array("LP01", "LLAL"))
        )
        .run()

    // Persist the per-constraint pass/fail results for downstream inspection.
    val resultDataFrame = checkResultsAsDataFrame(spark, verificationResult)
    resultDataFrame.write
      .mode("overwrite")
      .parquet("s3://ct-ire-fin-stg-data-dev-raw-gib/template_validations/Contract-Portfolio-Assignment-Validations/Contract-Portfolio-Assignment-Compass/")
  }
}
这是第二个代码库:
import com.amazon.deequ.{VerificationSuite, VerificationResult}
import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
import com.amazon.deequ.checks.{Check, CheckLevel}
import com.amazon.deequ.constraints.{ConstrainableDataTypes}
import org.apache.spark.sql.functions.{length, max}
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object Deequ {

  /**
   * Glue job entry point: reads the Contract-Portfolio-Assignment (GIP) template
   * CSV from S3, runs the Deequ template validations against it, and writes the
   * check results as Parquet to the validations folder.
   *
   * Fixes vs. original: removed the unused `SparkConf` local (never passed to the
   * builder), added explicit `: Unit =` on `main`, and rejoined the S3 paths that
   * were broken across lines in the paste.
   * NOTE(review): the pasted input path contained "templates /Contract_..." with a
   * stray space from line wrapping; "templates/" is assumed here, matching the
   * working solution's path — confirm against the actual bucket layout.
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("dq").getOrCreate()

    // Input template CSV: header row present, comma-delimited.
    val dataset = spark.read
      .option("header", true)
      .option("delimiter", ",")
      .csv("s3://ct-ire-fin-stg-data-dev-raw-gib/templates/Contract_Portfolio_Assignment/Contract-Portfolio-Assignment-GIP/Portfolio-Assignment-GIP - Sheet1.csv")

    // Run the whole suite of column-level checks in a single pass over the data.
    val verificationResult: VerificationResult =
      VerificationSuite()
        .onData(dataset)
        .addCheck(
          Check(CheckLevel.Error, "Template Validations")
            .hasDataType("* Contract Category", ConstrainableDataTypes.Integral)
            .hasMaxLength("* Contract Category", _ == 1)
            .isComplete("* Contract Category")
            .hasDataType("* Contract ID", ConstrainableDataTypes.String)
            .hasMaxLength("* Contract ID", _ <= 40)
            .isComplete("* Contract ID")
            .hasDataType("* Key Date", ConstrainableDataTypes.Integral)
            .hasMaxLength("* Key Date", _ <= 8)
            .isComplete("* Key Date")
            .hasDataType("* Portfolio Category", ConstrainableDataTypes.Integral)
            .hasMaxLength("* Portfolio Category", _ <= 4)
            .isComplete("* Portfolio Category")
            .hasDataType("* Tranche Start Date", ConstrainableDataTypes.Integral)
            .hasMaxLength("* Tranche Start Date", _ <= 8)
            .isComplete("* Tranche Start Date")
            // .isContainedIn("Portfolio Category", Array("2100"))
            .hasDataType("Portfolio", ConstrainableDataTypes.String)
            .hasMaxLength("Portfolio", _ <= 40)
            .isComplete("Portfolio")
            .hasDataType("Source System", ConstrainableDataTypes.String)
            .hasMaxLength("Source System", _ <= 10)
            .isComplete("Source System")
            .isContainedIn("Source System", Array("LFST", "CLPB", "CLCB", "CLHR", "CCLU"))
            .hasDataType("Delivery Package", ConstrainableDataTypes.String)
            .hasMaxLength("Delivery Package", _ <= 20)
            .isComplete("Delivery Package")
            // .isContainedIn("Legal Entity", Array("LP01", "LLAL"))
        )
        .run()

    // Persist the per-constraint pass/fail results for downstream inspection.
    val resultDataFrame = checkResultsAsDataFrame(spark, verificationResult)
    resultDataFrame.write
      .mode("overwrite")
      .parquet("s3://ct-ire-fin-stg-data-dev-raw-gib/template_validations/Contract-Portfolio-Assignment-Validations/Contract-Portfolio-Assignement-GIP-Validations/")
  }
}
这是第三个:
import com.amazon.deequ.{VerificationSuite, VerificationResult}
import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
import com.amazon.deequ.checks.{Check, CheckLevel}
import com.amazon.deequ.constraints.{ConstrainableDataTypes}
import org.apache.spark.sql.functions.{length, max}
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object Deequ {

  /**
   * Glue job entry point: reads the Portfolio-Assignment (Mobilife) template CSV
   * from S3, runs the Deequ template validations against it, and writes the check
   * results as Parquet to the validations folder.
   *
   * Fixes vs. original: removed the unused `SparkConf` local (never passed to the
   * builder), added explicit `: Unit =` on `main`, and rejoined the S3 paths that
   * were broken across lines in the paste.
   * NOTE(review): the pasted input path contained "templates /Contract_..." with a
   * stray space from line wrapping; "templates/" is assumed here, matching the
   * working solution's path — confirm against the actual bucket layout. The
   * "Portforlio" spelling in the file name is kept as-is (it is the stored key).
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("dq").getOrCreate()

    // Input template CSV: header row present, comma-delimited.
    val dataset = spark.read
      .option("header", true)
      .option("delimiter", ",")
      .csv("s3://ct-ire-fin-stg-data-dev-raw-gib/templates/Contract_Portfolio_Assignment/Portfolio-Assignment-Mobilife/Mobilife-Portforlio-Assessment - Sheet1.csv")

    // Run the whole suite of column-level checks in a single pass over the data.
    val verificationResult: VerificationResult =
      VerificationSuite()
        .onData(dataset)
        .addCheck(
          Check(CheckLevel.Error, "Template Validations")
            .hasDataType("* Contract Category", ConstrainableDataTypes.Integral)
            .hasMaxLength("* Contract Category", _ == 1)
            .isComplete("* Contract Category")
            .hasDataType("* Contract ID", ConstrainableDataTypes.String)
            .hasMaxLength("* Contract ID", _ <= 40)
            .isComplete("* Contract ID")
            .hasDataType("* Key Date", ConstrainableDataTypes.Integral)
            .hasMaxLength("* Key Date", _ <= 8)
            .isComplete("* Key Date")
            .hasDataType("* Portfolio Category", ConstrainableDataTypes.Integral)
            .hasMaxLength("* Portfolio Category", _ <= 4)
            .isComplete("* Portfolio Category")
            .hasDataType("* Tranche Start Date", ConstrainableDataTypes.Integral)
            .hasMaxLength("* Tranche Start Date", _ <= 8)
            .isComplete("* Tranche Start Date")
            // .isContainedIn("Portfolio Category", Array("2100"))
            .hasDataType("Portfolio", ConstrainableDataTypes.String)
            .hasMaxLength("Portfolio", _ <= 40)
            .isComplete("Portfolio")
            .hasDataType("Source System", ConstrainableDataTypes.String)
            .hasMaxLength("Source System", _ <= 10)
            .isComplete("Source System")
            .isContainedIn("Source System", Array("LFST", "CLPB", "CLCB", "CLHR", "CCLU"))
            .hasDataType("Delivery Package", ConstrainableDataTypes.String)
            .hasMaxLength("Delivery Package", _ <= 20)
            .isComplete("Delivery Package")
            // .isContainedIn("Legal Entity", Array("LP01", "LLAL"))
        )
        .run()

    // Persist the per-constraint pass/fail results for downstream inspection.
    val resultDataFrame = checkResultsAsDataFrame(spark, verificationResult)
    resultDataFrame.write
      .mode("overwrite")
      .parquet("s3://ct-ire-fin-stg-data-dev-raw-gib/template_validations/Contract-Portfolio-Assignment-Validations/contract-portfolio-assessment_Mobilife-Validations/")
  }
}
解决方案
根据我对您的问题的理解,您可以把通用逻辑提取到一个函数中,然后在不同的地方调用同一个函数。对于各个工作流程之间会变化的值,可以将它们作为该函数的参数传入。
//Common Function just for your reference but you can modify it as you want.
object CommonHelper {

  // Single definition of the template-validation rules shared by every dataset;
  // building the Check here keeps the per-dataset entry point free of duplication.
  private def templateValidations: Check =
    Check(CheckLevel.Error, "Template Validations")
      .hasDataType("* Contract Category", ConstrainableDataTypes.Integral)
      .hasMaxLength("* Contract Category", _ == 1)
      .isComplete("* Contract Category")
      .hasDataType("* Contract ID", ConstrainableDataTypes.String)
      .hasMaxLength("* Contract ID", _ <= 40)
      .isComplete("* Contract ID")
      .hasDataType("* Key Date", ConstrainableDataTypes.Integral)
      .hasMaxLength("* Key Date", _ <= 8)
      .isComplete("* Key Date")
      .hasDataType("* Portfolio Category", ConstrainableDataTypes.Integral)
      .hasMaxLength("* Portfolio Category", _ <= 4)
      .isComplete("* Portfolio Category")
      .hasDataType("* Tranche Start Date", ConstrainableDataTypes.Integral)
      .hasMaxLength("* Tranche Start Date", _ <= 8)
      .isComplete("* Tranche Start Date")
      // .isContainedIn("Portfolio Category", Array("2100"))
      .hasDataType("Portfolio", ConstrainableDataTypes.String)
      .hasMaxLength("Portfolio", _ <= 40)
      .isComplete("Portfolio")
      .hasDataType("Source System", ConstrainableDataTypes.String)
      .hasMaxLength("Source System", _ <= 10)
      .isComplete("Source System")
      .isContainedIn("Source System", Array("LFST", "CLPB", "CLCB", "CLHR", "CCLU"))
      .hasDataType("Delivery Package", ConstrainableDataTypes.String)
      .hasMaxLength("Delivery Package", _ <= 20)
      .isComplete("Delivery Package")
      // .isContainedIn("Legal Entity", Array("LP01", "LLAL"))

  /**
   * Reads the CSV at `sourcePath` (header row, comma-delimited), runs the shared
   * Deequ template validations over it, and writes the check-result DataFrame as
   * Parquet to `targetPath`, overwriting any previous results.
   *
   * @param spark      active session supplied by the caller (one per Glue job)
   * @param sourcePath S3 location of the input template CSV
   * @param targetPath S3 folder that receives the Parquet validation results
   */
  def ProcessDataSet(spark: SparkSession, sourcePath: String, targetPath: String): Unit = {
    val input = spark.read
      .option("header", true)
      .option("delimiter", ",")
      .csv(sourcePath)

    // One pass over the data computes every constraint's metrics and verdict.
    val outcome: VerificationResult = VerificationSuite()
      .onData(input)
      .addCheck(templateValidations)
      .run()

    checkResultsAsDataFrame(spark, outcome)
      .write
      .mode("overwrite")
      .parquet(targetPath)
  }
}
现在您可以从启动对象中的主函数调用它,如下所示。我已经为一个数据集展示了它,您可以将它重用于其他数据集。
object Deequ {

  /**
   * Glue job entry point. Each dataset is described by an (input CSV, output folder)
   * pair; the same validation pipeline runs for every entry, so adding the remaining
   * datasets (or future ones) is a one-line change to `jobs`.
   *
   * Fixes vs. original: removed the unused `SparkConf` local (it was never passed to
   * the builder) and replaced deprecated procedure syntax with `: Unit =`.
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("dq").getOrCreate()

    // (source, target) per dataset — extend this list as new datasets arrive.
    val jobs: Seq[(String, String)] = Seq(
      ("s3://ct-ire-fin-stg-data-dev-raw-gib/templates/Contract_Portfolio_Assignment/Contract-Portfolio-Assignment-GIP/Portfolio-Assignment-GIP - Sheet1.csv",
       "s3://ct-ire-fin-stg-data-dev-raw-gib/template_validations/Contract-Portfolio-Assignment-Validations/Contract-Portfolio-Assignement-GIP-Validations/")
    )

    // Run the shared validation pipeline once per configured dataset.
    jobs.foreach { case (sourcePath, targetPath) =>
      CommonHelper.ProcessDataSet(spark, sourcePath, targetPath)
    }
  }
}
推荐阅读
- ios - 读取二维码崩溃 iOS(Objective C) 应用程序
- c# - 如何使用类Task c#将矩阵和向量相乘
- jquery - JQuery 3.5.0 使用 html 产生奇数输出
- r - 使用现有的绘图创建闪亮的下拉菜单
- excel - Changing properties of multiple controls in VBA userform permanently
- java - 如何在java中处理这种打印?
- javascript - 错误 message.guild.channels.filter 不是函数,使用 Discord.JS
- mongodb - 在`mongodb` Rust Lang 中找不到`options`
- asp.net-mvc - 我的引导表没有显示任何内容,甚至列
- ios - 在真实设备上运行 appium 测试时出现错误:xcodebuild failure: xcodebuild failed with code 65