scala - 将 Spark DataFrame 架构转换为新架构
问题描述
我有多个从不同来源读取的火花作业,它们具有不同的模式,但它们非常接近,我想要做的是将它们全部写入同一个 Redshift 表,因此我需要统一所有 DataFrame 模式,什么是最好的方法吗?
假设第一个输入数据的模式如下:
val schema1 = StructType(Seq(
StructField("date", DateType),
StructField("campaign_id", StringType),
StructField("campaign_name", StringType),
StructField("platform", StringType),
StructField("country", StringType),
StructField("views", DoubleType),
StructField("installs", DoubleType),
StructField("spend", DoubleType)
))
seconf inout 源的 Schema 如下:
val schema2 = StructType(Seq(
StructField("date", DateType),
StructField("creator_id", StringType),
StructField("creator_name", StringType),
StructField("platform", StringType),
StructField("views", DoubleType),
StructField("installs", DoubleType),
StructField("spend", DoubleType),
StructField("ecpm", DoubleType)
))
表模式(预期统一数据帧):
val finalSchema = StructType(Seq(
StructField("date", DateType),
StructField("account_name", StringType),
StructField("adset_id", StringType),
StructField("adset_name", StringType),
StructField("campaign_id", StringType),
StructField("campaign_name", StringType),
StructField("pub_id", StringType),
StructField("pub_name", StringType),
StructField("creative_id", StringType),
StructField("creative_name", StringType),
StructField("platform", StringType),
StructField("install_source", StringType),
StructField("views", IntegerType),
StructField("clicks", IntegerType),
StructField("installs", IntegerType),
StructField("cost", DoubleType)
))
正如您在最终架构中看到的那样,我有一些列可能不在输入架构中,因此它应该为空,一些列名也应该重命名。并且一些列ecpm
应该被删除。
解决方案
添加index
columns
到dataframes
和join
它们的基础上,index
所以会有一对一的映射。之后,select
只有您想要columns
的 from joined
dataframe
。
如果你有两个
dataframes
像下面// df1.show +-----+---+ | name|age| +-----+---+ |Alice| 25| | Bob| 29| | Tom| 26| +-----+---+ //df2.show +--------+-------+ | city|country| +--------+-------+ | Delhi| India| |New York| USA| | London| UK| +--------+-------+
现在添加
index
columns
并获得一对一的映射import org.apache.spark.sql.functions._ val df1Index=df1.withColumn("index1",monotonicallyIncreasingId) val df2Index=df2.withColumn("index2",monotonicallyIncreasingId) val joinedDf=df1Index.join(df2Index,df1Index("index1")===df2Index("index2")) //joinedDf +-----+---+------+--------+-------+------+ | name|age|index1| city|country|index2| +-----+---+------+--------+-------+------+ |Alice| 25| 0| Delhi| India| 0| | Bob| 29| 1|New York| USA| 1| | Tom| 26| 2| London| UK| 2| +-----+---+------+--------+-------+------+
现在您可以编写如下查询
val queryList=List(col("name"),col("age"),col("country"))
joinedDf.select(queryList:_*).show
//Output df
+-----+---+-------+
| name|age|country|
+-----+---+-------+
|Alice| 25| India|
| Bob| 29| USA|
| Tom| 26| UK|
+-----+---+-------+
推荐阅读
- c - C中带有while循环的阶乘
- swift - 如何更正自定义单元格中计时器倒计时中的数字跳过?
- google-apps-script - 如何从过滤函数的输出单元格中更改源单元格内容
- handlebars.js - 更改后在 html 中设置模板
- java - 如何制作不同的按钮,不同的图像视图会受到影响
- python - 带数字的堆叠条图
- java - 如何使用运行时定义的节点名称将数组序列化为 XML?
- azure-functions - 使用 TimeTrigger Function App 访问 VM 驱动器
- git - GitLab API - 检索存储库文件
- html - 如何为多个离子网格分配不同的高度?