Changing Scala code to send grouped data by email from two different arrays of records

Problem description

I have two tables, one called Records and one called Identifier:

val finalResultsRecords: Array[Records] = 
  sqlContext.sql("select * from Records").as[Records].collect()


val finalResultsIdentifier: Array[Identifier] = 
  sqlContext.sql("select * from Identifier").as[Identifier].collect()

Currently, I send these as two separate emails to each specified address:

finalResultsRecords.groupBy(_.email).foreach { case (email, finalResultsRecords) =>
  val HTMLTableOfRecords: String = generateReport(finalResultsRecords)
  val emailContent: String = s"$HTMLTableOfRecords" // the s prefix is required for interpolation
  sendEmail(s"EXAMPLE RECORDS",emailContent, List(email))
}

And then the Identifier results here:

finalResultsIdentifier.groupBy(_.email).foreach { case (email, finalResultsIdentifier) =>
  val HTMLTableOfIdentifiers: String = generateReport2(finalResultsIdentifier)
  val emailContent: String = s"$HTMLTableOfIdentifiers" // the s prefix is required for interpolation
  sendEmail(s"EXAMPLE IDENTIFIERS",emailContent, List(email))
}

How can I merge these so that, when the email addresses match, each person receives one email instead of two? For example, if email1@email.com appears in both Records and Identifier, send the email content as:

  val emailContent: String = s"$HTMLTableOfIdentifiers$HTMLTableOfRecords"

If the email is found only in Records, send the email content as:

  val emailContent: String = s"$HTMLTableOfRecords"

If the email is found only in Identifier, send the email content as:

  val emailContent: String = s"$HTMLTableOfIdentifiers"

Edit:

    val finalResultsRecords: Array[Records] = sqlContext.sql("select * from Records").as[Records].collect()

    val finalResultsIdentifier: Array[Identifier] = sqlContext.sql("select * from Identifier").as[Identifier].collect()

    // Either takes types, not values: Left wraps a Records row, Right an Identifier row.
    val together: Array[(String, Either[Records, Identifier])] =
      finalResultsRecords.map(r => (r.email, Left(r))) ++ finalResultsIdentifier.map(r => (r.email, Right(r)))

    // Build one HTML body per address, using only the rows that belong to that address.
    val toSend: Map[String, String] = together.groupBy(_._1).mapValues { entries =>
      val recs: Array[Records] = entries.collect { case (_, Left(rec)) => rec }
      val idents: Array[Identifier] = entries.collect { case (_, Right(ident)) => ident }
      // Include a table only when this address has rows of that kind.
      val tables = Seq(
        if (recs.nonEmpty) Some(generateReport(recs)) else None,
        if (idents.nonEmpty) Some(generateReport2(idents)) else None
      ).flatten
      s"""
      <html>
      <head>
      </head>
      <body>
      ${tables.mkString("<br><br>")}
      </body>
      </html>"""
    }.toMap

    toSend.foreach { case (address, content) =>
      sendEmail("EXAMPLE IDENTIFIERS", content, List(address))
    }

Tags: scala, apache-spark, apache-spark-sql, databricks

Solution


Merge the two collections into one before calling groupBy. For example:

case class Records(email: String, stuff : String)
case class Identifier(email: String, differentStuff : String)

val records: Seq[Records] = ??? //sqlContext.sql("select * from Records").as[Records].collect()
val identifiers: Seq[Identifier] = ??? // sqlContext.sql("select * from Identifier").as[Identifier].collect()
// Tag every row with its email: Left for a Records row, Right for an Identifier row.
val together: Seq[(String, Either[Records, Identifier])] =
  records.map(r => (r.email, Left(r))) ++ identifiers.map(r => (r.email, Right(r)))
// Group by email and form one content string per address.
val toSend: Map[String, String] = together.groupBy(_._1).mapValues {
  entries => {
    val parts = entries.map(e => e._2 match {
      case Left(rec) => s"records: ${rec.stuff}"
      case Right(ident) => s"identifier: ${ident.differentStuff}"
    })
    parts.mkString(",")
  }
}.toMap

toSend.foreach{
  case (address, content) => ??? //send email
}
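
As a rough sketch of how the content-forming step might be filled in so that it covers the three cases from the question (address in both tables, only in Records, only in Identifier), the grouped entries can be split back into their two kinds and each report generated only when that kind is present. The `generateReport` and `generateReport2` bodies below are hypothetical stand-ins, since the real ones are not shown in the question, and `CombinedEmails` and `bodies` are illustrative names. Putting the records table first is an arbitrary choice.

```scala
// Row types mirroring the case classes in the answer above.
final case class Records(email: String, stuff: String)
final case class Identifier(email: String, differentStuff: String)

object CombinedEmails {
  // Hypothetical stand-in for the question's generateReport (assumed to return an HTML table).
  def generateReport(rows: Seq[Records]): String =
    rows.map(r => s"<tr><td>${r.stuff}</td></tr>").mkString("<table>", "", "</table>")

  // Hypothetical stand-in for the question's generateReport2.
  def generateReport2(rows: Seq[Identifier]): String =
    rows.map(i => s"<tr><td>${i.differentStuff}</td></tr>").mkString("<table>", "", "</table>")

  // Returns one email body per address, built only from that address's rows.
  def bodies(records: Seq[Records], identifiers: Seq[Identifier]): Map[String, String] = {
    // Merge both collections first, tagging each row with its email.
    val together: Seq[(String, Either[Records, Identifier])] =
      records.map(r => (r.email, Left(r))) ++ identifiers.map(i => (i.email, Right(i)))

    together.groupBy(_._1).map { case (address, entries) =>
      // Split the group back into its two kinds of rows.
      val recs = entries.collect { case (_, Left(r)) => r }
      val idents = entries.collect { case (_, Right(i)) => i }
      // Generate a table only for the kinds that are present.
      val tables = List(
        if (recs.nonEmpty) Some(generateReport(recs)) else None,
        if (idents.nonEmpty) Some(generateReport2(idents)) else None
      ).flatten
      address -> tables.mkString("<br><br>")
    }
  }
}
```

Each `(address, body)` pair in the result could then be passed to the question's `sendEmail(subject, body, List(address))`, so every recipient gets exactly one message.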
