sdf_bind_rows(a,b) %>% group_by(col1,col2) %>% summarize(n=n()) crashes Spark and R

Problem description

Note: this question was first posted in this sparklyr GitHub issue. Cross-posting here to see if I have better luck.

Using sparklyr, I am trying to combine (rbind) two datasets and remove duplicates from the combined dataset. This produces a series of errors, one of which I can reproduce with the example below:

Reproducible example:

#install.packages("dplyr")
#install.packages("dbplyr")
#install.packages('sparklyr')

library(dplyr)
library(dbplyr)
library(sparklyr)
#spark_install(version = "2.1.0")

sc <- spark_connect(master = "local")

iris_tbl <- copy_to(sc, iris)
iris2 <- iris
iris_tbl2 <- copy_to(sc, iris2)
d <- sdf_bind_rows(iris_tbl,iris_tbl2) 
d %>% group_by(Sepal.Length,Sepal.Width) %>% summarize(dups=n())

This gives me the error below. All packages were recently installed from their CRAN versions (see the sessionInfo() output after the long error trace). What is going on?

On my actual data, besides throwing this error, the command also crashes my R session and restarts RStudio.

Error message

Error: org.apache.spark.sql.AnalysisException: cannot resolve '`Sepal.Length`' given input columns: [Sepal_Length, Petal_Length, Petal_Width, Species, Sepal_Width]; line 3 pos 9;
'Aggregate ['`Sepal.Length`, '`Sepal.Width`], ['`Sepal.Length`, '`Sepal.Width`, count(1) AS dups#389L]
+- SubqueryAlias sparklyr_tmp_186015321ac5
   +- Union
      :- Project [Sepal_Length#21, Sepal_Width#22, Petal_Length#23, Petal_Width#24, Species#25]
      :  +- SubqueryAlias sparklyr_tmp_18607e7315b6
      :     +- Project [Sepal_Length#21, Sepal_Width#22, Petal_Length#23, Petal_Width#24, Species#25]
      :        +- SubqueryAlias iris
      :           +- LogicalRDD [Sepal_Length#21, Sepal_Width#22, Petal_Length#23, Petal_Width#24, Species#25]
      +- Project [Sepal_Length#178, Sepal_Width#179, Petal_Length#180, Petal_Width#181, Species#182]
         +- Project [Sepal_Length#178, Sepal_Width#179, Petal_Length#180, Petal_Width#181, Species#182]
            +- SubqueryAlias sparklyr_tmp_18607b4484c
               +- Project [Sepal_Length#178, Sepal_Width#179, Petal_Length#180, Petal_Width#181, Species#182]
                  +- SubqueryAlias iris2
                     +- LogicalRDD [Sepal_Length#178, Sepal_Width#179, Petal_Length#180, Petal_Width#181, Species#182]

at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:74)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:308)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:308)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:307)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp$1(QueryPlan.scala:269)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:279)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2$1.apply(QueryPlan.scala:283)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:283)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$8.apply(QueryPlan.scala:288)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:288)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:74)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:58)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)
at sun.reflect.GeneratedMethodAccessor55.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at sparklyr.Invoke.invoke(invoke.scala:147)
at sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
at sparklyr.StreamHandler.read(stream.scala:66)
at sparklyr.BackendHandler.channelRead0(handler.scala:51)
at sparklyr.BackendHandler.channelRead0(handler.scala:4)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:748)

Session info

sessionInfo()
R version 3.4.2 (2017-09-28)
Platform: x86_64-w64-mingw32/x64 (64-bit)
Running under: Windows Server >= 2012 x64 (build 9200)

Matrix products: default

locale:
[1] LC_COLLATE=Portuguese_Brazil.1252  LC_CTYPE=Portuguese_Brazil.1252   
[3] LC_MONETARY=Portuguese_Brazil.1252 LC_NUMERIC=C                      
[5] LC_TIME=Portuguese_Brazil.1252    

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
[1] sparklyr_1.0.2 dbplyr_1.4.2   dplyr_0.8.3   

loaded via a namespace (and not attached):
 [1] Rcpp_1.0.2       rstudioapi_0.10  magrittr_1.5     tidyselect_0.2.5 R6_2.2.2        
 [6] rlang_0.4.0      httr_1.3.1       tools_3.4.2      parallel_3.4.2   config_0.3      
[11] DBI_1.0.0        withr_2.1.1      ellipsis_0.2.0.1 htmltools_0.3.6  openssl_1.0     
[16] yaml_2.1.16      assertthat_0.2.0 digest_0.6.15    rprojroot_1.3-2  tibble_2.1.3    
[21] forge_0.2.0      crayon_1.3.4     purrr_0.3.2      base64enc_0.1-3  htmlwidgets_1.3 
[26] glue_1.3.1       compiler_3.4.2   pillar_1.4.2     generics_0.0.2   backports_1.1.2 
[31] r2d3_0.2.3       jsonlite_1.5     pkgconfig_2.0.1 

Tags: apache-spark, dplyr, sparklyr

Solution

The dots in the column names are converted to underscores (_) rather than kept as dots (.) when the data frame is copied to Spark, so this should work instead:

d %>% group_by(Sepal_Length, Sepal_Width) %>% summarize(dups=n())
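
For completeness: since sparklyr rewrites the dots at copy time, you can either query with the underscore names (as above) or rename the columns in R before copy_to() so the local and Spark names agree. A minimal sketch, assuming the same connection sc and the tables from the reproducible example; the iris_a / iris_b table names are only illustrative:

# Column names as Spark actually sees them -- the dots have become underscores
colnames(iris_tbl)
#> [1] "Sepal_Length" "Sepal_Width"  "Petal_Length" "Petal_Width"  "Species"

# Alternative: replace the dots locally before copying, so both sides agree
iris_local <- iris
names(iris_local) <- gsub("\\.", "_", names(iris_local))

a_tbl <- copy_to(sc, iris_local, "iris_a", overwrite = TRUE)
b_tbl <- copy_to(sc, iris_local, "iris_b", overwrite = TRUE)

sdf_bind_rows(a_tbl, b_tbl) %>%
  group_by(Sepal_Length, Sepal_Width) %>%
  summarize(dups = n())

Either way, the key point is that the grouping variables in the dplyr pipeline must match the names on the Spark side, not the names of the original R data frame.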
