r - 在 sparklyr 中汇总标准偏差并计算非 NA
问题描述
我有一个很大的 data.frame,并且我一直在使用summarise
结合across
. 由于我的 data.frame 的大小,我不得不开始处理我的数据sparklyr
。
由于sparklyr
不支持across
我正在使用summarise_each
. 这工作正常,除了summarise_each
insparklyr
似乎不支持sd
和sum(!is.na(.))
下面是一个示例数据集以及我通常如何处理它,使用dplyr
:
test <- data.frame(ID = c("Group1","Group1",'Group1','Group1','Group1','Group1','Group1',
"Group2","Group2","Group2",'Group2','Group2','Group2',"Group2",
"Group3","Group3","Group3"),
Value1 = c(-100,-10,-5,-5,-5,1,2,1,2,3,4,4,4,4,1,2,3),
Value2 = c(50,100,10,-5,3,1,2,2,2,3,4,4,4,4,1,2,3))
test %>%
group_by %>%
summarise(across((Value1:Value2), ~sum(!is.na(.), na.rm = TRUE), .names = "{col}_count"),
across((Value1:Value2), ~min(., na.rm = TRUE), .names = "{col}_min"),
across((Value1:Value2), ~max(., na.rm = TRUE), .names = "{col}_max"),
across((Value1:Value2), ~mean(., na.rm = TRUE), .names = "{col}_mean"),
across((Value1:Value2), ~sd(., na.rm = TRUE), .names = "{col}_sd"))
# A tibble: 1 x 10
Value1_count Value2_count Value1_min Value2_min Value1_max Value2_max Value1_mean Value2_mean Value1_sd Value2_sd
<int> <int> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
1 17 17 -100 -5 4 100 -5.53 11.2 24.7 25.8
我还能够使用 summarise_each 成功获得相同的答案,如下所示:
test %>%
group_by(ID) %>%
summarise_each(funs(min = min(., na.rm = TRUE),
max = max(., na.rm = TRUE),
mean = mean(., na.rm = TRUE),
sum = sum(., na.rm = TRUE),
sd = sd(., na.rm = TRUE)))
ID Value1_min Value2_min Value1_max Value2_max Value1_mean Value2_mean Value1_sum Value2_sum
<fct> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
1 Group1 -100 -5 2 100 -17.4 23 -122 161
2 Group2 1 2 4 4 3.14 3.29 22 23
3 Group3 1 1 3 3 2 2 6 6
使用时sparklyr
我已经成功计算出min
, max
, mean
,sum
如下图所示:
sc <- spark_connect(master = "local", version = "2.4.3")
test <- spark_read_csv(sc = sc, path = "C:\\path\\test space.csv")
test %>%
group_by(ID) %>%
summarise_each(funs(min = min(., na.rm = TRUE),
max = max(., na.rm = TRUE),
mean = mean(., na.rm = TRUE),
sum = sum(., na.rm = TRUE)))
# Source: spark<?> [?? x 9]
ID Value1_min Value_2_min Value1_max Value_2_max Value1_mean Value_2_mean Value1_sum Value_2_sum
<chr> <int> <int> <int> <int> <dbl> <dbl> <dbl> <dbl>
1 Group2 1 2 4 4 3.14 3.29 22 23
2 Group3 1 1 3 3 2 2 6 6
3 Group1 -100 -5 2 100 -17.4 23 -122 161
但是我在尝试获取时收到错误消息sd
,sum(!is.na(.))
下面是我收到的代码和错误消息。是否有任何解决方法可以帮助汇总这些值?
test %>%
group_by(ID) %>%
summarise_each(funs(min = min(., na.rm = TRUE),
max = max(., na.rm = TRUE),
mean = mean(., na.rm = TRUE),
sum = sum(., na.rm = TRUE),
sd = sd(., na.rm = TRUE)))
Error: org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input 'AS' expecting ')'(line 1, pos 298)
== SQL ==
SELECT `ID`, MIN(`Value1`) AS `Value1_min`, MIN(`Value_2`) AS `Value_2_min`, MAX(`Value1`) AS `Value1_max`, MAX(`Value_2`) AS `Value_2_max`, AVG(`Value1`) AS `Value1_mean`, AVG(`Value_2`) AS `Value_2_mean`, SUM(`Value1`) AS `Value1_sum`, SUM(`Value_2`) AS `Value_2_sum`, stddev_samp(`Value1`, TRUE AS `na.rm`) AS `Value1_sd`, stddev_samp(`Value_2`, TRUE AS `na.rm`) AS `Value_2_sd`
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------^^^
FROM `test_space_30172a44_c0aa_4305_9a5e_d45fa77ba0b9`
GROUP BY `ID`
at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:241)
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:117)
at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:48)
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:69)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
at sun.reflect.GeneratedMethodAccessor66.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at sparklyr.Invoke.invoke(invoke.scala:147)
at sparklyr.StreamHandler.handleMethodCall(stream.scala:136)
at sparklyr.StreamHandler.read(stream.scala:61)
at sparklyr.BackendHandler$$anonfun$channelRead0$1.apply$mcV$sp(handler.scala:58)
at scala.util.control.Breaks.breakable(Breaks.scala:38)
at sparklyr.BackendHandler.channelRead0(handler.scala:38)
at sparklyr.BackendHandler.channelRead0(handler.scala:14)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:748)
In addition: Warning messages:
1: Named arguments ignored for SQL stddev_samp
2: Named arguments ignored for SQL stddev_samp
解决方案
问题是na.rm
参数。Spark 的stddev_samp
函数没有这样的参数,sparklyr
似乎也没有处理它。
SQL 中始终会删除缺失值,因此您无需指定na.rm
.
test_spark %>%
group_by(ID) %>%
summarise_each(funs(min = min(.),
max = max(.),
mean = mean(.),
sum = sum(.),
sd = sd(.)))
#> # Source: spark<?> [?? x 11]
#> ID Value1_min Value2_min Value1_max Value2_max Value1_mean Value2_mean
#> <chr> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
#> 1 Group2 1 2 4 4 3.14 3.29
#> 2 Group1 -100 -5 2 100 -17.4 23
#> 3 Group3 1 1 3 3 2 2
#> Value1_sum Value2_sum Value1_sd Value2_sd
#> <dbl> <dbl> <dbl> <dbl>
#> 1 22 23 1.21 0.951
#> 2 -122 161 36.6 38.6
#> 3 6 6 1 1
这看起来像是一个特定于summarise
as sd
with的错误,na.rm
适用于mutate
.
test_spark %>%
group_by(ID) %>%
mutate_each(funs(sd = sd(., na.rm = TRUE)))
因为sum(!is.na(.))
,你只需要把它写成sum(ifelse(is.na(.), 0, 1))
.
推荐阅读
- java - 启动springboot服务器时如何绕过kafka代理失败?
- javascript - 嵌套后返回为 NaN 的值
- javascript - 将内容类型更改为 Json (XMLHttpRequest) 的问题
- python - 使用 Selenium / Python 自动执行两步验证?
- ios - 输入附件 ViewController 演示
- c++ - c++中的用户定义数据类型
- r - 如何重试 R testthat 测试 API 错误?
- android - Observable fromIterable 不会将项目插入到 Room
- sql - 基于Oracle中的性能、效率和优势的日期vs时间戳vs间隔(秒到天等)
- java - 为什么 MainActivity.this 返回 null?