apache-spark - Handling null values differently for each row with PySpark
Problem description
I have NULLs in the Latitude and Longitude columns, and I need to look up each address through an API and then replace the NULL values.
How can I do that? How do I go through each row, extract the city name, and pass it to the API?
+-------------+--------------------+-------+------------+--------------------+--------+---------+
| Id| Name|Country| City| Address|Latitude|Longitude|
+-------------+--------------------+-------+------------+--------------------+--------+---------+
| 42949672960|Americana Resort ...| US| Dillon| 135 Main St| null| null|
| 60129542147|Ubaa Old Crawford...| US| Des Plaines| 5460 N River Rd| null| null|
| 455266533383| Busy B Ranch| US| Jefferson| 1100 W Prospect Rd| null| null|
|1108101562370| Motel 6| US| Rockport| 106 W 11th St| null| null|
|1382979469315| La Quinta| US| Twin Falls| 539 Pole Line Rd| null| null|
| 292057776132| Hyatt Dulles| US| Herndon|2300 Dulles Corne...| null| null|
| 987842478080| Dead Broke Inn| US| Young|47893 N Arizona H...| null| null|
| 300647710720|The Miner's Inn M...| US| Viburnum|Highway 49 Saint ...| null| null|
| 489626271746| Alyssa's Motel| US| Casco| Rr 302| null| null|
+-------------+--------------------+-------+------------+--------------------+--------+---------+
Solution
I agree with @SCouto about using a UDF, but I'd suggest returning a tuple rather than a comma-separated string. That saves two extra split transformations later on.
from pyspark.sql import functions as F
from pyspark.sql import types as T

def get_latitude_longitude(address):
    # Call your geocoding API with the address as a parameter,
    # then return the latitude and longitude from the response.
    return (lat, lon)

get_latitude_longitude_UDF = F.udf(get_latitude_longitude, T.ArrayType(T.DoubleType()))

(df
 .withColumn('latlon', get_latitude_longitude_UDF('Address'))
 # Use F.col('latlon'), not df['latlon']: the original df has no
 # 'latlon' column, so df['latlon'] raises an AnalysisException.
 .withColumn('lat', F.col('latlon')[0])
 .withColumn('lon', F.col('latlon')[1])
)