
date: 2020-07-29 09:54:00
updated: 2020-08-04 17:09:00

Lineage Analysis

1. LineageLogger

First, the org.apache.hadoop.hive.ql.hooks.LineageLogger class needs to be rewritten. The stock class writes the column-lineage information to the log; for our purposes it should return the lineage string to the caller instead.

// Before: the lineage JSON only ends up in the log
String lineage = out.toString();
if (testMode) {
    log(lineage);
} else {
    LOG.info(lineage);
}

// After: return it to the caller
String lineage = out.toString();
return lineage;
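
For reference, a minimal skeleton of what the rewritten class might look like. This is a sketch, not the full implementation: the package name cn.edata.Lineage matches the import used in LineageInfo below, and the serialization body is elided and would have to be copied from the stock run(HookContext) method.

package cn.edata.Lineage;

import java.io.StringWriter;
import java.util.HashSet;
import java.util.List;

import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx;

/**
 * Skeleton of the rewritten LineageLogger: the JSON serialization is
 * factored into a method that returns the lineage string to the caller
 * instead of logging it.
 */
public class LineageLogger {

    public String getJsonString(String query, List<FieldSchema> fieldSchemas,
                                HashSet<WriteEntity> outputs, LineageCtx.Index index) {
        StringWriter out = new StringWriter();
        // ... copy the JsonWriter-based serialization from the stock run()
        //     method here, writing the edges/vertices JSON into `out` ...
        String lineage = out.toString();
        return lineage; // returned instead of LOG.info(lineage)
    }
}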

2. Adding the hook

Hive exposes several hooks to developers. For column-lineage analysis, register the logger as a post-execution hook in the conf: hConf.set("hive.exec.post.hooks", "org.apache.hadoop.hive.ql.hooks.LineageLogger")
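
In client code this might look as follows; the HookConfig class name is just for illustration.

import org.apache.hadoop.hive.conf.HiveConf;

public class HookConfig {
    public static HiveConf withLineageHook() {
        HiveConf hConf = new HiveConf();
        // Post-execution hooks fire after a query completes successfully;
        // several hook classes can be listed, comma-separated.
        hConf.set("hive.exec.post.hooks",
                "org.apache.hadoop.hive.ql.hooks.LineageLogger");
        return hConf;
    }
}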

3. LineageInfo

package cn.edata.StageTest;

import cn.edata.Lineage.LineageLogger;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveVariableSource;
import org.apache.hadoop.hive.conf.VariableSubstitution;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Schema;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.QueryDisplay;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.FetchTask;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx;
import org.apache.hadoop.hive.ql.parse.*;
import org.apache.hadoop.hive.ql.plan.FetchWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.processors.SetProcessor;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
 * Re-drives Hive's own compile phase to obtain column-level dependencies.
 */
public class LineageInfo {
    private static final String KEY_ADD = "add ";
    private static final String KEY_DROP = "drop ";
    private static final String KEY_SET = "set ";
    private static final String KEY_FUNCTION = "create temporary function ";
    private static final SetProcessor setProcessor = new SetProcessor();
    static Logger LOG = LoggerFactory.getLogger("LineageInfo");

    private static final LineageLogger lineageLogger = new LineageLogger();

    public void getLineageInfo(HiveConf conf, String filePath, boolean testTableDependency) throws LockException, IOException, ParseException, SemanticException {
        SessionState ss = SessionState.start(conf);
        ss.initTxnMgr(conf);

        System.out.println("filePath: " + filePath);
        // TODO: 2020/8/3 add support for reading the SQL directly from filePath
        List<String> commandList = new LinkedList<>();
        commandList.add("use model_ennenergy_ccs");
        commandList.add("select * from a_md_ccs_common_h limit 2");
        
        for(String command : commandList){
            String lowerSql = command.toLowerCase();
            // add / drop statements need no parsing
            if (lowerSql.startsWith(KEY_ADD) || lowerSql.startsWith(KEY_DROP)) {
                continue;
            }
            // apply "set k=v" parameter statements
            if (lowerSql.startsWith(KEY_SET)) {
                setProcessor.run(command.substring(KEY_SET.length()));
                continue;
            }
            
            command = new VariableSubstitution(new HiveVariableSource() {
                @Override
                public Map<String, String> getHiveVariable() {
                    return ss.getHiveVariables();
                }
            }).substitute(conf, command);
            Context ctx = new Context(conf);
            ctx.setCmd(command);
            System.out.println("ctx: " + ctx);
            ParseDriver pd = new ParseDriver();
            ASTNode tree = pd.parse(command, ctx);
            tree = ParseUtils.findRootNonNullToken(tree);
            System.out.println("tree: " + tree);

            // switch the current database on "use db"
            if (tree.getToken().getType() == HiveParser.TOK_SWITCHDATABASE) {
                ss.setCurrentDatabase(tree.getChild(0).getText());
                continue;
            }

            ss.setupQueryCurrentTimestamp();
            System.out.println("ss: " + ss);
            // CDH 6.2.0 variant:
            // QueryState queryState = new QueryState(conf);
            // System.out.println("queryState: " + queryState);
            // BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);
            BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(conf, tree);
            sem.analyze(tree, ctx);
            sem.validate();
            System.out.println("sem: " + sem);

            Schema schema = getSchema(sem, conf);
            // System.out.println("schema: " + schema);
            
            // query plan (kept for reference)
            // QueryDisplay queryDisplay = new QueryDisplay();
            // queryDisplay.setQueryStr(command);
            // queryDisplay.setQueryId(QueryPlan.makeQueryId());
            // QueryPlan queryPlan = new QueryPlan(command, sem, 0L, QueryPlan.makeQueryId(), SessionState.get().getHiveOperation(), schema, queryDisplay);
            // System.out.println("queryPlan: " + queryPlan);
            
            List<FieldSchema> fieldSchemas = schema.getFieldSchemas();
            // System.out.println("fieldSchemas: " + fieldSchemas);

            // Some statements cannot be fully resolved into a schema, e.g.:
            // ALTER TABLE model_icome_cheme.cheme_icome_kpi_month_h SET
            // TBLPROPERTIES('comment' = '化工月指标')
            // Statements starting with "alter" can simply be skipped without parsing.
            if (fieldSchemas == null) {
                continue;
            }
            
            HashSet<WriteEntity> outputs = sem.getOutputs();
            System.out.println("outputs: " + outputs);

            // column-lineage index collected during compilation
            LineageCtx.Index index = ss.getLineageState().getIndex();
            // CDH 6.2.0 variant:
            // LineageCtx.Index index = queryState.getLineageState().getIndex();
            System.out.println("index: " + index);

            String result = lineageLogger.getJsonString(command, fieldSchemas, outputs, index);
            System.out.println("result: " + result);

            if (testTableDependency) {
                DependencyInfo dependencyInfo = new DependencyInfo();
                dependencyInfo.getDependencyInfo(result);
            }
        }

    }

    // Copied from Hive's Driver class
    private Schema getSchema(BaseSemanticAnalyzer sem, HiveConf conf) {
        Schema schema = null;
        if (sem != null) {
            if (sem.getResultSchema() != null) {
                List<FieldSchema> lst = sem.getResultSchema();
                schema = new Schema(lst, (Map) null);
            } else if (sem.getFetchTask() != null) {
                FetchTask ft = sem.getFetchTask();
                TableDesc td = ft.getTblDesc();
                if (td == null && ft.getWork() != null && ((FetchWork) ft.getWork()).getPartDesc() != null && ((FetchWork) ft.getWork()).getPartDesc().size() > 0) {
                    td = ((PartitionDesc) ((FetchWork) ft.getWork()).getPartDesc().get(0)).getTableDesc();
                }

                if (td != null) {
                    String tableName = "result";
                    List lst = null;

                    try {
                        lst = MetaStoreUtils.getFieldsFromDeserializer(tableName, td.getDeserializer(conf));
                    } catch (Exception e) {
                        System.out.println("Error getting schema: " + StringUtils.stringifyException(e));
                    }

                    if (lst != null) {
                        schema = new Schema(lst, (Map) null);
                    }
                }
            }
        }
        if (schema == null) {
            schema = new Schema();
        }
        // System.out.println("Returning Hive schema: " + schema);
        return schema;
    }
}
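
Usage might look like the following hypothetical driver; the hive-site.xml resource and the SQL file path are placeholders for your own environment.

package cn.edata.StageTest;

import org.apache.hadoop.hive.conf.HiveConf;

/**
 * Hypothetical entry point for LineageInfo; resource and file paths
 * are placeholders.
 */
public class LineageInfoDemo {
    public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();
        conf.addResource("hive-site.xml"); // placeholder: your cluster config
        // Second argument is the SQL file path (file reading is still a TODO
        // in getLineageInfo); third toggles the table-dependency analysis.
        new LineageInfo().getLineageInfo(conf, "/tmp/query.sql", true);
    }
}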

4. TableDependency

Beyond column-level lineage, the same output can be parsed further to obtain the dependencies between tables.

package cn.edata.StageTest;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

import java.util.HashSet;

/**
 * Parses the returned column-dependency JSON to derive table-level dependencies.
 */
public class DependencyInfo {
    public void getDependencyInfo(String lineageInfo){
        JSONObject result = JSONObject.parseObject(lineageInfo);
        System.out.println("#########");

        JSONArray verticesArray = JSONArray.parseArray(result.getString("vertices"));

        HashSet<String> modelTables = new HashSet<>();
        HashSet<String> originTables = new HashSet<>();

        verticesArray.forEach(data -> {
            JSONObject tmp = JSONObject.parseObject(data.toString());
            // vertexId follows the "db.table.column" convention
            String vertexId = tmp.getString("vertexId");
            String[] parts = vertexId.split("\\.");
            if (vertexId.startsWith("model"))
                modelTables.add(parts[0] + "." + parts[1]);
            if (vertexId.startsWith("origin"))
                // keep only db.table so the set holds tables, not columns
                originTables.add(parts[0] + "." + parts[1]);
        });

        System.out.println("####");
        System.out.println("modelTables: " + modelTables.toString());
        System.out.println("originTables: " + originTables.toString());


    }

}
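
To make the expected input concrete, here is a hypothetical smoke test. The vertexIds are made up, but they follow the db.table.column shape that LineageLogger emits, and the model/origin prefixes match the filters above.

package cn.edata.StageTest;

/**
 * Hypothetical smoke test for DependencyInfo with a hand-written,
 * abridged lineage JSON (only the "vertices" part matters here).
 */
public class DependencyInfoDemo {
    public static void main(String[] args) {
        String lineage = "{\"vertices\":["
                + "{\"id\":0,\"vertexType\":\"COLUMN\",\"vertexId\":\"model_db.target_t.col_a\"},"
                + "{\"id\":1,\"vertexType\":\"COLUMN\",\"vertexId\":\"origin_db.source_t.col_b\"}]}";
        new DependencyInfo().getDependencyInfo(lineage);
        // expected: modelTables -> [model_db.target_t], originTables -> [origin_db.source_t]
    }
}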

5. pom.xml

<properties>
    <!-- CDH 6.2.0 versions, matching the 6.2.0 code paths commented out above -->
    <!-- <cdh.hadoop.version>3.0.0-cdh6.2.0</cdh.hadoop.version> -->
    <!-- <cdh.hive.version>2.1.1-cdh6.2.0</cdh.hive.version> -->

    <cdh.hadoop.version>2.6.0-cdh5.14.4</cdh.hadoop.version>
    <cdh.hive.version>1.1.0-cdh5.14.4</cdh.hive.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.4</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.4</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>${cdh.hadoop.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-common</artifactId>
        <version>${cdh.hadoop.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>${cdh.hive.version}</version>
        <exclusions>
            <exclusion>
                <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
                <groupId>org.apache.hadoop</groupId>
            </exclusion>
<!--                <exclusion>-->
<!--                    <artifactId>gson</artifactId>-->
<!--                    <groupId>com.google.code.gson</groupId>-->
<!--                </exclusion>-->
            <exclusion>
                <artifactId>hive-shims</artifactId>
                <groupId>org.apache.hive</groupId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-jdbc</artifactId>
        <version>${cdh.hive.version}</version>
        <exclusions>
            <exclusion>
                <artifactId>hadoop-common</artifactId>
                <groupId>org.apache.hadoop</groupId>
            </exclusion>
            <exclusion>
                <artifactId>hbase-server</artifactId>
                <groupId>org.apache.hbase</groupId>
            </exclusion>
            <exclusion>
                <artifactId>jasper-compiler</artifactId>
                <groupId>tomcat</groupId>
            </exclusion>
            <exclusion>
                <artifactId>jasper-runtime</artifactId>
                <groupId>tomcat</groupId>
            </exclusion>
            <exclusion>
                <artifactId>hadoop-hdfs</artifactId>
                <groupId>org.apache.hadoop</groupId>
            </exclusion>
            <exclusion>
                <artifactId>jetty-rewrite</artifactId>
                <groupId>org.eclipse.jetty</groupId>
            </exclusion>
            <exclusion>
                <artifactId>jetty-server</artifactId>
                <groupId>org.eclipse.jetty</groupId>
            </exclusion>
            <exclusion>
                <artifactId>jetty-runner</artifactId>
                <groupId>org.eclipse.jetty</groupId>
            </exclusion>
            <exclusion>
                <artifactId>jetty-servlet</artifactId>
                <groupId>org.eclipse.jetty</groupId>
            </exclusion>
            <exclusion>
                <artifactId>jetty-webapp</artifactId>
                <groupId>org.eclipse.jetty</groupId>
            </exclusion>
            <exclusion>
                <artifactId>hadoop-yarn-registry</artifactId>
                <groupId>org.apache.hadoop</groupId>
            </exclusion>
            <exclusion>
                <artifactId>jetty</artifactId>
                <groupId>org.mortbay.jetty</groupId>
            </exclusion>
            <exclusion>
                <artifactId>hbase-hadoop2-compat</artifactId>
                <groupId>org.apache.hbase</groupId>
            </exclusion>
            <exclusion>
                <artifactId>gson</artifactId>
                <groupId>com.google.code.gson</groupId>
            </exclusion>
            <exclusion>
                <artifactId>hive-shims-0.23</artifactId>
                <groupId>org.apache.hive.shims</groupId>
            </exclusion>
            <exclusion>
                <artifactId>hive-common</artifactId>
                <groupId>org.apache.hive</groupId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>${cdh.hadoop.version}</version>
    </dependency>

</dependencies>
