Handle null values per row differently with PySpark

Problem description

I have NULLs in the Latitude and Longitude columns, and I need to look up each address through an API and then replace the NULL values.

How can I do that? How do I go through each row, extract the city name, and pass it to the API?

+-------------+--------------------+-------+------------+--------------------+--------+---------+
|           Id|                Name|Country|        City|             Address|Latitude|Longitude|
+-------------+--------------------+-------+------------+--------------------+--------+---------+
|  42949672960|Americana Resort ...|     US|      Dillon|         135 Main St|    null|     null|
|  60129542147|Ubaa Old Crawford...|     US| Des Plaines|     5460 N River Rd|    null|     null|
| 455266533383|        Busy B Ranch|     US|   Jefferson|  1100 W Prospect Rd|    null|     null|
|1108101562370|             Motel 6|     US|    Rockport|       106 W 11th St|    null|     null|
|1382979469315|           La Quinta|     US|  Twin Falls|    539 Pole Line Rd|    null|     null|
| 292057776132|        Hyatt Dulles|     US|     Herndon|2300 Dulles Corne...|    null|     null|
| 987842478080|      Dead Broke Inn|     US|       Young|47893 N Arizona H...|    null|     null|
| 300647710720|The Miner's Inn M...|     US|    Viburnum|Highway 49 Saint ...|    null|     null|
| 489626271746|      Alyssa's Motel|     US|       Casco|              Rr 302|    null|     null|
+-------------+--------------------+-------+------------+--------------------+--------+---------+

Tags: apache-spark, pyspark

Solution


I agree with @SCouto about using a UDF, but I suggest returning a tuple rather than a comma-separated string. That saves two extra split transformations later.

from pyspark.sql import functions as F
from pyspark.sql import types as T

def get_latitude_longitude(address):
    # call your API with the address as a parameter, then
    # return the latitude and longitude it gives back as a tuple
    return (lat, lon)

get_latitude_longitude_UDF = F.udf(get_latitude_longitude, T.ArrayType(T.DoubleType()))

df = (df
    .withColumn('latlon', get_latitude_longitude_UDF('Address'))
    .withColumn('Latitude', F.col('latlon')[0])
    .withColumn('Longitude', F.col('latlon')[1])
    .drop('latlon')
)
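Since the same address (or city) can appear on many rows, it can be worth caching the API results inside the UDF function so each distinct address is only looked up once per executor. A minimal sketch of such a helper, with a hypothetical in-memory `FAKE_API` dict (made-up coordinates) standing in for the real API call:

```python
from functools import lru_cache

# Stand-in for the real geocoding API; in practice this would be an
# HTTP request keyed by the address string. The coordinates below are
# illustrative values only.
FAKE_API = {
    "135 Main St": (39.6303, -106.0434),
    "5460 N River Rd": (42.0664, -87.8851),
}

@lru_cache(maxsize=None)
def get_latitude_longitude(address):
    # Return (lat, lon) as a tuple so a UDF declared with
    # ArrayType(DoubleType()) yields an array column that can be
    # indexed directly with [0] and [1].
    latlon = FAKE_API.get(address)
    if latlon is None:
        return (None, None)  # keep nulls when the API has no match
    return latlon
```

Note that `lru_cache` is per Python worker process, not cluster-wide, so it only deduplicates calls within each executor. Returning `(None, None)` for misses keeps the Latitude/Longitude columns null instead of failing the whole job on an unmatched address.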
