首页 > 解决方案 > 将列从数据框(df1)添加到另一个数据框(df2)

问题描述

我需要一些关于这个 Apache Spark (pyspark) 问题的帮助。

我有一个数据帧(df1),它有单列和单行,它包含 max_timestamp

+------------------+ |max_timestamp | +-------------------+ |2019-10-24 21:18:26| +-------------------+

我有另一个 DataFrame,它包含 2 列 - EmpId 和 Timestamp

masterData = [(1, '1999-10-24 21:18:23',), (1, '2019-10-24 21:18:26',), (2, '2020-01-24 21:18:26',)]
df_masterdata = spark.createDataFrame(masterData, ['dsid', 'txnTime_str'])
df_masterdata = df_masterdata.withColumn('txnTime_ts', col('txnTime_str').cast(TimestampType())).drop('txnTime_str')

df_masterdata.show(5, False)

+----+-------------------+
|dsid|txnTime_ts         |
+----+-------------------+
|1   |1999-10-24 21:18:23|
|1   |2019-10-24 21:18:26|
|2   |2020-01-24 21:18:26|
+----+-------------------+

目的是根据条件 txnTime_ts < max_timestamp 过滤第二个 Dataframe 中的记录

我正在尝试做的 -> 将列“max_timestamp”添加到第二个 DataFrame,并通过比较两个值来过滤记录。

df_masterdata1 = df_masterdata.withColumn('maxTime', maxTS2['TEMP_MAX'])

Pyspark 不允许我将 maxTS2 中的列添加到 dataFrame - df_masterdata

错误 -

AnalysisException: 'Resolved attribute(s) TEMP_MAX#207255 missing from dsid#207263L,txnTime_ts#207267 in operator
!Project [dsid#207263L, txnTime_ts#207267, TEMP_MAX#207255 AS maxTime#207280].;;\n!Project [dsid#207263L,
txnTime_ts#207267, TEMP_MAX#207255 AS maxTime#207280]\n+- Project [dsid#207263L, txnTime_ts#207267]\n   +- Project
[dsid#207263L, txnTime_str#207264, cast(txnTime_str#207264 as timestamp) AS txnTime_ts#207267]\n      +- LogicalRDD
[dsid#207263L, txnTime_str#207264], false\n'

关于如何解决这个问题的任何想法?

标签: dataframepyspark

解决方案


如果您实际上有一个带有单行/列的 DF,那么完成此操作的最有效方法是从数据框中提取值,然后df_masterdata对其进行过滤。如果您仍然需要在数据框的上下文中执行此操作,您应该使用我们join,例如:

df_masterdata1 = df_masterdata.join(df1, df_masterdata.txnTime_ts <= df1.max_timestamp)


推荐阅读