pyspark - PySpark UDF function return type
Problem description
In my Spark dataframe I have the following schema:
root
|-- locations: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- address_line_2: string (nullable = true)
| | |-- continent: string (nullable = true)
| | |-- country: string (nullable = true)
| | |-- geo: string (nullable = true)
| | |-- is_primary: boolean (nullable = true)
| | |-- last_updated: string (nullable = true)
| | |-- locality: string (nullable = true)
| | |-- most_recent: boolean (nullable = true)
| | |-- name: string (nullable = true)
| | |-- postal_code: string (nullable = true)
| | |-- region: string (nullable = true)
| | |-- street_address: string (nullable = true)
| | |-- subregion: string (nullable = true)
| | |-- type: string (nullable = true)
| | |-- zip_plus_4: string (nullable = true)
Here is a sample of the locations column:
[Row(locations=[Row(address_line_2=None, continent='north america', country='united states', geo='40.41,-74.36', is_primary=True, last_updated=None, locality='old bridge', most_recent=True, name='old bridge, new jersey, united states', postal_code=None, region='new jersey', street_address=None, subregion=None, type=None, zip_plus_4=None)])]
As you can see, there is a field named is_primary; based on that field I want to select a location. Here is the function I wrote:
def geoLambda(locations):
    """
    Pre-process geo locations
    :param locations:
    :return: dict
    """
    try:
        for x in locations:
            if x.get("is_primary") == "True" or x.get("is_primary") == True:
                data = x
                data = data.get("geo", None)
                if data is None:
                    lat, lon = -83, 135
                else:
                    lat, lon = data.split(",")
                Payload = {"lat": float(lat), "lon": float(lon)}
                return Payload
            else:
                pass
    except Exception as e:
        print("EXCEPTION: {} ".format(e))
        lat, lon = -83, 135
        Payload = {"lat": float(lat), "lon": float(lon)}
        return Payload
udfValueToCategoryGeo = udf(geoLambda, StructType())
df = df.withColumn("myloc", udfValueToCategoryGeo("locations"))
Output:
|-- myloc: struct (nullable = true)
+-----+
|myloc|
+-----+
|   {}|
|   {}|
|   {}|
|   {}|
|   {}|
|   {}|
|   {}|
+-----+
If I instead declare the return type as StringType:
udfValueToCategoryGeo = udf(geoLambda, StringType())
df = df.withColumn("myloc", udfValueToCategoryGeo("locations"))
+--------------------+
|               myloc|
+--------------------+
|{lon=135.0, lat=-...|
|{lon=135.0, lat=-...|
|{lon=135.0, lat=-...|
|{lon=135.0, lat=-...|
|{lon=135.0, lat=-...|
|{lon=135.0, lat=-...|
|{lon=135.0, lat=-...|
|{lon=135.0, lat=-...|
|{lon=135.0, lat=-...|
|{lon=135.0, lat=-...|
|{lon=135.0, lat=-...|
I can't figure out why this happens. The same function works fine in pandas, but I don't want to use pandas. Any help would be great.
Here is what a single locations row looks like:
[{'name': 'princeton, new jersey, united states',
'locality': 'princeton',
'region': 'new jersey',
'subregion': None,
'country': 'united states',
'continent': 'north america',
'type': None,
'geo': '40.34,-74.65',
'postal_code': None,
'zip_plus_4': None,
'street_address': None,
'address_line_2': None,
'most_recent': True,
'is_primary': True,
'last_updated': '2021-03-01'}]
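The primary-location selection over such a list of plain dicts can be exercised in ordinary Python, without Spark (a sketch; pick_primary is a hypothetical helper, not part of the question's code, using a trimmed copy of the sample row above):

```python
def pick_primary(locations):
    # Hypothetical helper mirroring the question's intent: return the first
    # location whose is_primary flag is truthy, or None if there is none.
    for loc in locations:
        if loc.get("is_primary"):
            return loc
    return None

locations = [{"name": "princeton, new jersey, united states",
              "geo": "40.34,-74.65",
              "is_primary": True}]
primary = pick_primary(locations)
print(primary["geo"])  # 40.34,-74.65
```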
Any help is appreciated.
Solution
This is how I solved it, by declaring the UDF's return schema with explicit named fields:
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, FloatType

def geoLambda(locations):
    for x in locations:
        if x["is_primary"] == True:
            data = x["geo"]
            if data is None:
                lat, lon = -83, 135
            else:
                lat, lon = data.split(",")
            Payload = {"lat": float(lat), "lon": float(lon)}
            return Payload

udfValueToCategoryGeo = udf(geoLambda, StructType(
    [
        StructField('lat', nullable=True, dataType=FloatType()),
        StructField('lon', nullable=True, dataType=FloatType())
    ]
))
df = df.withColumn("myloc", udfValueToCategoryGeo("locations"))
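The parsing step inside the UDF can also be factored out and unit-tested without Spark (a sketch; parse_geo is a hypothetical helper that keeps the author's fallback of lat -83, lon 135 for missing geo strings):

```python
def parse_geo(geo):
    # Hypothetical helper mirroring the UDF body: split a "lat,lon" string
    # into floats, falling back to the author's defaults when geo is missing.
    if geo is None:
        return {"lat": -83.0, "lon": 135.0}
    lat, lon = geo.split(",")
    return {"lat": float(lat), "lon": float(lon)}

print(parse_geo("40.41,-74.36"))  # {'lat': 40.41, 'lon': -74.36}
print(parse_geo(None))            # {'lat': -83.0, 'lon': 135.0}
```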