python - pyspark 内连接的替代方法来比较 pyspark 中的两个数据帧
问题描述
我在 pyspark 中有两个数据框。如下所示,df1 包含来自传感器的整个 long_lat。第二个数据帧 df2 是第一个数据帧的子集,其中 lat-long 值四舍五入到小数点后 2,然后删除重复项以保留唯一的 lat_long 数据点。
df1:
+-----------------+---------+-----+--------------------+----------+------------+
| UID| label|value| datetime| latitude| longitude|
+-----------------+---------+-----+--------------------+----------+------------+
|1B0545GD6546Y|evnt | 3644|2020-06-08T23:32:...|40.1172005|-105.0823546|
|1B0545GD6FG67|evnt | 3644|2020-06-08T23:32:...|40.1172201|-105.0821007|
|15GD6546YFG67|evnt | 3644|2020-06-08T23:32:...|40.1172396|-105.0818468|
|1BGD6546YFG67|evnt | 3644|2020-06-08T23:32:...|40.1172613|-105.0815929|
|1BGD6546YFG67|evnt | 3644|2020-06-08T23:32:...|40.1172808|-105.0813368|
|1B054546YFG67|evnt | 3644|2020-06-08T23:32:...|40.1173003|-105.0810742|
|1B056546YFG67|evnt | 3644|2020-06-08T23:32:...| 40.117322|-105.0808073|
df2:
+-------+--------+----------------+--------------+
|new_lat|new_long| lat_long| State_name|
+-------+--------+----------------+--------------+
| 40.13| -105.1|[40.13, -105.1] | Colorado|
| 40.15| -105.11|[40.15, -105.11]| Colorado|
| 40.12| -105.07|[40.12, -105.07]| Colorado|
| 40.13| -104.99|[40.13, -104.99]| Colorado|
| 40.15| -105.09|[40.15, -105.09]| Colorado|
| 40.15| -105.13|[40.15, -105.13]| Colorado|
| 40.12| -104.94|[40.12, -104.94]| Colorado|
因此,df2 的行数比第一个少得多。在 df2 中,我应用了一个 udf 来计算州名。
现在我想在 df1 中填充状态名称。由于 df2 的 lat_long 值被四舍五入到小数点后 2,为了匹配我使用如下阈值,我在这里使用连接操作。
threshold = 0.01
df4 = df1.join(df2)\
.filter(df2.new_lat - threshold < df1.latitude)\
.filter(df1.latitude < df2.new_lat + threshold)
有没有其他有效的方法来实现同样的目标?因为连接操作是做笛卡尔积,它需要时间和大量的任务。
考虑一下,我的 df1 将有 10000 亿条记录。
任何,帮助将不胜感激。
解决方案
每当您将大 DataFrame 与较小的 DataFrame 连接时,您应该始终尝试执行广播连接。
如果df2
小到可以广播,那么df1.join(broadcast(df2))
性能会更高。
该join()
方法的第二个参数应该是连接条件。
def approx_equal(col1, col2, threshold):
return abs(col1 - col2) < threshold
threshold = lit(0.01)
df4 = df1.join(broadcast(df2), approx_equal(df2.new_lat, df1.latitude, threshold) && approx_equal(df2.new_long, df1. longitude, threshold))
编辑:我将approx_equal
函数添加到quinn,因此您的代码可以更简洁:
import quinn as Q
threshold = lit(0.01)
df4 = df1.join(broadcast(df2), Q.approx_equal(df2.new_lat, df1.latitude, threshold) && Q.approx_equal(df2.new_long, df1. longitude, threshold))
推荐阅读
- r - 使用 purrr 时如何定位错误和调试
- html - 带有长文本的 CSS 气泡对话丢失格式
- bash - 如何从脚本中调用脚本
- angular - Angular 谷歌地图不适用于路由器插座
- mongodb - 使用jmeter根据条件从mongo数据库中删除记录
- python - 键盘快捷键使 tkinter 条目小部件跳转到前面
- angular - 带有自定义 --base-href 和 Nginx 路由的 Angular SPA
- python-3.x - 如何显示从 A 点到 D 点的路线,避免多边形而不与它们相交?
- jenkins - Jenkins 配置为代码插件 (JCasC) 的参数化构建语法
- bokeh - 散景垂直对齐控件