r - Calculating distances between latitude/longitude coordinates using spark_apply
Problem description
I am trying to use the spark_apply function to calculate distances between some longitude and latitude coordinates in R. I can compute them in base R, but I would like to perform the same calculation with spark_apply().
How can I replicate the distm(latLong, distanceFrom) calculation inside spark_apply?
Data:
library(data.table)
library(sparklyr)
library(geosphere)
library(tidyverse)
# setup
conf <- spark_config()
conf$spark.dynamicAllocation.enabled <- "true"
sc <- spark_connect(master = "local", version = "2.3.0")
# create data
df <- data_frame(
  place=c("Finland", "Canada", "Tanzania", "Bolivia", "France"),
  longitude=c(27.472918, -90.476303, 34.679950, -65.691146, 4.533465),
  latitude=c(63.293001, 54.239631, -2.855123, -13.795272, 48.603949),
  crs="+proj=longlat +datum=WGS84")
# compute distance from the "distanceFrom" data
latLong <- df %>%
dplyr::select(c(longitude, latitude))
distanceFrom <- rbind(c(34.20, -3.67), c(30.56, -2.50))
distm(latLong, distanceFrom)
######################### Apply this in Spark
mySpark <- sdf_copy_to(sc, df, "my_tbl", overwrite = TRUE)
Solution
Since sparklyr::spark_apply operates on a single Spark DataFrame (the function you pass is applied to each partition of that one table), a workable strategy is to get both sets of points onto one Spark DataFrame via a cross join. The distance for each pair can then be computed with geodist::geodist.
library("data.table")
library("sparklyr")
#>
#> Attaching package: 'sparklyr'
#> The following object is masked from 'package:stats':
#>
#> filter
library("geosphere")
library("tidyverse")
# setup
conf <- spark_config()
conf$spark.dynamicAllocation.enabled <- "true"
sc <- spark_connect(master = "local")
# create data
df <- data_frame(
  place=c("Finland", "Canada", "Tanzania", "Bolivia", "France"),
  longitude=c(27.472918, -90.476303, 34.679950, -65.691146, 4.533465),
  latitude=c(63.293001, 54.239631, -2.855123, -13.795272, 48.603949),
  crs="+proj=longlat +datum=WGS84")
#> Warning: `data_frame()` was deprecated in tibble 1.1.0.
#> Please use `tibble()` instead.
df
#> # A tibble: 5 x 4
#> place longitude latitude crs
#> <chr> <dbl> <dbl> <chr>
#> 1 Finland 27.5 63.3 +proj=longlat +datum=WGS84
#> 2 Canada -90.5 54.2 +proj=longlat +datum=WGS84
#> 3 Tanzania 34.7 -2.86 +proj=longlat +datum=WGS84
#> 4 Bolivia -65.7 -13.8 +proj=longlat +datum=WGS84
#> 5 France 4.53 48.6 +proj=longlat +datum=WGS84
# compute distance from the "distanceFrom" data
latLong <- df %>%
dplyr::select(c(longitude, latitude))
distanceFrom <- rbind(c(34.20, -3.67), c(30.56, -2.50))
distm(latLong, distanceFrom)
#> [,1] [,2]
#> [1,] 7448355.4 7302060.8
#> [2,] 12520695.4 12197620.9
#> [3,] 104712.2 459812.3
#> [4,] 10987001.5 10626916.8
#> [5,] 6466454.9 6196687.9
# create df_1 from df (5 row dataframe)
df_1 = df %>%
select(longitude, latitude)
# create df_2 from 'distanceFrom' (2 row matrix)
df_2 = as_tibble(distanceFrom)
#> Warning: The `x` argument of `as_tibble.matrix()` must have unique column names if `.name_repair` is omitted as of tibble 2.0.0.
#> Using compatibility `.name_repair`.
colnames(df_2) = c("longitude_2", "latitude_2")
df_2
#> # A tibble: 2 x 2
#> longitude_2 latitude_2
#> <dbl> <dbl>
#> 1 34.2 -3.67
#> 2 30.6 -2.5
# copy both of them to spark
df_1_sdf = df_1 %>%
copy_to(sc, ., overwrite = TRUE)
df_1_sdf
#> # Source: spark<?> [?? x 2]
#> longitude latitude
#> <dbl> <dbl>
#> 1 27.5 63.3
#> 2 -90.5 54.2
#> 3 34.7 -2.86
#> 4 -65.7 -13.8
#> 5 4.53 48.6
df_2_sdf = df_2 %>%
copy_to(sc, ., overwrite = TRUE)
df_2_sdf
#> # Source: spark<?> [?? x 2]
#> longitude_2 latitude_2
#> <dbl> <dbl>
#> 1 34.2 -3.67
#> 2 30.6 -2.5
# define distance function using geodist package
get_geodesic_distance = function(x){
  dist_vec =
    geodist::geodist(dplyr::select(x, c(latitude, longitude))
                     , dplyr::select(x, c(latitude_2, longitude_2))
                     , paired = TRUE
                     , measure = "geodesic"
                     )
  res = dplyr::mutate(x, distance = dist_vec)
  res
}
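# (optional sanity check, not in the original answer) the same function can be
# run on an ordinary local data frame first: tidyr::crossing() builds the same
# set of point pairs as the Spark cross join below, so this only assumes that
# geodist and tidyr are installed locally. On a real cluster, geodist must also
# be available to the workers (see the `packages` argument of spark_apply()).
local_pairs = tidyr::crossing(df_1, df_2)
get_geodesic_distance(local_pairs)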
# create all pairs of points
full_join(df_1_sdf, df_2_sdf, by = character(0)) %>%
  glimpse() %>%
  spark_apply(get_geodesic_distance)
#> Rows: ??
#> Columns: 4
#> Database: spark_connection
#> $ longitude <dbl> 27.472918, 27.472918, -90.476303, -90.476303, 34.679950, 3…
#> $ latitude <dbl> 63.293001, 63.293001, 54.239631, 54.239631, -2.855123, -2.…
#> $ longitude_2 <dbl> 34.20, 30.56, 34.20, 30.56, 34.20, 30.56, 34.20, 30.56, 34…
#> $ latitude_2 <dbl> -3.67, -2.50, -3.67, -2.50, -3.67, -2.50, -3.67, -2.50, -3…
#> # Source: spark<?> [?? x 5]
#> longitude latitude longitude_2 latitude_2 distance
#> <dbl> <dbl> <dbl> <dbl> <dbl>
#> 1 27.5 63.3 34.2 -3.67 7448355.
#> 2 27.5 63.3 30.6 -2.5 7302061.
#> 3 -90.5 54.2 34.2 -3.67 12520695.
#> 4 -90.5 54.2 30.6 -2.5 12197621.
#> 5 34.7 -2.86 34.2 -3.67 104712.
#> 6 34.7 -2.86 30.6 -2.5 459812.
#> 7 -65.7 -13.8 34.2 -3.67 10987002.
#> 8 -65.7 -13.8 30.6 -2.5 10626917.
#> 9 4.53 48.6 34.2 -3.67 6466455.
#> 10 4.53 48.6 30.6 -2.5 6196688.
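If the goal is to recover the same wide layout that distm() returns (one column per reference point), one option, not shown in the original answer, is to collect the result back into R and pivot it. The sketch below assumes the spark_apply() output has been assigned to a variable (the hypothetical name dist_sdf) and that tidyr is available:
# hypothetical: dist_sdf holds the result of the spark_apply() call above
dist_sdf %>%
  collect() %>%                                  # bring the 10 rows back to R
  tidyr::pivot_wider(
    names_from  = c(longitude_2, latitude_2),    # one column per reference point
    values_from = distance
  )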
PS: Consider the geospark package for geospatial work in Spark.