Spark job stuck indefinitely while trying to fetch data from an Ignite cluster

Problem description

import org.apache.ignite.Ignition;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.configuration.ClientConfiguration;

// One thin client per thread; cfg and logger are class-level fields not shown in this snippet.
private static final ThreadLocal<IgniteClient> igniteClientContext = new ThreadLocal<>();

public static IgniteClient getIgniteClient(String[] address) {
    if (igniteClientContext.get() == null) {
        ClientConfiguration clientConfig = null;
        if (cfg == null) {
            clientConfig = new ClientConfiguration().setAddresses(address);
        } else {
            clientConfig = cfg;
        }
        IgniteClient igniteClient = Ignition.startClient(clientConfig);
        logger.info("igniteClient initialized ");
        igniteClientContext.set(igniteClient);
    }
    return igniteClientContext.get();
}

From the Spark code, I am trying to create an instance of the Ignite thin client and create a cache object.

val address = config.igniteServers.split(",") // config.igniteServers ="10.xx.xxx.xxx:10800,10.xx.xx.xxx:10800"

The code below is called from the Spark executors. In each executor we process a collection of records: we simply read from the cache and compare with the record currently being processed. If the record already exists in the cache we ignore it, otherwise we use it.

import org.apache.ignite.cache.{CacheMode, CacheWriteSynchronizationMode}
import org.apache.ignite.client.{ClientCache, ClientCacheConfiguration}

val cacheCfg = new ClientCacheConfiguration()
      .setName(PNR_CACHE)
      .setCacheMode(CacheMode.REPLICATED)
      .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
      .setDefaultLockTimeout(30000)
val igniteClient = IgniteHelper.getIgniteClient(address)
val cache: ClientCache[Long, Boolean] = igniteClient.getOrCreateCache(cacheCfg)
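
For context, the read-and-compare step described above would look roughly like the sketch below when run per partition; `records` and the `pnrId` key field are hypothetical placeholders, not names from the original job:

records.mapPartitions { iter =>
  val igniteClient = IgniteHelper.getIgniteClient(address)
  val cache: ClientCache[Long, Boolean] = igniteClient.getOrCreateCache(cacheCfg)
  // Keep only records whose key is not already present in the cache.
  iter.filter(record => !cache.containsKey(record.pnrId))
}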
    

At the end of the job, we update the cache with all valid records. This has run successfully several times, but at some point it gets stuck indefinitely while trying to read data from the cache.
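
The end-of-job update itself is not shown in the question; under the same hypothetical names as above, it might look something like this (a batched putAll would also work, at the cost of converting to a java.util.Map):

// Hypothetical end-of-job step: mark every valid record key as seen in the cache.
val validKeys: Array[Long] = validRecords.map(_.pnrId).collect()
validKeys.foreach(key => cache.put(key, true))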

In the executor logs, I can see the IgniteClusterUnavailable exception.

org.apache.ignite.client.ClientConnectionException: Ignite cluster is unavailable [sock=Socket[addr=hdpct2ldap01g02.hadoop.sgdcprod.XXXX.com/10.xx.xx.xx,port=10800,localport=20214]]
at org.apache.ignite.internal.client.thin.TcpClientChannel.handleIOError(TcpClientChannel.java:499)
at org.apache.ignite.internal.client.thin.TcpClientChannel.handleIOError(TcpClientChannel.java:491)
at org.apache.ignite.internal.client.thin.TcpClientChannel.access$100(TcpClientChannel.java:92)
at org.apache.ignite.internal.client.thin.TcpClientChannel$ByteCountingDataInput.read(TcpClientChannel.java:538)
at org.apache.ignite.internal.client.thin.TcpClientChannel$ByteCountingDataInput.readInt(TcpClientChannel.java:572)
at org.apache.ignite.internal.client.thin.TcpClientChannel.processNextResponse(TcpClientChannel.java:272)
at org.apache.ignite.internal.client.thin.TcpClientChannel.receive(TcpClientChannel.java:234)
at org.apache.ignite.internal.client.thin.TcpClientChannel.service(TcpClientChannel.java:171)
at org.apache.ignite.internal.client.thin.ReliableChannel.service(ReliableChannel.java:160)
at org.apache.ignite.internal.client.thin.ReliableChannel.request(ReliableChannel.java:187)
at org.apache.ignite.internal.client.thin.TcpIgniteClient.getOrCreateCache(TcpIgniteClient.java:124)
at com.XXXX.eda.pnr.PnrApplication$$anonfun$2$$anonfun$apply$4.apply(PnrApplication.scala:305)
at com.XXXX.eda.pnr.PnrApplication$$anonfun$2$$anonfun$apply$4.apply(PnrApplication.scala:297)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:217)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1094)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1085)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1020)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1085)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:811)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:381)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketException: Connection timed out (Read failed)
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:171)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at org.apache.ignite.internal.client.thin.TcpClientChannel$ByteCountingDataInput.read(TcpClientChannel.java:535)
    ... 24 more
20/06/21 05:49:42 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 1949
20/06/21 05:49:42 INFO executor.Executor: Running task 103.1 in stage 25.0 (TID 1949)

The thread dump also contains the following exception.

20/06/21 05:51:57 WARN hdfs.BlockReaderFactory: I/O error constructing remote block reader.
java.net.SocketTimeoutException: 60000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/10.93.133.157:20952 remote=host.hadoop.sgdcprod.XXXX.com/10.93.133.136:1004]
    at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
    at java.io.FilterInputStream.read(FilterInputStream.java:83)
    at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2354)
    at org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.readSaslMessage(DataTransferSaslUtil.java:212)
    at org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.doSaslHandshake(SaslDataTransferClient.java:451)
    at org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.getEncryptedStreams(SaslDataTransferClient.java:299)
    at org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.send(SaslDataTransferClient.java:242)
    at org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.checkTrustAndSend(SaslDataTransferClient.java:211)
    at org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.peerSend(SaslDataTransferClient.java:160)
    at org.apache.hadoop.hdfs.net.TcpPeerServer.peerFromSocketAndKey(TcpPeerServer.java:92)
    at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3593)
    at org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:849)
    at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:764)
    at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:377)
    at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:666)
    at org.apache.hadoop.hdfs.DFSInputStream.seekToBlockSource(DFSInputStream.java:1663)
    at org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:877)
    at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:913)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:981)
    at org.apache.hadoop.crypto.CryptoInputStream.read(CryptoInputStream.java:197)
    at java.io.DataInputStream.read(DataInputStream.java:149)
    at org.apache.avro.mapred.FsInput.read(FsInput.java:54)
    at org.apache.avro.file.DataFileReader$SeekableInputStream.read(DataFileReader.java:210)
    at org.apache.avro.io.BinaryDecoder$InputStreamByteSource.readRaw(BinaryDecoder.java:824)
    at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:349)
    at org.apache.avro.io.BinaryDecoder.readFixed(BinaryDecoder.java:302)
    at org.apache.avro.file.DataFileStream.nextRawBlock(DataFileStream.java:293)
    at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:198)
    at com.databricks.spark.avro.DefaultSource$$anonfun$buildReader$1$$anon$1.hasNext(DefaultSource.scala:215)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1094)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1085)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1020)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1085)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:811)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:381)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
20/06/21 05:51:57 WARN hdfs.DFSClient: Failed to connect to host.hadoop.sgdcprod.XXXX.com/10.93.133.136:1004 for block BP-1009813635-10.93.133.107-1555169940973:blk_1182405155_108738113, add to deadNodes and continue. 
java.net.SocketTimeoutException: 60000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/10.93.133.157:20952 remote=host.hadoop.sgdcprod.XXXX.com/10.93.133.136:1004]
    at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
    at java.io.FilterInputStream.read(FilterInputStream.java:83)
    at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2354)
    at org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.readSaslMessage(DataTransferSaslUtil.java:212)
    at org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.doSaslHandshake(SaslDataTransferClient.java:451)
    at org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.getEncryptedStreams(SaslDataTransferClient.java:299)
    at org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.send(SaslDataTransferClient.java:242)
    at org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.checkTrustAndSend(SaslDataTransferClient.java:211)
    at org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.peerSend(SaslDataTransferClient.java:160)
    at org.apache.hadoop.hdfs.net.TcpPeerServer.peerFromSocketAndKey(TcpPeerServer.java:92)
    at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3593)
    at org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:849)
    at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:764)
    at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:377)
    at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:666)
    at org.apache.hadoop.hdfs.DFSInputStream.seekToBlockSource(DFSInputStream.java:1663)
    at org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:877)
    at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:913)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:981)
    at org.apache.hadoop.crypto.CryptoInputStream.read(CryptoInputStream.java:197)
    at java.io.DataInputStream.read(DataInputStream.java:149)
    at org.apache.avro.mapred.FsInput.read(FsInput.java:54)
    at org.apache.avro.file.DataFileReader$SeekableInputStream.read(DataFileReader.java:210)
    at org.apache.avro.io.BinaryDecoder$InputStreamByteSource.readRaw(BinaryDecoder.java:824)
    at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:349)
    at org.apache.avro.io.BinaryDecoder.readFixed(BinaryDecoder.java:302)
    at org.apache.avro.file.DataFileStream.nextRawBlock(DataFileStream.java:293)
    at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:198)
    at com.databricks.spark.avro.DefaultSource$$anonfun$buildReader$1$$anon$1.hasNext(DefaultSource.scala:215)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1094)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1085)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1020)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1085)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:811)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:381)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

We set defaultReadTimeout in the spark.properties file, but it does not time out as expected:

spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseG1GC -Dsun.net.client.defaultReadTimeout:300000 -Dsun.net.client.defaultConnectTimeout=300000 -DIGNITE_REST_START_ON_CLIENT=true
spark.driver.exetraJavaOptions=-Dsun.net.client.defaultReadTimeout:300000 -Dsun.net.client.defaultConnectTimeout=300000 -DIGNITE_REST_START_ON_CLIENT=true
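
Note that sun.net.client.defaultReadTimeout and defaultConnectTimeout generally only affect java.net.URLConnection-based protocol handlers, not the plain TCP sockets used by the Ignite thin client, which may be why they have no visible effect here. If the goal is to make the thin client fail fast instead of blocking, a minimal sketch (assuming the Ignite 2.8 thin-client API) would set the client's own socket timeout:

import org.apache.ignite.Ignition
import org.apache.ignite.configuration.ClientConfiguration

// Sketch: give the thin client an explicit send/receive socket timeout so that a broken
// connection surfaces as an exception instead of blocking the executor task indefinitely.
val clientConfig = new ClientConfiguration()
  .setAddresses(address: _*)   // same "host:10800" endpoints as in the question
  .setTimeout(300000)          // socket operation timeout in milliseconds
val igniteClient = Ignition.startClient(clientConfig)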

Please help resolve this issue.

Ignite versions used: 2.8.0 & 2.8.1

Tags: ignite, apache-spark-2.0

Solution


There are connectivity problems between your Spark cluster and both HDFS and the Ignite cluster:

HDFS:

WARN hdfs.DFSClient: Failed to connect to host.hadoop.sgdcprod.XXXX.com/10.93.133.136:1004 for block BP-1009813635-10.93.133.107-1555169940973:blk_1182405155_108738113, add to deadNodes and continue.

For Ignite:

Caused by: java.net.SocketException: Connection timed out (Read failed)
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:171)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)

Given that your Spark jobs cannot connect to them, my guess is that you have a connectivity problem there.

In any case, please check the following for HDFS:

1) Some of your Spark workers cannot connect to the host.hadoop.sgdcprod.XXXX.com/10.93.133.136:1004 address, because the port is closed or because of some other connectivity issue. You can check this address with tools such as netstat, or with a simple TCP probe like the sketch after this list.

2) You may be using an incorrect HDFS port. Check that the HDFS URL for the namenode used in your Spark job code is correct.

3) Your HDFS worker (datanode) configuration has a problem; possibly some of the nodes cannot connect to the namenode.
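
A minimal, generic TCP reachability probe (in Scala, to match the job code): the host and port below are just the datanode address from the warning above and can be swapped for any endpoint you want to verify.

import java.net.{InetSocketAddress, Socket}

// Try to open a TCP connection with a bounded timeout; returns false on any I/O failure.
def canConnect(host: String, port: Int, timeoutMs: Int = 5000): Boolean = {
  val socket = new Socket()
  try {
    socket.connect(new InetSocketAddress(host, port), timeoutMs)
    true
  } catch {
    case _: java.io.IOException => false
  } finally {
    socket.close()
  }
}

println(canConnect("host.hadoop.sgdcprod.XXXX.com", 1004))   // HDFS datanode from the warning
println(canConnect("10.xx.xxx.xxx", 10800))                  // an Ignite server from config.igniteServers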

For Ignite:

1) Check that your cluster is active and not hanging. I expect that your cluster was started somewhere externally and can be observed through some monitoring tools.

2) Check that the server addresses from the IP finder can be resolved from every Spark worker, for example with the sketch below.
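
A minimal sketch of that check, assuming `spark` is the active SparkSession and `address` is the same Array[String] of Ignite endpoints used by the job; it runs small tasks on the executors and reports how each Ignite host resolves from each worker:

import java.net.InetAddress
import scala.util.Try

// Resolve every Ignite server host from (a sample of) the executors and report the results.
val report = spark.sparkContext
  .parallelize(1 to 100)                    // enough tasks to touch every executor
  .mapPartitions { _ =>
    val worker = InetAddress.getLocalHost.getHostName
    address.iterator.map { endpoint =>
      val host = endpoint.split(":")(0)
      val resolved = Try(InetAddress.getByName(host).getHostAddress).getOrElse("UNRESOLVED")
      (worker, endpoint, resolved)
    }
  }
  .collect()
  .distinct

report.foreach(println)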

