apache-spark - sdf_bind_rows(a,b) %>% group_by(col1,col2) %>% summarize(n=n()) crashes Spark and R
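Question
Note: I first posted this as a sparklyr GitHub issue. I am cross-posting here to see if I have better luck.
I am using sparklyr and trying to combine (rbind) two datasets and remove duplicates from the combined dataset. This produced a series of errors, one of which I can reproduce with this example:
Reproducible code that triggers the error: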
#install.packages("dplyr")
#install.packages("dbplyr")
#install.packages('sparklyr')
library(dplyr)
library(dbplyr)
library(sparklyr)
#spark_install(version = "2.1.0")
sc <- spark_connect(master = "local")
iris_tbl <- copy_to(sc, iris)
iris2 <- iris
iris_tbl2 <- copy_to(sc, iris2)
d <- sdf_bind_rows(iris_tbl,iris_tbl2)
d %>% group_by(Sepal.Length,Sepal.Width) %>% summarize(dups=n())
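This gives me the error below. All packages were recently installed from their CRAN versions (see the sessionInfo() output after the long error message below). What is going on?
With my actual data, in addition to raising the error, the command also crashes my R session and restarts RStudio.
Error message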
Error: org.apache.spark.sql.AnalysisException: cannot resolve '`Sepal.Length`' given input columns: [Sepal_Length, Petal_Length, Petal_Width, Species, Sepal_Width]; line 3 pos 9;
'Aggregate ['`Sepal.Length`, '`Sepal.Width`], ['`Sepal.Length`, '`Sepal.Width`, count(1) AS dups#389L]
+- SubqueryAlias sparklyr_tmp_186015321ac5
   +- Union
      :- Project [Sepal_Length#21, Sepal_Width#22, Petal_Length#23, Petal_Width#24, Species#25]
      :  +- SubqueryAlias sparklyr_tmp_18607e7315b6
      :     +- Project [Sepal_Length#21, Sepal_Width#22, Petal_Length#23, Petal_Width#24, Species#25]
      :        +- SubqueryAlias iris
      :           +- LogicalRDD [Sepal_Length#21, Sepal_Width#22, Petal_Length#23, Petal_Width#24, Species#25]
      +- Project [Sepal_Length#178, Sepal_Width#179, Petal_Length#180, Petal_Width#181, Species#182]
         +- Project [Sepal_Length#178, Sepal_Width#179, Petal_Length#180, Petal_Width#181, Species#182]
            +- SubqueryAlias sparklyr_tmp_18607b4484c
               +- Project [Sepal_Length#178, Sepal_Width#179, Petal_Length#180, Petal_Width#181, Species#182]
                  +- SubqueryAlias iris2
                     +- LogicalRDD [Sepal_Length#178, Sepal_Width#179, Petal_Length#180, Petal_Width#181, Species#182]
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:74)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:308)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:308)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:307)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp$1(QueryPlan.scala:269)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:279)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2$1.apply(QueryPlan.scala:283)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:283)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$8.apply(QueryPlan.scala:288)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:288)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:74)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:58)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)
at sun.reflect.GeneratedMethodAccessor55.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at sparklyr.Invoke.invoke(invoke.scala:147)
at sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
at sparklyr.StreamHandler.read(stream.scala:66)
at sparklyr.BackendHandler.channelRead0(handler.scala:51)
at sparklyr.BackendHandler.channelRead0(handler.scala:4)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:748)
Session info
sessionInfo()
R version 3.4.2 (2017-09-28)
Platform: x86_64-w64-mingw32/x64 (64-bit)
Running under: Windows Server >= 2012 x64 (build 9200)
Matrix products: default
locale:
[1] LC_COLLATE=Portuguese_Brazil.1252 LC_CTYPE=Portuguese_Brazil.1252
[3] LC_MONETARY=Portuguese_Brazil.1252 LC_NUMERIC=C
[5] LC_TIME=Portuguese_Brazil.1252
attached base packages:
[1] stats graphics grDevices utils datasets methods base
other attached packages:
[1] sparklyr_1.0.2 dbplyr_1.4.2 dplyr_0.8.3
loaded via a namespace (and not attached):
[1] Rcpp_1.0.2 rstudioapi_0.10 magrittr_1.5 tidyselect_0.2.5 R6_2.2.2
[6] rlang_0.4.0 httr_1.3.1 tools_3.4.2 parallel_3.4.2 config_0.3
[11] DBI_1.0.0 withr_2.1.1 ellipsis_0.2.0.1 htmltools_0.3.6 openssl_1.0
[16] yaml_2.1.16 assertthat_0.2.0 digest_0.6.15 rprojroot_1.3-2 tibble_2.1.3
[21] forge_0.2.0 crayon_1.3.4 purrr_0.3.2 base64enc_0.1-3 htmlwidgets_1.3
[26] glue_1.3.1 compiler_3.4.2 pillar_1.4.2 generics_0.0.2 backports_1.1.2
[31] r2d3_0.2.3 jsonlite_1.5 pkgconfig_2.0.1
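Solution
The error message lists the available columns as Sepal_Length, Sepal_Width, and so on: the dots in the column names were converted to _ (instead of .) when the data was copied to Spark, so this should work instead: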
d %>% group_by(Sepal_Length, Sepal_Width) %>% summarize(dups=n())
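To see which names to use before writing the group_by() call, it can help to compare the local column names with what Spark is likely to hold. The following is a minimal sketch in base R that only approximates the renaming by replacing each dot with an underscore; the variable names local_names and spark_like_names are made up for illustration, and sparklyr's real sanitization rules may handle more cases than this.

```r
# Column names as they exist in the local R data frame.
local_names <- names(iris)

# Approximation (an assumption, not sparklyr's actual code):
# every literal dot becomes an underscore.
spark_like_names <- gsub(".", "_", local_names, fixed = TRUE)

print(spark_like_names)

# With a live Spark connection, the names can instead be read back
# from the remote table itself, e.g. colnames(d), rather than guessed.
```

The printed names match the set of input columns in the AnalysisException above (the error lists them in a different order), which is why grouping by Sepal_Length and Sepal_Width resolves while Sepal.Length does not.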