Connecting sparklyr 0.8.4 to a remote Spark 2.2.1 cluster

Problem description

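I am trying to connect to a remote Spark cluster from R. The Spark cluster runs on Debian Jessie, where the newest R version I can install is 3.3, but I need 3.4 to run FactoMineR. So I installed R on another machine and tried to connect to the cluster with sparklyr 0.8.4: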

> sc <- spark_connect(master = "spark://spark-cluster-m:7077", spark_home="/usr/lib/spark/", version="2.2.1")
Error in start_shell(master = master, spark_home = spark_home, spark_version = version,  : 
  SPARK_HOME directory '/usr/lib/spark/' not found

Spark is not installed on the local machine. It is installed on spark-cluster-m, where the directory does exist:

jc@spark-cluster-m:/usr/lib/spark$ ls
bin  conf  data  examples  external  jars  LICENSE  licenses  NOTICE  python  R  README.md  RELEASE  sbin  work  yarn
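The error above comes from the R side: sparklyr looks for spark_home on the machine where R is running, not on the cluster master. A minimal base-R sketch of a pre-flight check is below. The helper name `is_local_spark_home` is hypothetical, and it assumes that a usable Spark home contains bin/spark-submit, as standard Spark distributions (including the /usr/lib/spark listing above) do.

```r
# Hypothetical helper: TRUE if `path` looks like a Spark installation on THIS machine.
# Assumption: a usable Spark home contains bin/spark-submit (true for standard distributions).
is_local_spark_home <- function(path) {
  dir.exists(path) && file.exists(file.path(path, "bin", "spark-submit"))
}

# On the R VM this is expected to be FALSE, because /usr/lib/spark/ only exists on spark-cluster-m.
is_local_spark_home("/usr/lib/spark/")
```

If the check fails, one option is a local copy of the same Spark version, for example via sparklyr's spark_install(version = "2.2.1").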

Am I missing something? The Spark cluster is on Google Cloud (a trial account), and so is the VM running R. How can I check which port Spark can be reached on?
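A plain TCP reachability test from the R VM answers part of the port question before sparklyr is involved at all. Below is a minimal base-R sketch; the helper name `port_is_open` is hypothetical, and it assumes the host and port from the master URL spark://spark-cluster-m:7077 used above (7077 is the default port of a Spark standalone master). It only shows whether something accepts TCP connections on that host and port, not whether that something is a Spark master.

```r
# Hypothetical helper: TRUE if a TCP connection to host:port can be opened
# within `timeout` seconds, FALSE on DNS failure, refusal, or timeout.
port_is_open <- function(host, port, timeout = 5) {
  con <- tryCatch(
    # socketConnection() signals an error (plus a warning) when the connection fails;
    # the warning is silenced and the error is turned into NULL.
    suppressWarnings(socketConnection(host = host, port = port, blocking = TRUE,
                                      open = "r+", timeout = timeout)),
    error = function(e) NULL
  )
  if (is.null(con)) return(FALSE)
  close(con)
  TRUE
}

# Usage from the R VM, with the master host and port from the spark_connect() call:
port_is_open("spark-cluster-m", 7077)
```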

Thanks for any leads.

@user16 ... You are right, this particular problem seems to be solved, but I am not out of the woods yet. I installed the same Spark version (2.2.1, Hadoop > 2.7).

Here is my new error message:

    Error in force(code) : 
      Failed during initialize_connection: java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
            at scala.Predef$.require(Predef.scala:224)
            at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:91)
            at org.apache.spark.SparkContext.<init>(SparkContext.scala:524)
            at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2516)
            at org.apache.spark.SparkContext.getOrCreate(SparkContext.scala)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at sparklyr.Invoke.invoke(invoke.scala:137)
            at sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
            at sparklyr.StreamHandler.read(stream.scala:66)
            at sparklyr.BackendHandler.channelRead0(handler.scala:51)
            at sparklyr.BackendHandler.channelRead0(handler.scala:4)
            at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
            at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
            at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
            at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
            at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
            at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
            at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
            at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
            at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
            at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
            at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
            at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
            at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
            at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
            at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
            at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
            at java.lang.Thread.run(Thread.java:748)

        Log: /tmp/RtmpTUh0z6/file5d231368db0_spark.log


    ---- Output Log ----
            at io.netty.channel.nio.NioEventLoop.processS
            at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
            at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
            at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
            at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
            ... 1 more
    18/07/21 18:24:59 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://spark-cluster-m:7077...
    18/07/21 18:24:59 WARN StandaloneAppClient$ClientEndpoint: Failed to connect to master spark-cluster-m:7077
    org.apache.spark.SparkException: Exception thrown in awaitResult: 
            at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
            at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
            at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:100)
            at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:108)
            at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$tryRegisterAllMasters$1$$anon$1.run(StandaloneAppClient.scala:106)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    Caused by: java.io.IOException: Failed to connect to spark-cluster-m/10.142.0.3:7077
            at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:232)
            at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:182)
            at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:197)
            at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:194)
            at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:190)
            ... 4 more
    Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: spark-cluster-m/10.142.0.3:7077
            at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
            at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
            at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:257)
            at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:291)
            at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:631)
            at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
            at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
            at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
            at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
            at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
            ... 1 more
    18/07/21 18:25:19 ERROR StandaloneSchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up.
    18/07/21 18:25:19 WARN StandaloneSchedulerBackend: Application ID is not initialized yet.
    18/07/21 18:25:19 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 46811.
    18/07/21 18:25:19 INFO NettyBlockTransferService: Server created on 10.142.0.5:46811
    18/07/21 18:25:19 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
    18/07/21 18:25:19 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.142.0.5, 46811, None)
    18/07/21 18:25:19 INFO BlockManagerMasterEndpoint: Registering block manager 10.142.0.5:46811 with 366.3 MB RAM, BlockManagerId(driver, 10.142.0.5, 46811, None)
    18/07/21 18:25:19 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 10.142.0.5, 46811, None)
    18/07/21 18:25:19 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.142.0.5, 46811, None)
    18/07/21 18:25:19 INFO SparkUI: Stopped Spark web UI at http://10.142.0.5:4040
    18/07/21 18:25:19 INFO StandaloneSchedulerBackend: Shutting down all executors
    18/07/21 18:25:19 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Asking each executor to shut down
    18/07/21 18:25:19 WARN StandaloneAppClient$ClientEndpoint: Drop Unregist

I can see that the host name resolves (=> 10.142.0.3). It also seems to be the right port, because if I use port 7000 instead I get this error:

18/07/21 18:32:54 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from spark-cluster-m/10.142.0.3:7000 is closed
18/07/21 18:32:54 WARN StandaloneAppClient$ClientEndpoint: Could not connect to spark-cluster-m:7000: java.io.IOException: Connection reset by peer
18/07/21 18:32:54 WARN StandaloneAppClient$ClientEndpoint: Failed to connect to master spark-cluster-m:7000

But I can't figure out what this means.

You said my configuration is "special". If there is a better (and simpler) way, I would be happy to use it.

Here is how I set up my test: I created a Google Dataproc cluster with Spark (2.2.1), and I added Cassandra on each node.

At this stage, everything works.

Then I needed to install FactoMineR because I want to try HMFA. It is said to run on R > 3.0.0, so that seemed fine, but it depends on nlme, which cannot be installed on R < 3.4.0 (and the R in Debian Jessie backports is 3.3).

So what can I do? I have to admit I am not very keen on reinstalling the whole Spark / Cassandra cluster from scratch...

Tags: r, apache-spark, google-cloud-dataproc, sparklyr
