A Brief Analysis of the Hive Execution Flow and Source Code

MachCraft 2021-08-11 23:37

1. Getting Our Bearings

Following the earlier source-code preparation, and to minimize differences in interpretation, the analysis starts directly from the architecture diagram on the official website.


From the architecture diagram above, Hive's main components can be read off directly:

UI: the user-facing interface for submitting SQL jobs and monitoring their progress and resource usage
DRIVER: as the name suggests, it receives the SQL statement; it is the first receiver of a submitted job, the initiator of Hive execution and the component that returns the results, and it provides execute and fetch APIs modeled on JDBC/ODBC interfaces (a minimal JDBC sketch follows this list)
COMPILER: likewise self-explanatory; it parses the statement submitted from the CLI, performs semantic analysis for the given query, fetches table and partition metadata from the metastore, and generates the execution plan
EXECUTION ENGINE: executes the plan produced by the compiler as a DAG of stages; the engine manages the dependencies between the stages and runs each stage on the appropriate component
METASTORE: the component that stores all information about tables and partitions, including columns and column types; it also holds the serializer/deserializer information needed to read and write the corresponding HDFS files
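
Since the Driver exposes execute and fetch APIs modeled on JDBC/ODBC, the most common way a client reaches it is through HiveServer2's JDBC driver. A minimal sketch, assuming a HiveServer2 instance at localhost:10000 and the hive-jdbc driver on the classpath (host, port, user and query are placeholders):

  import java.sql.Connection;
  import java.sql.DriverManager;
  import java.sql.ResultSet;
  import java.sql.Statement;

  public class HiveJdbcDemo {
    public static void main(String[] args) throws Exception {
      // older hive-jdbc versions may need the driver registered explicitly
      Class.forName("org.apache.hive.jdbc.HiveDriver");
      try (Connection conn = DriverManager.getConnection(
               "jdbc:hive2://localhost:10000/default", "hive", "");
           Statement stmt = conn.createStatement();
           ResultSet rs = stmt.executeQuery("SHOW TABLES")) {
        while (rs.next()) {
          // every statement submitted this way ends up going through the Driver described above
          System.out.println(rs.getString(1));
        }
      }
    }
  }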

After the SQL reaches the Driver, lexical and syntax parsing and validation are performed together with semantic analysis (which fetches the metadata); from the parsed statement and the metadata a logical plan is derived and then optimized. Once the logical plan is settled, a physical plan is generated (materialized as execution operators), and from that point the job is handed to the underlying engine for execution.
In essence, the way Hive works is to translate a SQL statement into something an execution engine can run.

That diagram is very high-level, however, and does not show each component's concrete responsibilities, so the following flow was pieced together from a number of architecture diagrams:

1. The HQL is submitted from the client.
2. The parser performs lexical and syntax analysis on the HQL and produces an AST (see the parsing sketch after this list).
3. The AST is traversed to produce QueryBlocks (QB).
4. The Logical Plan Generator converts the QBs into a logical execution plan.
5. The Logical Optimizer optimizes the logical execution plan.
6. The Physical Plan Generator converts the logical plan into a physical plan.
7. The Physical Optimizer optimizes the generated physical plan.
8. Finally the complete plan is handed to the engine for execution.
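
To make step 2 concrete, the parser can be driven on its own. A minimal sketch, assuming the Hive ql jars are on the classpath (the query and table name are made up); it prints the AST that compile() will later hand to the semantic analyzer:

  import org.apache.hadoop.hive.ql.parse.ASTNode;
  import org.apache.hadoop.hive.ql.parse.ParseDriver;
  import org.apache.hadoop.hive.ql.parse.ParseException;

  public class AstDumpDemo {
    public static void main(String[] args) throws ParseException {
      ParseDriver pd = new ParseDriver();
      // no Context is required just to inspect the tree
      ASTNode tree = pd.parse("SELECT id, count(1) FROM demo_table GROUP BY id");
      // dump() prints the nested token structure (TOK_QUERY, TOK_FROM, TOK_INSERT, ...)
      System.out.println(tree.dump());
    }
  }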

2. Getting Hands-On

The entry point for Hive job submission is in the Driver class, which governs a Hive job from start to finish.
Start with run(), then follow runInternal() and compileInternal() down to compile(); in that method the AST is created, with its value coming from ParseUtils.parse().
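
Before walking the source, here is a hedged sketch of exercising that entry point programmatically (it assumes a local Hive configuration and the ql jars on the classpath; production clients normally go through HiveServer2 instead):

  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.ql.Driver;
  import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
  import org.apache.hadoop.hive.ql.session.SessionState;

  public class DriverRunDemo {
    public static void main(String[] args) throws Exception {
      HiveConf conf = new HiveConf();
      SessionState.start(conf);               // Driver expects a started SessionState
      Driver driver = new Driver(conf);
      // run() compiles and then executes the statement
      CommandProcessorResponse resp = driver.run("show databases");
      System.out.println("response code: " + resp.getResponseCode());
      driver.close();
    }
  }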


  public CommandProcessorResponse run(String command, boolean alreadyCompiled)
        throws CommandNeedRetryException {
    // delegate to runInternal(), which drives both compilation and execution
    CommandProcessorResponse cpr = runInternal(command, alreadyCompiled);
    // ... (error reporting and retry handling omitted)
    return cpr;
  }

  private CommandProcessorResponse runInternal(String command, boolean alreadyCompiled)
      throws CommandNeedRetryException {
    errorMessage = null;
    SQLState = null;
    downstreamError = null;
    lDrvState.stateLock.lock();
    try {
      // check whether the query has already been compiled
      if (alreadyCompiled) {
        if (lDrvState.driverState == DriverState.COMPILED) {
          lDrvState.driverState = DriverState.EXECUTING;
        } else {
          errorMessage = "FAILED: Precompiled query has been cancelled or closed.";
          console.printError(errorMessage);
          return createProcessorResponse(12);
        }
      } else {
        lDrvState.driverState = DriverState.COMPILING;
      }
    } finally {
      lDrvState.stateLock.unlock();
    }

    // a flag that helps to set the correct driver state in finally block by tracking if
    // the method has been returned by an error or not.
    boolean isFinishedWithError = true;
    try {
      HiveDriverRunHookContext hookContext = new HiveDriverRunHookContextImpl(conf,
          alreadyCompiled ? ctx.getCmd() : command);
      // Get all the driver run hooks and pre-execute them.
      List<HiveDriverRunHook> driverRunHooks;
      try {
        driverRunHooks = getHooks(HiveConf.ConfVars.HIVE_DRIVER_RUN_HOOKS,
            HiveDriverRunHook.class);
        for (HiveDriverRunHook driverRunHook : driverRunHooks) {
            driverRunHook.preDriverRun(hookContext);
        }
      } catch (Exception e) {
        errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
        SQLState = ErrorMsg.findSQLState(e.getMessage());
        downstreamError = e;
        console.printError(errorMessage + "\n"
            + org.apache.hadoop.util.StringUtils.stringifyException(e));
        return createProcessorResponse(12);
      }

      PerfLogger perfLogger = null;

      int ret;
      // if the query has not been compiled yet, compile it first
      if (!alreadyCompiled) {
        // compile internal will automatically reset the perf logger
        // call compileInternal()
        ret = compileInternal(command, true);
        // then we continue to use this perf logger
        perfLogger = SessionState.getPerfLogger();
        if (ret != 0) {
          return createProcessorResponse(ret);
        }
      } else {
        // reuse existing perf logger.
        perfLogger = SessionState.getPerfLogger();
        // Since we're reusing the compiled plan, we need to update its start time for current run
        plan.setQueryStartTime(perfLogger.getStartTime(PerfLogger.DRIVER_RUN));
      }
      // ... (lock acquisition, execute(), and the post-driver-run hooks follow; omitted)

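A short aside on the hook mechanism seen in runInternal(): the pre/post driver-run hooks are pluggable classes listed in hive.exec.driver.run.hooks. A hedged sketch of a custom hook (the interface name appears in the excerpt above; the context accessors may differ slightly across Hive versions):

  import org.apache.hadoop.hive.ql.HiveDriverRunHook;
  import org.apache.hadoop.hive.ql.HiveDriverRunHookContext;

  public class LoggingDriverRunHook implements HiveDriverRunHook {
    @Override
    public void preDriverRun(HiveDriverRunHookContext hookContext) throws Exception {
      // called before compilation and execution; getCommand() returns the submitted statement
      System.out.println("about to run: " + hookContext.getCommand());
    }

    @Override
    public void postDriverRun(HiveDriverRunHookContext hookContext) throws Exception {
      System.out.println("finished: " + hookContext.getCommand());
    }
  }

Continuing down the call chain, compileInternal() simply acquires the compile lock and delegates to compile():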

  private int compileInternal(String command, boolean deferClose) {
    int ret;

    Metrics metrics = MetricsFactory.getInstance();
    if (metrics != null) {
      metrics.incrementCounter(MetricsConstant.WAITING_COMPILE_OPS, 1);
    }

    final ReentrantLock compileLock = tryAcquireCompileLock(isParallelEnabled,
      command);
    if (compileLock == null) {
      return ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode();
    }

    try {
      if (metrics != null) {
        metrics.decrementCounter(MetricsConstant.WAITING_COMPILE_OPS, 1);
      }
      // call compile()
      ret = compile(command, true, deferClose);
    } finally {
      compileLock.unlock();
    }
    // ... (error handling and metrics bookkeeping omitted)
    return ret;
  }


  public int compile(String command, boolean resetTaskIds, boolean deferClose) {
    PerfLogger perfLogger = SessionState.getPerfLogger(true);
    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_RUN);
    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.COMPILE);
    lDrvState.stateLock.lock();
    try {
      lDrvState.driverState = DriverState.COMPILING;
    } finally {
      lDrvState.stateLock.unlock();
    }

    command = new VariableSubstitution(new HiveVariableSource() {
      @Override
      public Map<String, String> getHiveVariable() {
        return SessionState.get().getHiveVariables();
      }
    }).substitute(conf, command);

    String queryStr = command;

    try {
      // command should be redacted to avoid to logging sensitive data
      queryStr = HookUtils.redactLogString(conf, command);
    } catch (Exception e) {
      LOG.warn("WARNING! Query command could not be redacted." + e);
    }

    if (isInterrupted()) {
      return handleInterruption("at beginning of compilation."); //indicate if need clean resource
    }

    if (ctx != null && ctx.getExplainAnalyze() != AnalyzeState.RUNNING) {
      // close the existing ctx etc before compiling a new query, but does not destroy driver
      closeInProcess(false);
    }

    if (resetTaskIds) {
      TaskFactory.resetId();
    }

    String queryId = conf.getVar(HiveConf.ConfVars.HIVEQUERYID);

    //save some info for webUI for use after plan is freed
    this.queryDisplay.setQueryStr(queryStr);
    this.queryDisplay.setQueryId(queryId);

    LOG.info("Compiling command(queryId=" + queryId + "): " + queryStr);

    SessionState.get().setupQueryCurrentTimestamp();

    // Whether any error occurred during query compilation. Used for query lifetime hook.
    boolean compileError = false;
    try {

      // Initialize the transaction manager.  This must be done before analyze is called.
      final HiveTxnManager txnManager = SessionState.get().initTxnMgr(conf);
      // In case when user Ctrl-C twice to kill Hive CLI JVM, we want to release locks

      // if compile is being called multiple times, clear the old shutdownhook
      ShutdownHookManager.removeShutdownHook(shutdownRunner);
      shutdownRunner = new Runnable() {
        @Override
        public void run() {
          try {
            releaseLocksAndCommitOrRollback(false, txnManager);
          } catch (LockException e) {
            LOG.warn("Exception when releasing locks in ShutdownHook for Driver: " +
                e.getMessage());
          }
        }
      };
      ShutdownHookManager.addShutdownHook(shutdownRunner, SHUTDOWN_HOOK_PRIORITY);

      if (isInterrupted()) {
        return handleInterruption("before parsing and analysing the query");
      }
      if (ctx == null) {
        ctx = new Context(conf);
      }

      ctx.setTryCount(getTryCount());
      ctx.setCmd(command);
      ctx.setHDFSCleanup(true);

      perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.PARSE);
      // lexical and syntax parsing: turn the command into an AST
      ASTNode tree = ParseUtils.parse(command, ctx);
      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PARSE);
      // ... (semantic analysis and plan generation continue from here; omitted)


Drilling into ParseUtils.parse() (which returns an ASTNode), the core is pd.parse(): after the layered lexical and syntax parsing, and once the parser has confirmed there are no errors, the finished AST is returned.

  public static ASTNode parse(
      String command, Context ctx, String viewFullyQualifiedName) throws ParseException {
    ParseDriver pd = new ParseDriver();
    // build the AST via ParseDriver
    ASTNode tree = pd.parse(command, ctx, viewFullyQualifiedName);
    tree = findRootNonNullToken(tree);
    handleSetColRefs(tree);
    return tree;
  }

  public ASTNode parse(String command, Context ctx, String viewFullyQualifiedName)
      throws ParseException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Parsing command: " + command);
    }
    // lexical analysis
    HiveLexerX lexer = new HiveLexerX(new ANTLRNoCaseStringStream(command));
    TokenRewriteStream tokens = new TokenRewriteStream(lexer);
    if (ctx != null) {
      if (viewFullyQualifiedName == null) {
        // Top level query
        ctx.setTokenRewriteStream(tokens);
      } else {
        // It is a view
        ctx.addViewTokenRewriteStream(viewFullyQualifiedName, tokens);
      }
      lexer.setHiveConf(ctx.getConf());
    }
    HiveParser parser = new HiveParser(tokens);
    if (ctx != null) {
      parser.setHiveConf(ctx.getConf());
    }
    parser.setTreeAdaptor(adaptor);
    HiveParser.statement_return r = null;
    try {
      r = parser.statement();
    } catch (RecognitionException e) {
      e.printStackTrace();
      throw new ParseException(parser.errors);
    }

    if (lexer.getErrors().size() == 0 && parser.errors.size() == 0) {
      LOG.debug("Parse Completed");
    } else if (lexer.getErrors().size() != 0) {
      throw new ParseException(lexer.getErrors());
    } else {
      throw new ParseException(parser.errors);
    }
    // retrieve the finished AST
    ASTNode tree = (ASTNode) r.getTree();
    tree.setUnknownTokenBoundaries();
    return tree;
  }
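
For a simple statement such as SELECT * FROM t, the resulting tree (printed with tree.dump() or tree.toStringTree()) has roughly the shape (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME t))) (TOK_INSERT ...)); this is an illustration rather than verbatim output. The important point is that the root token of an ordinary query is TOK_QUERY, which is exactly what the next stage switches on.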

With the AST in hand, Hive refines it further into the smallest query units, QueryBlocks (QB). The doPhase1QBExpr() method in SemanticAnalyzer traverses the AST to collect the QB information; before doing so, Hive chooses the corresponding handling based on the type of the AST root node.

  public void doPhase1QBExpr(ASTNode ast, QBExpr qbexpr, String id, String alias, boolean insideView)
      throws SemanticException {

    assert (ast.getToken() != null);
    if (ast.getToken().getType() == HiveParser.TOK_QUERY) {
      // create a QB for a plain query
      QB qb = new QB(id, alias, true);
      qb.setInsideView(insideView);
      Phase1Ctx ctx_1 = initPhase1Ctx();
      doPhase1(ast, qb, ctx_1, null);

      qbexpr.setOpcode(QBExpr.Opcode.NULLOP);
      qbexpr.setQB(qb);
    }
    // setop
    else {
      switch (ast.getToken().getType()) {
      case HiveParser.TOK_UNIONALL:
        qbexpr.setOpcode(QBExpr.Opcode.UNION);
        break;
      case HiveParser.TOK_INTERSECTALL:
        qbexpr.setOpcode(QBExpr.Opcode.INTERSECTALL);
        break;
      case HiveParser.TOK_INTERSECTDISTINCT:
        qbexpr.setOpcode(QBExpr.Opcode.INTERSECT);
        break;
      case HiveParser.TOK_EXCEPTALL:
        qbexpr.setOpcode(QBExpr.Opcode.EXCEPTALL);
        break;
      case HiveParser.TOK_EXCEPTDISTINCT:
        qbexpr.setOpcode(QBExpr.Opcode.EXCEPT);
        break;
      default:
        throw new SemanticException(ErrorMsg.UNSUPPORTED_SET_OPERATOR.getMsg("Type "
            + ast.getToken().getType()));
      }

      // recurse into both children of the set operation to fill two QBs; this is a depth-first
      // traversal, so starting from the root node the whole AST gets covered
      // query 1
      assert (ast.getChild(0) != null);
      QBExpr qbexpr1 = new QBExpr(alias + SUBQUERY_TAG_1);
      doPhase1QBExpr((ASTNode) ast.getChild(0), qbexpr1, id + SUBQUERY_TAG_1, alias
          + SUBQUERY_TAG_1, insideView);
      qbexpr.setQBExpr1(qbexpr1);

      // query 2
      assert (ast.getChild(1) != null);
      QBExpr qbexpr2 = new QBExpr(alias + SUBQUERY_TAG_2);
      doPhase1QBExpr((ASTNode) ast.getChild(1), qbexpr2, id + SUBQUERY_TAG_2, alias
          + SUBQUERY_TAG_2, insideView);
      qbexpr.setQBExpr2(qbexpr2);
    }
  }

The concrete analysis can be found in analyzeInternal(); the comments supplied in the source are clear and well ordered.

  void analyzeInternal(ASTNode ast, PlannerContext plannerCtx) throws SemanticException {
    // the analysis starts from the abstract syntax tree
    // 1. Generate Resolved Parse tree from syntax tree
    LOG.info("Starting Semantic Analysis");
    //change the location of position alias process here
    processPositionAlias(ast);
    if (!genResolvedParseTree(ast, plannerCtx)) {
      return;
    }

    // 2. generate the logical execution plan (operator tree)
    // 2. Gen OP Tree from resolved Parse Tree
    Operator sinkOp = genOPTree(ast, plannerCtx);

    if (!unparseTranslator.isEnabled() && tableMask.isEnabled()) {
      // Here we rewrite the * and also the masking table
      ASTNode tree = rewriteASTWithMaskAndFilter(tableMask, ast, ctx.getTokenRewriteStream(),
              ctx, db, tabNameToTabObject, ignoredTokens);
      if (tree != ast) {
        ctx.setSkipTableMasking(true);
        init(true);
        //change the location of position alias process here
        processPositionAlias(tree);
        genResolvedParseTree(tree, plannerCtx);
        if (this instanceof CalcitePlanner) {
          ((CalcitePlanner) this).resetCalciteConfiguration();
        }
        sinkOp = genOPTree(tree, plannerCtx);
      }
    }
    // ... (schema deduction, logical optimization and task generation follow; omitted)

The parse context is then optimized. Logical optimization loops a list of Transform rules over the ParseContext, and physical optimization loops a list of PhysicalPlanResolver rules over the PhysicalContext (these are Optimizer.optimize() and PhysicalOptimizer.optimize(), respectively):

  public ParseContext optimize() throws SemanticException {
    for (Transform t : transformations) {
      t.beginPerfLogging();
      pctx = t.transform(pctx);
      t.endPerfLogging(t.toString());
    }
    return pctx;
  }
  public PhysicalContext optimize() throws SemanticException {
    for (PhysicalPlanResolver r : resolvers) {
      pctx = r.resolve(pctx);
    }
    return pctx;
  }
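
Both methods follow the same chain-of-rewrites pattern: each rule receives the current context and returns a (possibly rewritten) one, and the chain simply folds the rules over it. A toy illustration of the pattern (not Hive's actual classes):

  import java.util.Arrays;
  import java.util.List;

  interface Rule<T> {
    T transform(T ctx);             // same shape as Transform.transform() / PhysicalPlanResolver.resolve()
  }

  class RuleChain<T> {
    private final List<Rule<T>> rules;
    RuleChain(List<Rule<T>> rules) { this.rules = rules; }

    T optimize(T ctx) {
      for (Rule<T> r : rules) {
        ctx = r.transform(ctx);     // mirrors pctx = t.transform(pctx) above
      }
      return ctx;
    }
  }

  public class RuleChainDemo {
    public static void main(String[] args) {
      Rule<String> trim = s -> s.trim();            // stand-in for a rule such as column pruning
      Rule<String> lower = s -> s.toLowerCase();    // stand-in for another rewrite
      RuleChain<String> chain = new RuleChain<>(Arrays.asList(trim, lower));
      System.out.println(chain.optimize("  SELECT * FROM T  "));
    }
  }

In Hive the real rule lists are populated up front; for example, Optimizer registers transformations such as column pruning and predicate pushdown when the corresponding configuration flags are enabled.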

Tasks are executed one by one through executeTask(), and each task is ultimately wrapped in a thread class (TaskRunner) so that independent tasks can run concurrently. The launch loop, in Driver.execute(), looks like this:

        // Launch upto maxthreads tasks
        Task<? extends Serializable> task;
        while ((task = driverCxt.getRunnable(maxthreads)) != null) {
          TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt);
          if (!runner.isRunning()) {
            break;
          }
        }
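
A toy sketch of the same idea (this is not Hive's TaskRunner, just the pattern): each runnable unit of work is wrapped in a thread object and started, so independent stages of the DAG can make progress at the same time, and the driver later waits for them to finish.

  import java.util.ArrayList;
  import java.util.List;

  public class ParallelLaunchDemo {
    public static void main(String[] args) throws InterruptedException {
      List<Thread> runners = new ArrayList<>();
      for (int i = 0; i < 3; i++) {               // stand-in for "Launch upto maxthreads tasks"
        final int stage = i;
        Thread runner = new Thread(() -> System.out.println("executing stage " + stage));
        runner.start();                           // analogous to starting a TaskRunner
        runners.add(runner);
      }
      for (Thread r : runners) {
        r.join();                                 // wait for the launched stages to complete
      }
    }
  }

In Hive itself, whether a task actually runs on its own thread is governed by hive.exec.parallel; when parallel execution is disabled, the TaskRunner is run sequentially on the caller's thread.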
