python - 使用 Happybase 扫描远程 hbase 表时,发生“Tsocket read 0 bytes Error”
问题描述
我正在尝试扫描具有超过 1,000,000,000 行的远程 HBASE 表。扫描后,使用扫描的行,尝试使用 hdfs 制作 csv 文件。
我尝试了将近 3 周来解决它,但我不能。
/host/anaconda3/lib/python3.6/site-packages/thriftpy/transport/socket.py 的来源
/host/anaconda3/lib/python3.6/site-packages/thriftpy/transport/socket.py 的来源
==>我尝试过兼容协议,增加网络 tcp 内存缓冲区,增加超时配置,在扫描参数中设置 1 到 10000 批量大小等。
但它几乎可以运行 30 分钟,但突然发生错误。几乎 1/50 次它完成得很好。(运行良好,没有任何错误)请帮助我。我试图找到错误的原因。但我无法得到它。
有人知道如何解决吗?
这是我的代码
import sys
print ("--sys.version--")
print (sys.version)
from pyhive import hive
import csv
import os
import happybase
import time
import subprocess
import datetime
import chardet
import logging
logging.basicConfig(level=logging.DEBUG)
csv_list=[]
col=[]
def conn_base():
print('conn_base starts')
#SETTING CONNECTION AND CONFIGURATION
conn=happybase.Connection('13.xxx.xxx.xxx',port=9090)
table=conn.table(b'TEMP_TABLE')
#ITERATE DATA AND MAKE CSV FILE PER 100,000 RECORD. AND TAKE A TIME TO SLEEP PER 500000
tmp=[]
print('LET\'S MAKE CSV FILE FROM HBASE')
index=0
st=0
global csv_list
for row_key, data in table.scan():
try:
if (st%1000000==0):
time.sleep(30)
print("COUNT: ",st)
if (st%500000==0):
print("CHANGE CSV _FILE")
index+=1
ta_na='TEMP_TABLE'+str(index)+'_version.csv'
csv_list.append(ta_na)
st+=1
with open('/home/host01/csv_dir/TEMP_TABLE/'+csv_list[index-1] ,'a') as f:
tmp=[]
tmp.append(data[b'CF1:XXXXX'].decode())
tmp.append(data[b'CF1:YYYYY'].decode())
tmp.append(data[b'CF1:DDDDD'].decode())
tmp.append(data[b'CF1:SSSSS'].decode())
tmp.append(data[b'CF1:GGGGG'].decode())
tmp.append(data[b'CF1:HHHHH'].decode())
tmp.append(data[b'CF1:QQQQQ'].decode())
tmp.append(data[b'CF1:WWWWWW'].decode())
tmp.append(data[b'CF1:EEEEE'].decode())
tmp.append(data[b'CF1:RRRRR'].decode())
f.write(",".join(tmp)+'\n')
tmp=[]
except:
pass
#PUT CSV FILES TO HDFS.
st=1
for i in range(len(csv_list)):
try:
st+=1
cmd="hdfs dfs -put /home/host01/csv_dir/TEMP_TABLE"+str(csv_list[i])+" /user/hive/warehouse/TEMP_TABLE/"
subprocess.call(cmd,shell=True)
if (st%50==0):
time.sleep(5)
except:
pass
cmd="hdfs dfs -put /home/host01/csv_dir/TEMP_TABLE/*.csv /user/hive/warehouse/TEMP_TABLE/"
subprocess.call(cmd,shell=True)
print("PUT ALL CSV FILES TO HDFS")
conn.close()
解决方案
看看上面的代码,你让它变得更复杂了,这只是几个简单的步骤
- 确保 Hbase Thrift 已启动并正在运行。(使用上面建议的命令)
- 在 HDFS 设置文件中获取 webHdfs 启用。
- 从 hdfs 包使用不安全的客户端类(如果没有经过 kerberos 身份验证)直接将文件写入 HDFS(非常简单)
推荐阅读
- java - 防止广告在后台显示 (Android Studio)
- python - 单击按钮时调用脚本
- ios - 在 Ios 中,我需要接受三角形三边的长度作为输入,输出应指示三角形的类型
- java - 使用 Spark 在 Kafka 上发布消息
- javascript - 带有箭头函数的“this”应该代表定义箭头函数的对象?
- angular-material - 循环数据时的Angular Material Row
- powershell - 如何在没有 BOM 的情况下重定向 PowerShell 中的输入?
- php - PHP对象属性没有正确输出
- python - 对矩阵应用运算时结果不正确
- scala - Scala:如何对 Map[String, List[(String, Map[Long, Int])]] 中的 mapValues 求和