scala - Adding a new record in Spark
Problem description
I have a DataFrame:
| ID | TIMESTAMP | VALUE |
| 1 | 15:00:01 | 3 |
| 1 | 17:04:02 | 2 |
When the value is 2, I want to use Spark-Scala to add a new record just before that row, with the same timestamp minus 1 second.
The output would be:
| ID | TIMESTAMP | VALUE |
| 1 | 15:00:01 | 3 |
| 1 | 17:04:01 | 2 |
| 1 | 17:04:02 | 2 |
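Thanks
Solution
You can introduce a new array column: Array(-1, 0) when value = 2, otherwise Array(0). Then explode that column and add each exploded element to the timestamp as seconds. The following should work for you: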
scala> val df = Seq((1,"15:00:01",3),(1,"17:04:02",2)).toDF("id","timestamp","value")
df: org.apache.spark.sql.DataFrame = [id: int, timestamp: string ... 1 more field]
scala> val df2 = df.withColumn("timestamp",'timestamp.cast("timestamp"))
df2: org.apache.spark.sql.DataFrame = [id: int, timestamp: timestamp ... 1 more field]
scala> df2.show(false)
+---+-------------------+-----+
|id |timestamp          |value|
+---+-------------------+-----+
|1  |2019-03-04 15:00:01|3    |
|1  |2019-03-04 17:04:02|2    |
+---+-------------------+-----+
scala> val df3 = df2.withColumn("newc", when($"value"===lit(2),lit(Array(-1,0))).otherwise(lit(Array(0))))
df3: org.apache.spark.sql.DataFrame = [id: int, timestamp: timestamp ... 2 more fields]
scala> df3.show(false)
+---+-------------------+-----+-------+
|id |timestamp          |value|newc   |
+---+-------------------+-----+-------+
|1  |2019-03-04 15:00:01|3    |[0]    |
|1  |2019-03-04 17:04:02|2    |[-1, 0]|
+---+-------------------+-----+-------+
scala> val df4 = df3.withColumn("c_explode",explode('newc)).withColumn("timestamp2",to_timestamp(unix_timestamp('timestamp)+'c_explode))
df4: org.apache.spark.sql.DataFrame = [id: int, timestamp: timestamp ... 4 more fields]
scala> df4.select($"id",$"timestamp2",$"value").show(false)
+---+-------------------+-----+
|id |timestamp2         |value|
+---+-------------------+-----+
|1  |2019-03-04 15:00:01|3    |
|1  |2019-03-04 17:04:01|2    |
|1  |2019-03-04 17:04:02|2    |
+---+-------------------+-----+
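To see why the offsets-array approach above produces the extra row, the same logic can be modeled on plain Scala collections, with flatMap playing the role of explode. This is only a sketch without Spark: the names Record, offsets, and expand are hypothetical and do not appear in the original answer.

```scala
import java.time.LocalTime

// Hypothetical row type mirroring the columns in the question.
final case class Record(id: Int, timestamp: LocalTime, value: Int)

object ExpandSketch {
  // Offsets in seconds: a row with value 2 yields two rows (-1s and 0s),
  // any other row yields itself only.
  def offsets(value: Int): Seq[Long] =
    if (value == 2) Seq(-1L, 0L) else Seq(0L)

  // flatMap stands in for explode: one output row per offset.
  def expand(rows: Seq[Record]): Seq[Record] =
    rows.flatMap { r =>
      offsets(r.value).map(o => r.copy(timestamp = r.timestamp.plusSeconds(o)))
    }

  def main(args: Array[String]): Unit = {
    val input = Seq(
      Record(1, LocalTime.parse("15:00:01"), 3),
      Record(1, LocalTime.parse("17:04:02"), 2)
    )
    expand(input).foreach(r => println(s"${r.id} ${r.timestamp} ${r.value}"))
  }
}
```

Note that LocalTime wraps around midnight, whereas the Spark timestamp column in the answer carries a date, so the two behave differently for a row at 00:00:00.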
If you want only the time part, you can do this:
scala> df4.withColumn("timestamp",from_unixtime(unix_timestamp('timestamp2),"HH:mm:ss")).select($"id",$"timestamp",$"value").show(false)
+---+---------+-----+
|id |timestamp|value|
+---+---------+-----+
|1  |15:00:01 |3    |
|1  |17:04:01 |2    |
|1  |17:04:02 |2    |
+---+---------+-----+
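Outside spark-shell, the session and imports have to be set up explicitly. The following is a minimal sketch of the same steps as a standalone program, assuming Spark SQL is on the classpath and a local master is acceptable. The object name, app name, the helper addPrecedingRecord, and the intermediate column name offset are hypothetical. It builds the array with array(lit(-1), lit(0)) so that it does not depend on lit accepting a Scala Array, and it parses the time string directly with the "HH:mm:ss" pattern instead of casting to a full timestamp.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{array, col, explode, from_unixtime, lit, unix_timestamp, when}

object AddRecordSketch {
  // Hypothetical helper: duplicates every row whose value is 2,
  // with the copy's time shifted back by one second.
  def addPrecedingRecord(df: DataFrame): DataFrame =
    df
      // One exploded row per offset (in seconds): -1 and 0 for value 2, else 0.
      .withColumn("offset", explode(
        when(col("value") === 2, array(lit(-1), lit(0))).otherwise(array(lit(0)))))
      // Parse the time string to epoch seconds, shift it, and format it back.
      .withColumn("timestamp",
        from_unixtime(unix_timestamp(col("timestamp"), "HH:mm:ss") + col("offset"), "HH:mm:ss"))
      .drop("offset")

  def main(args: Array[String]): Unit = {
    // Assumption: local mode; the app name is arbitrary.
    val spark = SparkSession.builder().master("local[*]").appName("add-record-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "15:00:01", 3), (1, "17:04:02", 2)).toDF("id", "timestamp", "value")
    addPrecedingRecord(df).show(false)

    spark.stop()
  }
}
```

Row order after explode is not guaranteed in general on a distributed dataset, so add an orderBy on id and timestamp if the order of the output matters.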