Using StructType columns in a PySpark UDF

Problem description

One of the columns I'm working with has the following schema:

 |-- time_to_resolution_remainingTime: struct (nullable = true)
 |    |-- _links: struct (nullable = true)
 |    |    |-- self: string (nullable = true)
 |    |-- completedCycles: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- breached: boolean (nullable = true)
 |    |    |    |-- elapsedTime: struct (nullable = true)
 |    |    |    |    |-- friendly: string (nullable = true)
 |    |    |    |    |-- millis: long (nullable = true)
 |    |    |    |-- goalDuration: struct (nullable = true)
 |    |    |    |    |-- friendly: string (nullable = true)
 |    |    |    |    |-- millis: long (nullable = true)
 |    |    |    |-- remainingTime: struct (nullable = true)
 |    |    |    |    |-- friendly: string (nullable = true)
 |    |    |    |    |-- millis: long (nullable = true)
 |    |    |    |-- startTime: struct (nullable = true)
 |    |    |    |    |-- epochMillis: long (nullable = true)
 |    |    |    |    |-- friendly: string (nullable = true)
 |    |    |    |    |-- iso8601: string (nullable = true)
 |    |    |    |    |-- jira: string (nullable = true)
 |    |    |    |-- stopTime: struct (nullable = true)
 |    |    |    |    |-- epochMillis: long (nullable = true)
 |    |    |    |    |-- friendly: string (nullable = true)
 |    |    |    |    |-- iso8601: string (nullable = true)
 |    |    |    |    |-- jira: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- ongoingCycle: struct (nullable = true)
 |    |    |-- breachTime: struct (nullable = true)
 |    |    |    |-- epochMillis: long (nullable = true)
 |    |    |    |-- friendly: string (nullable = true)
 |    |    |    |-- iso8601: string (nullable = true)
 |    |    |    |-- jira: string (nullable = true)
 |    |    |-- breached: boolean (nullable = true)
 |    |    |-- elapsedTime: struct (nullable = true)
 |    |    |    |-- friendly: string (nullable = true)
 |    |    |    |-- millis: long (nullable = true)
 |    |    |-- goalDuration: struct (nullable = true)
 |    |    |    |-- friendly: string (nullable = true)
 |    |    |    |-- millis: long (nullable = true)
 |    |    |-- paused: boolean (nullable = true)
 |    |    |-- remainingTime: struct (nullable = true)
 |    |    |    |-- friendly: string (nullable = true)
 |    |    |    |-- millis: long (nullable = true)
 |    |    |-- startTime: struct (nullable = true)
 |    |    |    |-- epochMillis: long (nullable = true)
 |    |    |    |-- friendly: string (nullable = true)
 |    |    |    |-- iso8601: string (nullable = true)
 |    |    |    |-- jira: string (nullable = true)
 |    |    |-- withinCalendarHours: boolean (nullable = true)

I'm interested in extracting the time fields (e.g. completedCycles[x].elapsedTime, ongoingCycle.remainingTime, etc.) based on certain conditions. The UDF I'm using is:

@udf("string")
def extract_time(s, field):
  # Return ongoing cycle field
  if has_column(s, 'ongoingCycle'):
    field = 'ongoingCycle.{}'.format(field)
    return s[field]
  
  # return last element of completed cycles
  s = s.get(size(s) - 1)
  return s[field]

cl = 'time_to_resolution_remainingTime'
df = df.withColumn(cl, extract_time(cl, lit("elapsedTime.friendly"))).select(cl)
display(df)

This results in the error:

SparkException: Job aborted due to stage failure: Task 0 in stage 549.0 failed 4 times, most recent failure: Lost task 0.3 in stage 549.0 (TID 1597, 10.155.239.76, executor 0): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/sql/types.py", line 1514, in __getitem__
    idx = self.__fields__.index(item)
ValueError: 'ongoingCycle.elapsedTime.friendly' is not in list

I'm obviously doing something wrong here, but I can't figure it out. Is it possible to convert `s` inside the UDF to a Python dictionary and perform the computation on that? Or is there a better way to do this?
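(As an aside on the traceback above: the struct arrives in the Python UDF as a pyspark.sql.Row, and Row.__getitem__ only looks up top-level field names, so the whole dotted string 'ongoingCycle.elapsedTime.friendly' is searched for in __fields__ and raises ValueError. Calling row.asDict(recursive=True) inside the UDF yields plain nested dicts, which can then be walked with a small helper. A minimal sketch, where the helper name get_path is made up for illustration and plain dicts stand in for the converted Row:)

```python
def get_path(d, path, default=None):
    """Walk a nested dict along a dotted path such as
    'ongoingCycle.remainingTime.friendly'; return `default`
    if any intermediate step is missing or None."""
    for key in path.split("."):
        if not isinstance(d, dict) or d.get(key) is None:
            return default
        d = d[key]
    return d

# Trimmed stand-in for what row.asDict(recursive=True) would return:
sample = {
    "ongoingCycle": {
        "remainingTime": {"friendly": "-53h 32m", "millis": -192748646}
    }
}
print(get_path(sample, "ongoingCycle.remainingTime.friendly"))  # -53h 32m
print(get_path(sample, "ongoingCycle.stopTime.friendly"))       # None
```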

Edit:

Sample data

{
   "_links":{
      "self":"https:///...."
   },
   "completedCycles":[
      
   ],
   "id":"630",
   "name":"Time to resolution",
   "ongoingCycle":{
      "breachTime":{
         "epochMillis":1605583651354,
         "friendly":"17/Nov/20 3:27 PM +12:00",
         "iso8601":"2020-11-17T15:27:31+1200",
         "jira":"2020-11-17T15:27:31.354+1200"
      },
      "breached":true,
      "elapsedTime":{
         "friendly":"57h 32m",
         "millis":207148646
      },
      "goalDuration":{
         "friendly":"4h",
         "millis":14400000
      },
      "paused":false,
      "remainingTime":{
         "friendly":"-53h 32m",
         "millis":-192748646
      },
      "startTime":{
         "epochMillis":1605511651354,
         "friendly":"16/Nov/20 7:27 PM +12:00",
         "iso8601":"2020-11-16T19:27:31+1200",
         "jira":"2020-11-16T19:27:31.354+1200"
      },
      "withinCalendarHours":false
   }
}

Expected output: -53h 32m

With completed cycles but no ongoing cycle

{
   "_links":{
      "self":"https://...."
   },
   "completedCycles":[
      {
         "breached":true,
         "elapsedTime":{
            "friendly":"72h 43m",
            "millis":261818073
         },
         "goalDuration":{
            "friendly":"4h",
            "millis":14400000
         },
         "remainingTime":{
            "friendly":"-68h 43m",
            "millis":-247418073
         },
         "startTime":{
            "epochMillis":1605156449463,
            "friendly":"12/Nov/20 4:47 PM +12:00",
            "iso8601":"2020-11-12T16:47:29+1200",
            "jira":"2020-11-12T16:47:29.463+1200"
         },
         "stopTime":{
            "epochMillis":1606282267536,
            "friendly":"Today 5:31 PM +12:00",
            "iso8601":"2020-11-25T17:31:07+1200",
            "jira":"2020-11-25T17:31:07.536+1200"
         }
      }
   ],
   "id":"630",
   "name":"Time to resolution",
   "ongoingCycle": null
}

Expected output: -68h 43m

I've got this code working, but I'm not sure it's the best way to solve the problem:

@udf("string")
def extract_time(s, field):
  if s is None:
    return None
  
  # Return ongoing cycle field
  if has_column(s, 'ongoingCycle'):
    if s['ongoingCycle'] is not None:
      return s['ongoingCycle']['remainingTime']['friendly']
    
  # Get the last completed cycles' remaining time
  s_completed = s['completedCycles']
  if len(s_completed) > 0:
    return s_completed[-1]['remainingTime']['friendly']
  return None
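(The post never shows has_column. A plausible definition, plus a plain-Python check of the UDF's branch logic against trimmed versions of the two sample payloads — dicts stand in for the Row the UDF would receive:)

```python
def has_column(row, name):
    # On a dict (e.g. the output of Row.asDict()) this is a membership
    # test; on an actual Row one would check `name in row.__fields__`.
    return name in row

def extract_time(s):
    if s is None:
        return None
    # Prefer the ongoing cycle's remaining time when one exists.
    if has_column(s, 'ongoingCycle') and s['ongoingCycle'] is not None:
        return s['ongoingCycle']['remainingTime']['friendly']
    # Otherwise fall back to the last completed cycle.
    completed = s.get('completedCycles') or []
    if len(completed) > 0:
        return completed[-1]['remainingTime']['friendly']
    return None

with_ongoing = {
    "completedCycles": [],
    "ongoingCycle": {"remainingTime": {"friendly": "-53h 32m"}},
}
completed_only = {
    "completedCycles": [{"remainingTime": {"friendly": "-68h 43m"}}],
    "ongoingCycle": None,
}
print(extract_time(with_ongoing))    # -53h 32m
print(extract_time(completed_only))  # -68h 43m
```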

Tags: apache-spark, pyspark, user-defined-functions

Solution


Use the when function to express the same logic you implemented in your UDF.

Check the code below.

df.show()
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|_links         |completedCycles                                                                                                                                                                                                                                                         |id |name              |ongoingCycle                                                                                                                                                                                                                                                                            |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[https:///....]|[]                                                                                                                                                                                                                                                                      |630|Time to resolution|[[1605583651354, 17/Nov/20 3:27 PM +12:00, 2020-11-17T15:27:31+1200, 2020-11-17T15:27:31.354+1200], true, [57h 32m, 207148646], [4h, 14400000], false, [-53h 32m, -192748646], [1605511651354, 16/Nov/20 7:27 PM +12:00, 2020-11-16T19:27:31+1200, 2020-11-16T19:27:31.354+1200], false]|
|[https://....] |[[true, [72h 43m, 261818073], [4h, 14400000], [-68h 43m, -247418073], [1605156449463, 12/Nov/20 4:47 PM +12:00, 2020-11-12T16:47:29+1200, 2020-11-12T16:47:29.463+1200], [1606282267536, Today 5:31 PM +12:00, 2020-11-25T17:31:07+1200, 2020-11-25T17:31:07.536+1200]]]|630|Time to resolution|null                                                                                                                                                                                                                                                                                    |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+


from pyspark.sql import functions as F

df.withColumn("time_to_resolution_remainingTime", F.expr("""
    CASE WHEN ongoingCycle IS NOT NULL THEN ongoingCycle.remainingTime.friendly
         WHEN size(completedCycles) > 0 THEN completedCycles[size(completedCycles)-1].remainingTime.friendly
         ELSE null END""")) \
  .select("time_to_resolution_remainingTime") \
  .show(truncate=False)

+--------------------------------+
|time_to_resolution_remainingTime|
+--------------------------------+
|-53h 32m                        |
|-68h 43m                        |
+--------------------------------+

