首页 > 解决方案 > 在 ResultScanner 对象上调用 next() 时出现 java.io.InterruptedIOException

问题描述

我正在尝试读取和解析 ResultScanner 扫描仪,但在调用时next()出现异常。

这是我的代码的相关部分:

    var scan: Scan = new Scan()
    val keyRegEx : RegexStringComparator = new RegexStringComparator("^.*"+"123123123123")
    val rowFilter : RowFilter = new RowFilter(CompareOp.EQUAL, keyRegEx)

    scan.setFilter(rowFilter)
    scan.setCaching(3000)

    // Apply the scan to the Table
    val scanner = table.getScanner(scan)

    val scanOutput: Seq[(String, String)] = iterateScannerAddingRowkey[T](scanner, Seq())

  def iterateScannerAddingRowkey[T](scanner: ResultScanner, acc: Seq[(String,String)])(implicit m: Manifest[T]) : Seq[(String,String)] = {
    // **Line below is triggering the exception**
    val result = scanner.next()
    if (result == null) acc
    else {
      val rowKey = result.rawCells().head.toString.split("/")(0)
      // Parsing the rawCells content into a JSONObject
      val response : JSONObject = getJson[T](result.rawCells())
      iterateScannerAddingRowkey[T](scanner, Seq((rowKey, response.toString)) ++ acc)
    }
  }

这是一个例外:

java.lang.RuntimeException: java.io.InterruptedIOException at org.apache.hadoop.hbase.client.AbstractClientScanner$1.hasNext(AbstractClientScanner.java:97) at com.myproject.framework.hbase.HBaseUtils.iterateScannerAddingRowkey(HBaseUtils.scala: 85) 在 com.myproject.core.ComparePrefixVsRegex$.main(App.scala:46) 在 com.myproject.core.ComparePrefixVsRegex.main( App.scala) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang .reflect.Method.invoke(Method.java:498) 在 org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3。运行(ApplicationMaster.scala:686)原因:org.apache.hadoop.hbase.client.ScannerCallableWithReplicas 的 org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:203) 的 java.io.InterruptedIOException。在 org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200) 在 org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:320) 调用(ScannerCallableWithReplicas.java:61)在 org.apache.hadoop.hbase.client.ClientScanner.loadCache(ClientScanner.java:401) 在 org.apache.hadoop.hbase.client.ClientScanner.next(ClientScanner.java:364) 在 org.apache.hadoop.hbase .client.AbstractClientScanner$1.hasNext(AbstractClientScanner.java:94)ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:203) at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:61) at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java: 200) 在 org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:320) 在 org.apache.hadoop.hbase.client.ClientScanner.loadCache(ClientScanner.java:401) 在 org.apache.hadoop .hbase.client.ClientScanner.next(ClientScanner.java:364) 在 org.apache.hadoop.hbase.client.AbstractClientScanner$1.hasNext(AbstractClientScanner.java:94)ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:203) at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:61) at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java: 200) 在 org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:320) 在 org.apache.hadoop.hbase.client.ClientScanner.loadCache(ClientScanner.java:401) 在 org.apache.hadoop .hbase.client.ClientScanner.next(ClientScanner.java:364) 在 org.apache.hadoop.hbase.client.AbstractClientScanner$1.hasNext(AbstractClientScanner.java:94)client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200) 在 org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:320) 在 org.apache.hadoop.hbase.client.ClientScanner.loadCache(ClientScanner. java:401) 在 org.apache.hadoop.hbase.client.ClientScanner.next(ClientScanner.java:364) 在 org.apache.hadoop.hbase.client.AbstractClientScanner$1.hasNext(AbstractClientScanner.java:94)client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200) 在 org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:320) 在 org.apache.hadoop.hbase.client.ClientScanner.loadCache(ClientScanner. java:401) 在 org.apache.hadoop.hbase.client.ClientScanner.next(ClientScanner.java:364) 在 org.apache.hadoop.hbase.client.AbstractClientScanner$1.hasNext(AbstractClientScanner.java:94)

据我了解,在这种情况下scanner是空的next()会返回null

有人知道我错过了什么吗?

标签: scalahbase

解决方案


在尝试了几件事后,我能够解决问题。

主要原因是我正在查询的表的大小非常大,所以我在处理扫描仪时遇到了超时。为了解决这个问题,我做了两个改变:

我增加了火花广播超时

val spark = SparkSession
              .builder
              .config("spark.sql.broadcastTimeout", "36000")
              .getOrCreate()

我在扫描中添加了我想阅读的列的选择,以减少结果的大小:

scan.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("C1"))
scan.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("C2"))

推荐阅读