scala - Scala Spark结构化流过滤器按Struct字段中的TimestampType
问题描述
我在定义的架构中有多个数据类型。试图找到一种按 TimestampType 过滤的好方法,将所有 TimeStampType 字段从 long 转换为 datetime。我可以在 StringType 的流中使用 .dtypes 进行归档,但在尝试使用 StructFields 和 StructTypes 的 .dtypes 进行过滤时遇到问题。有没有办法只过滤结构中的时间戳类型?下面是我使用的 sudo 代码,它在 Scala 2.11 中使用 spark 结构化流
val isoDateFormatter = "yyyy-MM-dd'T'HH:mm:ss'Z'"
val ExampleDataFrameLoad = spark
.readStream
.format("kafka")
.option("subscribe", topics.keys.mkString(","))
.options(kafkaConfig)
.load()
.select($"key".cast(StringType), $"value".cast(StringType), $"topic")
// Convert untyped dataframe to dataset
.as[(String, String, String)]
// Merge all manifests for vehicle in minibatch
.groupByKey(_._1)
//Start of merge
.flatMapGroupsWithState(OutputMode.Append, GroupStateTimeout.ProcessingTimeTimeout)(mergeGroup)
// .select($"key".cast(StringType),from_json($"value",schema).as("manifest"))
.select($"_1".alias("key"), $"_2".alias("jsonvalues"))
.select("key", "jsonvalues.*")
val ExampleDataFrame = ExampleDataFrameLoad
ExampleDataFrame.dtypes.foreach(println)
/* Returns
(key,StringType)
(contractVersion,StringType)
(metaData,StructType(StructField(Test,StringType,true), StructField(DateUtc,TimestampType,true)
*/
*Uses the following objects
import java.sql.Timestamp
object ManifestClasses {
final case class ProductManifestDocument(
contractVersion: Option[String],
metaData: DocumentMetaData
)
final case class DocumentMetaData(
Test: Option[String]
DateUtc: Timestamp
)
*/
ExampleDataFrame
//brings back data fields with types
.dtypes
//Currently returning empty but works for StringType
.filter(_._2 == "TimestampType")
.map(_._1)
//Tranforms all timestamp longs to yyyy-MM-dd'T'HH:mm:ss'Z' format
.foldLeft(ExampleDataFrame)((df, colName) => df.withColumn(colName, date_format(col(colName), isoDateFormatter)))
解决方案
您可以转换 structType 如下 -
就地更改时间戳类型的日期格式
加载测试数据
val ExampleDataFrame = spark.sql("select key, contractVersion, metaData from values " +
"('k1', 'v1', named_struct('Test', 'test1', 'DateUtc', cast(unix_timestamp() as timestamp))) " +
"T(key, contractVersion, metaData)")
ExampleDataFrame.show(false)
ExampleDataFrame.printSchema()
ExampleDataFrame.dtypes.foreach(println)
/**
* +---+---------------+----------------------------+
* |key|contractVersion|metaData |
* +---+---------------+----------------------------+
* |k1 |v1 |[test1, 2020-06-23 14:39:55]|
* +---+---------------+----------------------------+
*
* root
* |-- key: string (nullable = false)
* |-- contractVersion: string (nullable = false)
* |-- metaData: struct (nullable = false)
* | |-- Test: string (nullable = false)
* | |-- DateUtc: timestamp (nullable = true)
*
* (key,StringType)
* (contractVersion,StringType)
* (metaData,StructType(StructField(Test,StringType,false), StructField(DateUtc,TimestampType,true)))
*/
将时间戳从结构转换为特定日期格式
val isoDateFormatter = "yyyy-MM-dd'T'HH:mm:ss'Z'"
val processedDF = ExampleDataFrame.withColumn("metaData", struct($"metaData.Test",
date_format($"metaData.DateUtc", isoDateFormatter)))
processedDF.show(false)
/**
* +---+---------------+-----------------------------+
* |key|contractVersion|metaData |
* +---+---------------+-----------------------------+
* |k1 |v1 |[test1, 2020-06-23T14:51:17Z]|
* +---+---------------+-----------------------------+
*/
Update-1(基于评论)
timestamptype
从 structType 中提取作为单独的列
ExampleDataFrame.schema
.filter(_.dataType.isInstanceOf[StructType])
.flatMap(s => s.dataType.asInstanceOf[StructType]
.filter(_.dataType == TimestampType)
.map(f => s"${s.name}.${f.name}")
)
.foldLeft(ExampleDataFrame)((df, colName) => df.withColumn(colName, date_format(col(colName), isoDateFormatter)))
.show(false)
/**
* +---+---------------+----------------------------+--------------------+
* |key|contractVersion|metaData |metaData.DateUtc |
* +---+---------------+----------------------------+--------------------+
* |k1 |v1 |[test1, 2020-06-23 21:40:36]|2020-06-23T21:40:36Z|
* +---+---------------+----------------------------+--------------------+
*/
// use df.select("`metaData.DateUtc`") to select the columns having dot(.)
推荐阅读
- android - Android BottomNavigationView 部分隐藏 RecyclerView 中的最后一项
- javascript - 当 jscript 确认并通过 AJAX 调用 PHP 时如何更新 $_SESSION 值
- firebase - Firebase Firestore:将新数组合并到存储在文档中的数组中?
- html - 如何在我已将 grid-auto-flow 设置为 column 的情况下使用 CSS grid 给出多个列
- sql-server - 密码不得包含超过 SQL Server 中三个连续字符的用户帐户名或用户全名的一部分
- optaplanner - scoreManager.explainScore(解决方案)在 Optaplanner 中不起作用
- angular - 在 RxJS 可观察对象的订阅者完成后执行函数
- quarkus - 如何在 Quarkus 中以编程方式覆盖应用程序属性
- java - 通过 apache camel 连接到 SFTP
- asp.net - 从 Web 表单中检索记录列表