scala - 在 ResultScanner 对象上调用 next() 时出现 java.io.InterruptedIOException
问题描述
我正在尝试读取和解析 ResultScanner 扫描仪,但在调用时next()
出现异常。
这是我的代码的相关部分:
var scan: Scan = new Scan()
val keyRegEx : RegexStringComparator = new RegexStringComparator("^.*"+"123123123123")
val rowFilter : RowFilter = new RowFilter(CompareOp.EQUAL, keyRegEx)
scan.setFilter(rowFilter)
scan.setCaching(3000)
// Apply the scan to the Table
val scanner = table.getScanner(scan)
val scanOutput: Seq[(String, String)] = iterateScannerAddingRowkey[T](scanner, Seq())
def iterateScannerAddingRowkey[T](scanner: ResultScanner, acc: Seq[(String,String)])(implicit m: Manifest[T]) : Seq[(String,String)] = {
// **Line below is triggering the exception**
val result = scanner.next()
if (result == null) acc
else {
val rowKey = result.rawCells().head.toString.split("/")(0)
// Parsing the rawCells content into a JSONObject
val response : JSONObject = getJson[T](result.rawCells())
iterateScannerAddingRowkey[T](scanner, Seq((rowKey, response.toString)) ++ acc)
}
}
这是一个例外:
java.lang.RuntimeException: java.io.InterruptedIOException at org.apache.hadoop.hbase.client.AbstractClientScanner$1.hasNext(AbstractClientScanner.java:97) at com.myproject.framework.hbase.HBaseUtils.iterateScannerAddingRowkey(HBaseUtils.scala: 85) 在 com.myproject.core.ComparePrefixVsRegex$.main(App.scala:46) 在 com.myproject.core.ComparePrefixVsRegex.main( App.scala) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang .reflect.Method.invoke(Method.java:498) 在 org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3。运行(ApplicationMaster.scala:686)原因:org.apache.hadoop.hbase.client.ScannerCallableWithReplicas 的 org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:203) 的 java.io.InterruptedIOException。在 org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200) 在 org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:320) 调用(ScannerCallableWithReplicas.java:61)在 org.apache.hadoop.hbase.client.ClientScanner.loadCache(ClientScanner.java:401) 在 org.apache.hadoop.hbase.client.ClientScanner.next(ClientScanner.java:364) 在 org.apache.hadoop.hbase .client.AbstractClientScanner$1.hasNext(AbstractClientScanner.java:94)ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:203) at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:61) at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java: 200) 在 org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:320) 在 org.apache.hadoop.hbase.client.ClientScanner.loadCache(ClientScanner.java:401) 在 org.apache.hadoop .hbase.client.ClientScanner.next(ClientScanner.java:364) 在 org.apache.hadoop.hbase.client.AbstractClientScanner$1.hasNext(AbstractClientScanner.java:94)ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:203) at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:61) at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java: 200) 在 org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:320) 在 org.apache.hadoop.hbase.client.ClientScanner.loadCache(ClientScanner.java:401) 在 org.apache.hadoop .hbase.client.ClientScanner.next(ClientScanner.java:364) 在 org.apache.hadoop.hbase.client.AbstractClientScanner$1.hasNext(AbstractClientScanner.java:94)client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200) 在 org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:320) 在 org.apache.hadoop.hbase.client.ClientScanner.loadCache(ClientScanner. java:401) 在 org.apache.hadoop.hbase.client.ClientScanner.next(ClientScanner.java:364) 在 org.apache.hadoop.hbase.client.AbstractClientScanner$1.hasNext(AbstractClientScanner.java:94)client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200) 在 org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:320) 在 org.apache.hadoop.hbase.client.ClientScanner.loadCache(ClientScanner. java:401) 在 org.apache.hadoop.hbase.client.ClientScanner.next(ClientScanner.java:364) 在 org.apache.hadoop.hbase.client.AbstractClientScanner$1.hasNext(AbstractClientScanner.java:94)
据我了解,在这种情况下scanner
是空的next()
会返回null
有人知道我错过了什么吗?
解决方案
在尝试了几件事后,我能够解决问题。
主要原因是我正在查询的表的大小非常大,所以我在处理扫描仪时遇到了超时。为了解决这个问题,我做了两个改变:
我增加了火花广播超时
val spark = SparkSession
.builder
.config("spark.sql.broadcastTimeout", "36000")
.getOrCreate()
我在扫描中添加了我想阅读的列的选择,以减少结果的大小:
scan.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("C1"))
scan.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("C2"))
推荐阅读
- go - 通过 html/template 删除传递的参数周围的空格
- python - 将具有不同数量的特定分隔符实例的 CSV 文件读入 Pandas 数据框
- c# - PasswordSignInAsync 导致“无法跟踪实体类型'用户'的实例,因为另一个具有键值的实例”为什么?
- function - Julia:通过对一组函数求和来获得一个函数
- database - 在 Flask Web 服务器上显示 Python 数据库
- elasticsearch - 自动完成弹性搜索
- bigdata - 如何使用数组数据类型加入配置单元?
- angular - “forbiddenNameValidator(new RegExp(this.forbiddenName, 'i'))(control)”有什么作用?
- python - 如何使用链接节点的 Unicode 行打印树
- python - 并行运行硒/并行或按顺序刮取多个站点?