python - 如何在 Python 和 Pandas(Data Frame) 中将条件 SQL 查询中的数据插入 Hbase?
问题描述
假设我有一些示例数据table_name_a
如下:
code val_a val_b remark date
------------------------------------------
1 00001 500 0.1 111 20191108
2 00001 1000 0.2 222 20191109
3 00002 200 0.1 111 20191110
4 00002 400 0.3 222 20191111
5 00001 200 0.2 333 20191112
6 00001 400 0.1 444 20191113
如果我在 Hbase 上查询如下(值来自select code, val_a from table_name_a where remark = 111
):
get 'test_01', '00001-20191108','n:111_a'
我的预期输出如下:
code 111_a
---------------
1 00001 500
我只知道如何在 Python(pandas) 中导入所有这样的数据
from db_conn import impala, hbasecon
import numpy as np
import pandas as pd
def main():
conn_impa = impala().getcon()
sql = """ SELECT * FROM table_name_a """
df = pd.read_sql(sql=sql, con=conn_impa)
df = df.fillna("")
num = len(df)
if num > 0:
hfdtable = hbasecon(FEDMTABLE).gettable()
with hfdtable.batch(batch_size=1000) as b:
df.apply(lambda row: b.put(row["code"], {'table_name_a:code': str(row["code"])}), axis=1)
df.apply(lambda row: b.put(row["val_a"], {'table_name_a:val_a': str(row["val_a"])}), axis=1)
df.apply(lambda row: b.put(row["val_b"], {'table_name_a:val_b': str(row["val_b"])}), axis=1)
df.apply(lambda row: b.put(row["date"], {'table_name_a:date': str(row["date"])}), axis=1)
df.apply(lambda row: b.put(row["remark"], {'table_name_a:remark': str(row["remark"])}), axis=1)
return True
if __name__ == '__main__':
main()
但我不知道如何使用 Python 将数据导入 Hbase,因为 Hbase 需要使用111_a
' 111'val_a
remark
非常感谢您的任何建议。
解决方案
好吧,这是我的解决方案,希望对其他人有所帮助。我爱 Python,我爱 SQL,我也爱我自己。终于可以自己解决了,哈哈哈。☕️
from db_conn import impala, hbasecon
import numpy as np
import pandas as pd
def main():
conn_impa = impala().getcon()
sql = """
SELECT code, date,
CASE WHEN t.remark='111' THEN t.val_a ELSE 0 END 111_a,
CASE WHEN t.remark='111' THEN t.val_b ELSE 0 END 111_b,
CASE WHEN t.remark='222' THEN t.val_a ELSE 0 END 222_a,
CASE WHEN t.remark='222' THEN t.val_b ELSE 0 END 222_b,
CASE WHEN t.remark='333' THEN t.val_a ELSE 0 END 333_a,
CASE WHEN t.remark='333' THEN t.val_b ELSE 0 END 333_b,
CASE WHEN t.remark='444' THEN t.val_a ELSE 0 END 444_a,
CASE WHEN t.remark='444' THEN t.val_b ELSE 0 END 444_b,
FROM table_name_a t
"""
df = pd.read_sql(sql=sql, con=conn_impa)
df = df.fillna("")
num = len(df)
if num > 0:
hfdtable = hbasecon(FEDMTABLE).gettable()
with hfdtable.batch(batch_size=1000) as b:
df.apply(lambda row: b.put(row["code"], {'table_name_a:code': str(row["code"])}), axis=1)
df.apply(lambda row: b.put(row["111_a"], {'table_name_a:111_a': str(row["111_a"])}), axis=1)
df.apply(lambda row: b.put(row["111_b"], {'table_name_a:111_b': str(row["111_b"])}), axis=1)
df.apply(lambda row: b.put(row["222_a"], {'table_name_a:222_a': str(row["222_a"])}), axis=1)
df.apply(lambda row: b.put(row["222_b"], {'table_name_a:222_b': str(row["222_b"])}), axis=1)
df.apply(lambda row: b.put(row["333_a"], {'table_name_a:333_a': str(row["333_a"])}), axis=1)
df.apply(lambda row: b.put(row["333_b"], {'table_name_a:333_b': str(row["333_b"])}), axis=1)
df.apply(lambda row: b.put(row["444_a"], {'table_name_a:444_a': str(row["444_a"])}), axis=1)
df.apply(lambda row: b.put(row["444_b"], {'table_name_a:444_b': str(row["444_b"])}), axis=1)
return True
if __name__ == '__main__':
main()
推荐阅读
- cython - 在 Cython 中声明一个可变长度的元组
- python - 更改 Seaborn 配对图的字体大小
- python-3.x - 获取给定图像位置的 numpy 索引
- r - R Shiny dateInput在NA默认和最小允许日期时不遵守最大允许日期
- ios - 如何通过在同一行中加载单元格来修复 tableview?
- python - 中序遍历中带有堆栈的while语句
- python - 如何将numpy数组转换为向量
& (参考)与 SWIG - javascript - 为什么将 JSONObject 放入 indexedDB 数据库时会出错?
- swift - UISearchController 中的取消按钮在 IOS 13 中没有正确消失
- kubernetes - kubernetes Autoscaler - 无法获取 loadbalancing.googleapis.com|https|request_count