pyspark - 如何使用 UDF 在 PySpark 中合并多边形
问题描述
我使用的多边形数据以 GeoJSON 类型存储在列表中。我想在列表中合并多边形。
示例数据:
ID | 多边形 | 计算多边形 |
---|---|---|
1 | [{“类型”:“多边形”,“坐标”:[[[129.1012158,35.2478918],......}] | 3 |
2 | [{“类型”:“多边形”,“坐标”:[[[129.1012158,35.2478918],......}] | 2 |
预期数据:
ID | union_polygons |
---|---|
1 | {“类型”:“多边形”,“坐标”:[[[129.1012158,35.2478918],......}] |
2 | {“类型”:“多边形”,“坐标”:[[[129.1012158,35.2478918],......}] |
代码:
from shapely.ops import cascaded_union, unary_union
from shapely.geometry import shape, Point, Polygon, mapping
from json import load, JSONEncoder
import shapely
@F.udf(StringType())
def union_polygons(polygon_column):
a = unary_union([shape(json.loads(p)) for p in polygons])
b = json.dumps(shapely.geometry.mapping(a))
return b
df.withColumn('union_polygon', union_polygons('polygons')).show()
使用上面的代码进行测试时,会出现以下错误:
Could not serialize object: TypeError: can't pickle LGEOS360 objects
...
...
pickle.PicklingError: Could not serialize object: TypeError: can't pickle LGEOS360 objects
我想通过UDF合并列表列中包含的GeoJSON。我怎样才能做到这一点?
解决方案
udf 的定义有一个小错误:在计算时,a
您应该遍历由返回的列表json.loads(p)
:
@F.udf(StringType())
def union_polygons(polygon_column):
a = shapely.ops.unary_union([shape(p) for p in json.loads(polygon_column)])
b = json.dumps(shapely.geometry.mapping(a))
return b
推荐阅读
- flutter - 颤振:如果在列表类上
- python - 保存后如何组合matplotlib图形?
- uitabbarcontroller - 如何正确呈现标签栏?对 tabbarcontroltest.ViewController 的开始/结束外观转换的不平衡调用:
- opencv - 流与 OpenCV 不同步
- javascript - 使用 jQuery 上传带有数据的文件
- javascript - 在传单地图上显示 json 数据
- java - 外部进程能否强制 JVM 抛出“java.lang.OutOfMemoryError:GC 开销限制超出”
- continuous-integration - 本地地形开发:计划但不适用?
- javascript - 如何使用 React 获取局域网中的计算机名称和本地 IP 地址
- python - 在 Twitter 情感分析上用 Python 中的 LIME 文本解释器解释我的深度学习模型