Spark Streaming | Write different DataFrames to multiple tables in Synapse DW

Problem description

I have multiple DataFrames that are extracted from a single JSON message in Azure Event Hub. We want to push these DFs to separate tables in Synapse DW using a Spark Streaming job.

Here is my schema:

root
 |-- Name: string (nullable = true)
 |-- Salary: string (nullable = true)
 |-- EmpID: string (nullable = true)
 |-- Projects: struct (nullable = true)
 |    |-- ProjectID: string (nullable = true)
 |    |-- ProjectName: string (nullable = true)
 |    |-- Duration: string (nullable = true)
 |    |-- Location: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- City: string (nullable = true)
 |    |    |    |-- State: string (nullable = true)
 |    |-- Contact: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- Phone: string (nullable = true)
 |    |    |    |-- email: string (nullable = true)

From the above schema I extract 4 different DataFrames (a rough sketch of the extraction follows the list):

  1. Project
  2. Location
  3. Contact
  4. Employee
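
For context, a simplified sketch of that extraction (here jsonDf stands for the DataFrame parsed from the Event Hub message body; the exact column picks are assumptions based on the schema above):

import org.apache.spark.sql.functions.{col, explode}

// jsonDf: the DataFrame parsed from the Event Hub message body (assumed name)
val EmployeeDf = jsonDf.select("Name", "Salary", "EmpID")

val ProjectDf = jsonDf.select(
  col("EmpID"),
  col("Projects.ProjectID"),
  col("Projects.ProjectName"),
  col("Projects.Duration"))

// Location and Contact are arrays nested inside the Projects struct, so explode them
val LocationDf = jsonDf
  .select(col("Projects.ProjectID"), explode(col("Projects.Location")).as("loc"))
  .select(col("ProjectID"), col("loc.City"), col("loc.State"))

val ContactDf = jsonDf
  .select(col("Projects.ProjectID"), explode(col("Projects.Contact")).as("c"))
  .select(col("ProjectID"), col("c.Phone"), col("c.email"))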

All of them should be inserted into 4 different tables in Synapse.

ProjectDf.write.format("spark.sqldw").option("dbTable", "dbo.Project").save(...)
LocationDf.write.format("spark.sqldw").option("dbTable", "dbo.Loc").save(...)
ContactDf.write.format("spark.sqldw").option("dbTable", "dbo.Contact").save(...)
EmployeeDf.write.format("spark.sqldw").option("dbTable", "dbo.Emp").save(...)

Please suggest how to apply a foreachBatch sink here to insert into these tables.

Tags: scala, apache-spark, spark-streaming, spark-structured-streaming, azure-databricks

Solution


If you plan to write four different DataFrames based on a single input streaming DataFrame, you can use foreachBatch as follows:

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>

  // as you plan to use the batchDF to create multiple outputs, it might be worth persisting the batchDF
  batchDF.persist()

  // create the four different Dataframes based on the input
  val ProjectDf = batchDF.select(...)
  val LocationDf = batchDF.select(...) 
  val ContactDf = batchDF.select(...)
  val EmployeeDf = batchDF.select(...)

  // then you can save those four Dataframes into the desired locations
  ProjectDf.write.format("spark.sqldw").option("dbTable", "dbo.Project").save(...)
  LocationDf.write.format("spark.sqldw").option("dbTable", "dbo.Loc").save(...)
  ContactDf.write.format("spark.sqldw").option("dbTable", "dbo.Contact").save(...)
  EmployeeDf.write.format("spark.sqldw").option("dbTable", "dbo.Emp").save(...)

  // do not forget to unpersist your batchDF
  batchDF.unpersist()
}
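
The .options(...) and .save(...) calls above are abbreviated. As a hedged sketch of what one complete write could look like with the Databricks Azure Synapse connector (the JDBC URL, tempDir, and save mode below are placeholders to replace with your own settings):

  ProjectDf.write
    .format("com.databricks.spark.sqldw")
    // placeholder connection settings - replace with your own Synapse and storage details
    .option("url", "jdbc:sqlserver://<server>.database.windows.net:1433;database=<database>")
    .option("tempDir", "abfss://<container>@<storage-account>.dfs.core.windows.net/tempdir")
    .option("forwardSparkAzureStorageCredentials", "true")
    .option("dbTable", "dbo.Project")
    .mode("append")
    .save()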

This is described in the documentation on using foreach and foreachBatch.

If you run into the exception "Overloaded method foreachBatch with alternatives", you can have a look at the Databricks Runtime 7.0 release notes, which say:

"To fix the compile error, change foreachBatch { (df, id) => myFunc(df, id) } to foreachBatch(myFunc _) or use the Java API explicitly: foreachBatch(new VoidFunction2 ...)."

That means your code would look like this:

def myFunc(batchDF: DataFrame, batchId: Long): Unit = {
  // as you plan to use the batchDF to create multiple outputs, it might be worth persisting the batchDF
  batchDF.persist()

  // create the four different Dataframes based on the input
  val ProjectDf = batchDF.select(...)
  val LocationDf = batchDF.select(...) 
  val ContactDf = batchDF.select(...)
  val EmployeeDf = batchDF.select(...)

  // then you can save those four Dataframes into the desired locations
  ProjectDf.write.format("spark.sqldw").option("dbTable", "dbo.Project").save(...)
  LocationDf.write.format("spark.sqldw").option("dbTable", "dbo.Loc").save(...)
  ContactDf.write.format("spark.sqldw").option("dbTable", "dbo.Contact").save(...)
  EmployeeDf.write.format("spark.sqldw").option("dbTable", "dbo.Emp").save(...)

  // do not forget to unpersist your batchDF
  batchDF.unpersist()
}


streamingDF.writeStream.foreachBatch(myFunc _).[...].start()
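
The [...] part typically carries at least a checkpoint location and, optionally, a trigger. A minimal sketch of how the query might be started (the checkpoint path and interval are assumptions, adjust to your setup):

import org.apache.spark.sql.streaming.Trigger

val query = streamingDF.writeStream
  .foreachBatch(myFunc _)
  .option("checkpointLocation", "/mnt/checkpoints/synapse-writer")  // assumed checkpoint path
  .trigger(Trigger.ProcessingTime("1 minute"))                      // assumed micro-batch interval
  .start()

query.awaitTermination()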
