scala - 从数据集中的地图按键排序
问题描述
我想按时间戳订购从 HDFS 检索的一些 avro 文件。
我的 avro 文件的架构是:
标题:地图[字符串,字符串],正文:字符串
现在棘手的部分是时间戳是地图中的键/值之一。所以我在地图中包含了这样的时间戳:
key_1 -> value_1,key_2 -> value_2,时间戳 -> 1234567,key_n -> value_n
请注意,值的类型是字符串。
我创建了一个案例类来使用此架构创建我的数据集:
case class Root(headers : Map[String,String], body: String)
创建我的数据集:
val ds = spark
.read
.format("com.databricks.spark.avro")
.load(pathToHDFS)
.as[Root]
我真的不知道如何从这个问题开始,因为我只能获取列标题和正文。如何让嵌套值最终按时间戳排序?
我想做这样的事情:
ds.select("headers").doSomethingToGetTheMapStructure.doSomeConversionStringToTimeStampForTheColumnTimeStamp("timestamp").orderBy("timestamp")
有点精确:我不想从我的初始数据集中丢失任何数据,只是一个排序操作。
我使用 Spark 2.3.0。
解决方案
您可以使用 Scala 的 sortBy,它带有一个函数。我建议您将 val ds 明确声明为 Vector(或其他集合),这样您将在 IntelliJ 中看到适用的函数(如果您使用的是 IntelliJ)并且它肯定会编译。
根据您的代码,请参阅下面的示例:
case class Root(headers : Map[String,String], body: String)
val ds: Vector[Root] = spark
.read
.format("com.databricks.spark.avro")
.load(pathToHDFS)
.as[Root]
val sorted = ds.sortBy(r => r.headers.get("timestamp").map(PROCESSING) ).reverse
编辑:添加反向(假设您希望它降序)。在作为参数传递的函数内部,您还将处理时间戳。
推荐阅读
- c - SDL2和SDL2_ttf的静态链接
- mysql - 我将如何在 MySQL 中使用“数组”?
- python - 如何在使用 multiproccesing 的同时将数据添加到 json 文件中?
- python - jupyter笔记本没有运行
- java - 如何从 Android JAVA 或 Kotlin 的直接 Firebase 存储中获取完整 URL(如下所示)
- javascript - 在内容脚本中的事件中传递消息。铬扩展
- html - css table - 如何根据特定列对齐中心
- python-3.x - Pandas:如何根据另一个数据框的值对数据框上的列求和
- docker - Dockerized Hyperledger Caliper 退出,代码为 0
- assembly - arm-linux-gnueabi-gcc 错误指令 strhlo,strhhs Cortex-A9