apache-spark - 在使用另一个数据帧调用的 UDF 中过滤数据帧
问题描述
我正在尝试这样做:
- 从 2 个表中提取数据
- 使用表 1 中的字段 SECONDS,找出它在表 2 中属于哪个会话。因此,如果会话从 10:00 运行到 11:00,并且表 1 中的事务发生在 10:30,那么它将适合该会话会议。
我拥有的代码如下所示。我从表 1 中获取值并将其传递给 UDF。使用该值,我希望能够过滤其他 UDF 以返回会话号。
当我收到此错误时,这不起作用。
pickle.PicklingError: Could not serialize object: Py4JError: An error occurred while calling o73.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
任何想法为什么会这样?
dpi_data = spark_session.sql("Select *, ((time_hour*3600) + (time_minute *60) + time_second) as seconds from table1 where hour = 04 and dt = " + yday_date )
dpi_sessions = spark_session.sql("select *, lead(seconds,1) over(partition by user order by seconds) as end_time from (select user, apn, ((time_hour*3600) + (time_minute *60) + time_second) as seconds from table2 where hour = 04 and dt = " + yday_date + ")x" )
def getsession(seconds):
output = dpi_sessions.filter((dpi_sessions.start_time <= seconds) & (dpi_sessions.end_time >= seconds))
print(output)
return 'sss'
myudf = udf(getsession, StringType())
dpi_data = dpi_data.withColumn('apn', myudf(dpi_data.seconds))
输入是: 表 1
所以在这里,我们将用户匹配到正确的会话。表 1 中的时间戳位于表 2 的开始时间和结束时间之间。
解决方案
我认为您错误地使用了UDF。您不能(一次)处理特定行中的列值并在其他地方引用另一个 DataFrame。
我相信您的问题的解决方案是您必须查看join
表格,然后检查哪个seconds
属于哪个会话(发生在 astart
和之间end
)。
让我们来看看它。
# We set up the problem
# First DataFrame
dpi_data_columns = ["user", "seconds"]
data1 = [(272927, 31924), (272927, 32000), (272927, 45000), (272927, 78000), (272927, 79000)]
dpi_data_rdd = spark.sparkContext.parallelize(data1)
# We rename user to user1 because to avoid column name duplicates post join
dpi_data = dpi_data_rdd.toDF(dpi_data_columns).withColumnRenamed("user", "user1")
dpi_data.show()
#+------+-------+
#| user1|seconds|
#+------+-------+
#|272927| 31924|
#|272927| 32000|
#|272927| 45000|
#|272927| 78000|
#|272927| 79000|
#+------+-------+
# First DataFrame
dpi_sessions_columns = ["user", "start", "end", "key"]
data2 = [(272927, 15000, 40000, "Paid"), (272927, 40001, 86000, "Unpaid")]
dpi_sessions_rdd = spark.sparkContext.parallelize(data2)
# We rename user to user2 because to avoid column name duplicates post join
dpi_sessions = dpi_sessions_rdd.toDF(dpi_sessions_columns).withColumnRenamed("user", "user2")
dpi_sessions.show()
#+------+-----+-----+------+
#| user2|start| end| key|
#+------+-----+-----+------+
#|272927|15000|40000| Paid|
#|272927|40001|86000|Unpaid|
#+------+-----+-----+------+
好的,到目前为止一切都很好。现在我们简单地加入。
from pyspark.sql.functions import col
join_condition = [dpi_data.seconds >= dpi_sessions.start, dpi_data.seconds <= dpi_sessions.end]
# We join and select target columns, renaming 'user1' or 'user2' back to 'user'
dpi_data_sessions = dpi_data.join(dpi_sessions, join_condition)\
.select(col("user1").alias("user"), col("seconds"), col("key").alias("out"))
dpi_data_sessions.show()
#+------+-------+------+
#| user|seconds| out|
#+------+-------+------+
#|272927| 31924| Paid|
#|272927| 32000| Paid|
#|272927| 45000|Unpaid|
#|272927| 78000|Unpaid|
#|272927| 79000|Unpaid|
#+------+-------+------+
这就是您的目标输出,不需要 UDF。
推荐阅读
- scala - 为什么 Akka Sharding 的“majority-min-cap”默认设置为 5?
- laravel - 如何修复'Magick:由于信号 6(SIGABRT)“中止”而中止。. 。
- c++ - 当原始数据为常量时,修改指针指向的位置是否是UB?
- google-apps-script - Google Apps 脚本中的 Google Drive 通知
- sql - 找到至少 3 盎司的食谱名称。(数量)每种成分都被使用
- email - 如何使用颤振(或飞镖)接收电子邮件?
- excel - 复制列数据而不复制空白单元格
- pandas - 按 n 个第一个值的三列组合过滤
- sql-server - SQL Server:获取日期和时间范围的行
- javascript - 用于删除 PHP 上的 JS 注释但忽略字符串中的文本的正则表达式