登录  
 加关注
查看详情
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

fyzjhh的博客

データベース管理者 の ブログ 、江です

 
 
 

日志

 
 

hive源码分析-sql语句的执行流程3  

2015-12-11 18:02:13|  分类: sql处理 |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |


首先从生成执行计划的入口 genPlan 开始:
先为子查询生成对应的 Operator,
再为表生成对应的 Operator,
然后为 join 生成对应的 Operator。
  /**
   * Entry point for building the operator plan of a single query block.
   *
   * <p>Steps visible in this excerpt:
   * <ol>
   *   <li>generate an operator for every subquery alias via {@code genPlan(QB, QBExpr)};</li>
   *   <li>generate an operator for every table alias via {@code genTablePlan};</li>
   *   <li>if the parse info carries a join expression, build the join tree, push the join
   *       filters and generate the join plan; otherwise take the first source operator
   *       (or {@code lastPTFOp} when that is non-null);</li>
   *   <li>hand the resulting source operator to {@code genBodyPlan}.</li>
   * </ol>
   *
   * <p>NOTE(review): {@code srcOpInfo} and {@code lastPTFOp} are not declared in the visible
   * code; presumably they are set up in the portion omitted from this excerpt - confirm
   * against the full source.
   *
   * @param qb the query block to plan
   * @param skipAmbiguityCheck forwarded to {@code rewriteRRForSubQ} when the block has an alias
   * @return the operator returned by {@code genBodyPlan} for this query block
   * @throws SemanticException declared by this method; presumably raised by the helper calls
   */
  @SuppressWarnings("nls")
  public Operator genPlan(QB qb, boolean skipAmbiguityCheck)
      throws SemanticException {

    // First generate all the opInfos for the elements in the from clause
    // Must be deterministic order map - see HIVE-8707
    Map<String, Operator> aliasToOpInfo = new LinkedHashMap<String, Operator>();

    // Recurse over the subqueries to fill the subquery part of the plan
    for (String alias : qb.getSubqAliases()) {
      QBExpr qbexpr = qb.getSubqForAlias(alias);
      aliasToOpInfo.put(alias, genPlan(qb, qbexpr));
    }

    // Recurse over all the source tables
    for (String alias : qb.getTabAliases()) {
      Operator op = genTablePlan(alias, qb);
      aliasToOpInfo.put(alias, op);
    }

// (unrelated code omitted in this excerpt)

    // process join
    if (qb.getParseInfo().getJoinExpr() != null) {
      ASTNode joinExpr = qb.getParseInfo().getJoinExpr();

      // A TOK_UNIQUEJOIN token gets its own join-tree builder; every other join
      // goes through genJoinTree and the where-clause / merge handling below.
      if (joinExpr.getToken().getType() == HiveParser.TOK_UNIQUEJOIN) {
        QBJoinTree joinTree = genUniqueJoinTree(qb, joinExpr, aliasToOpInfo);
        qb.setQbJoinTree(joinTree);
      } else {
        QBJoinTree joinTree = genJoinTree(qb, joinExpr, aliasToOpInfo);
        qb.setQbJoinTree(joinTree);
        /*
         * if there is only one destination in Query try to push where predicates
         * as Join conditions
         */
        Set<String> dests = qb.getParseInfo().getClauseNames();
        if ( dests.size() == 1 ) {
          String dest = dests.iterator().next();
          ASTNode whereClause = qb.getParseInfo().getWhrForClause(dest);
          if ( whereClause != null ) {
            extractJoinCondsFromWhereClause(joinTree, qb, dest,
                (ASTNode) whereClause.getChild(0),
                aliasToOpInfo );
          }
        }

        // Join merging is skipped only when the disableJoinMerge flag is set.
        if (!disableJoinMerge)
          mergeJoinTree(qb);
      }

      // if any filters are present in the join tree, push them on top of the
      // table
      pushJoinFilters(qb, qb.getQbJoinTree(), aliasToOpInfo);
      srcOpInfo = genJoinPlan(qb, aliasToOpInfo);
    } else {
      // Now if there are more than 1 sources then we have a join case
      // later we can extend this to the union all case as well
      // NOTE(review): iterator().next() throws NoSuchElementException when the map is
      // empty - presumably at least one source is always registered by this point.
      srcOpInfo = aliasToOpInfo.values().iterator().next();
      // with ptfs, there maybe more (note for PTFChains:
      // 1 ptf invocation may entail multiple PTF operators)
      srcOpInfo = lastPTFOp != null ? lastPTFOp : srcOpInfo;
    }

// Generate the final operator for the body of the query block
    Operator bodyOpInfo = genBodyPlan(qb, srcOpInfo, aliasToOpInfo);

    if (LOG.isDebugEnabled()) {
      LOG.debug("Created Plan for Query Block " + qb.getId());
    }

    // Only a query block that carries an alias gets its row resolver rewritten.
    if (qb.getAlias() != null) {
      rewriteRRForSubQ(qb.getAlias(), bodyOpInfo, skipAmbiguityCheck);
    }

    setQB(qb);
    return bodyOpInfo;
  }

  
  为子查询生成Operator
  private Operator genPlan(QB parent, QBExpr qbexpr) throws SemanticException {
    if (qbexpr.getOpcode() == QBExpr.Opcode.NULLOP) {
      boolean skipAmbiguityCheck = viewSelect == null && parent.isTopLevelSelectStarQuery();
      return genPlan(qbexpr.getQB(), skipAmbiguityCheck);
    }
    if (qbexpr.getOpcode() == QBExpr.Opcode.UNION) {
      Operator qbexpr1Ops = genPlan(parent, qbexpr.getQBExpr1());
      Operator qbexpr2Ops = genPlan(parent, qbexpr.getQBExpr2());

      return genUnionPlan(qbexpr.getAlias(), qbexpr.getQBExpr1().getAlias(),
          qbexpr1Ops, qbexpr.getQBExpr2().getAlias(), qbexpr2Ops);
    }
    return null;
  }
  


  为当前的QB中的表生成Operator
  /**
   * Generates the operator for one table alias of the given query block.
   *
   * <p>When no operator is registered yet for the alias id (neither in {@code topOps} nor in
   * {@code topSelOps}), a new {@code RowResolver} is built from the table deserializer's
   * struct fields and a new top operator is created and registered. Otherwise the
   * already-registered operator is reused, together with its row resolver.
   *
   * <p>NOTE(review): {@code tsDesc} and {@code op} are not declared in the visible code;
   * presumably they are defined in the portions omitted from this excerpt - confirm
   * against the full source.
   *
   * @param alias the table alias to plan
   * @param qb the query block the alias belongs to
   * @return the operator returned by {@code putOpInsertMap(op, rwsch)}
   * @throws SemanticException declared by this method; presumably raised by the helper calls
   */
  @SuppressWarnings("nls")
  private Operator genTablePlan(String alias, QB qb) throws SemanticException {

    String alias_id = getAliasId(alias, qb);
    Table tab = qb.getMetaData().getSrcForAlias(alias);
    RowResolver rwsch;

    // is the table already present
    Operator<? extends OperatorDesc> top = topOps.get(alias_id);
    Operator<? extends OperatorDesc> dummySel = topSelOps.get(alias_id);
    // An entry in topSelOps takes precedence over the one found in topOps.
    if (dummySel != null) {
      top = dummySel;
    }

    if (top == null) {
      // First time this alias is seen: build the row resolver from the table's fields.
      rwsch = new RowResolver();
      try {
        StructObjectInspector rowObjectInspector = (StructObjectInspector) tab
            .getDeserializer().getObjectInspector();
        List<? extends StructField> fields = rowObjectInspector
            .getAllStructFieldRefs();
        for (int i = 0; i < fields.size(); i++) {
          /**
           * if the column is a skewed column, use ColumnInfo accordingly
           */
          ColumnInfo colInfo = new ColumnInfo(fields.get(i).getFieldName(),
              TypeInfoUtils.getTypeInfoFromObjectInspector(fields.get(i)
                  .getFieldObjectInspector()), alias, false);
          colInfo.setSkewedCol((isSkewedCol(alias, qb, fields.get(i)
              .getFieldName())) ? true : false);
          rwsch.put(alias, fields.get(i).getFieldName(), colInfo);
        }
      } catch (SerDeException e) {
        // The checked SerDeException is rethrown unchecked with the cause preserved.
        throw new RuntimeException(e);
      }
 // (unrelated code omitted in this excerpt)

      top = putOpInsertMap(OperatorFactory.get(tsDesc,
          new RowSchema(rwsch.getColumnInfos())), rwsch);

      // Add this to the list of top operators - we always start from a table
      // scan
      topOps.put(alias_id, top);

      // Add a mapping from the table scan operator to Table
      topToTable.put((TableScanOperator) top, tab);

      // Table properties, when present, are recorded for the scan operator and its descriptor.
      Map<String, String> props = qb.getTabPropsForAlias(alias);
      if (props != null) {
        topToTableProps.put((TableScanOperator) top, props);
        tsDesc.setOpProps(props);
      }
    } else {
      // Reuse the existing operator: take its row resolver and reset its children.
      rwsch = opParseCtx.get(top).getRowResolver();
      top.setChildOperators(null);
    }
 // (unrelated code omitted in this excerpt)

    Operator output = putOpInsertMap(op, rwsch);

    if (LOG.isDebugEnabled()) {
      LOG.debug("Created Table Plan for " + alias + " " + op.toString());
    }

    return output;
  }

    生成Join的Operator

  private Operator genJoinPlan(QB qb, Map<String, Operator> map)
      throws SemanticException {
    QBJoinTree joinTree = qb.getQbJoinTree();
    Operator joinOp = genJoinOperator(qb, joinTree, map, null);
    return joinOp;
  }

  private Operator genJoinOperator(QB qb, QBJoinTree joinTree,
      Map<String, Operator> map,
      Operator joiningOp) throws SemanticException {
    QBJoinTree leftChild = joinTree.getJoinSrc();
    Operator joinSrcOp = joiningOp instanceof JoinOperator ? joiningOp : null;

    if (joinSrcOp == null && leftChild != null) {
      joinSrcOp = genJoinOperator(qb, leftChild, map, null);
    }

    if ( joinSrcOp != null ) {
      ArrayList<ASTNode> filter = joinTree.getFiltersForPushing().get(0);
      for (ASTNode cond : filter) {
        joinSrcOp = genFilterPlan(qb, cond, joinSrcOp);
      }
    }

    String[] baseSrc = joinTree.getBaseSrc();
    Operator[] srcOps = new Operator[baseSrc.length];

    HashSet<Integer> omitOpts = null; // set of input to the join that should be
    // omitted by the output
    int pos = 0;
    for (String src : baseSrc) {
      if (src != null) {
        Operator srcOp = map.get(src.toLowerCase());

        // for left-semi join, generate an additional selection & group-by
        // operator before ReduceSink
        ArrayList<ASTNode> fields = joinTree.getRHSSemijoinColumns(src);
        if (fields != null) {
          // the RHS table columns should be not be output from the join
          if (omitOpts == null) {
            omitOpts = new HashSet<Integer>();
          }
          omitOpts.add(pos);

          // generate a selection operator for group-by keys only
          srcOp = insertSelectForSemijoin(fields, srcOp);

          // generate a groupby operator (HASH mode) for a map-side partial
          // aggregation for semijoin
          srcOps[pos++] = genMapGroupByForSemijoin(qb, fields, srcOp,
              GroupByDesc.Mode.HASH);
        } else {
          srcOps[pos++] = srcOp;
        }
      } else {
        assert pos == 0;
        srcOps[pos++] = joinSrcOp;
      }
    }

    ExprNodeDesc[][] joinKeys = genJoinKeys(joinTree, srcOps);

    for (int i = 0; i < srcOps.length; i++) {
      // generate a ReduceSink operator for the join
      String[] srcs = baseSrc[i] != null ? new String[] {baseSrc[i]} : joinTree.getLeftAliases();
      srcOps[i] = genNotNullFilterForJoinSourcePlan(qb, srcOps[i], joinTree, joinKeys[i]);
      srcOps[i] = genJoinReduceSinkChild(qb, joinKeys[i], srcOps[i], srcs, joinTree.getNextTag());
    }

    JoinOperator joinOp = (JoinOperator) genJoinOperatorChildren(joinTree,
      joinSrcOp, srcOps, omitOpts, joinKeys);
    joinOp.getConf().setQBJoinTreeProps(joinTree);
    joinContext.put(joinOp, joinTree);

    Operator op = joinOp;
    for(ASTNode condn : joinTree.getPostJoinFilters() ) {
      op = genFilterPlan(qb, condn, op);
    }
    return op;
  }

  


  为查询块主体(body)生成最终的 Operator
  @SuppressWarnings("nls")
  private Operator genBodyPlan(QB qb, Operator input, Map<String, Operator> aliasToOpInfo)
      throws SemanticException {
    QBParseInfo qbp = qb.getParseInfo();

    TreeSet<String> ks = new TreeSet<String>(qbp.getClauseNames());
    Map<String, Operator<? extends OperatorDesc>> inputs = createInputForDests(qb, input, ks);

    Operator curr = input;

    List<List<String>> commonGroupByDestGroups = null;

    // If we can put multiple group bys in a single reducer, determine suitable groups of
    // expressions, otherwise treat all the expressions as a single group
    if (conf.getBoolVar(HiveConf.ConfVars.HIVEMULTIGROUPBYSINGLEREDUCER)) {
      try {
        commonGroupByDestGroups = getCommonGroupByDestGroups(qb, inputs);
      } catch (SemanticException e) {
        LOG.error("Failed to group clauses by common spray keys.", e);
      }
    }

    if (commonGroupByDestGroups == null) {
      commonGroupByDestGroups = new ArrayList<List<String>>();
      commonGroupByDestGroups.add(new ArrayList<String>(ks));
    }

    if (!commonGroupByDestGroups.isEmpty()) {

      // Iterate over each group of subqueries with the same group by/distinct keys
      for (List<String> commonGroupByDestGroup : commonGroupByDestGroups) {
        if (commonGroupByDestGroup.isEmpty()) {
          continue;
        }

        String firstDest = commonGroupByDestGroup.get(0);
        input = inputs.get(firstDest);

        // Constructs a standard group by plan if:
        // There is no other subquery with the same group by/distinct keys or
        // (There are no aggregations in a representative query for the group and
        // There is no group by in that representative query) or
        // The data is skewed or
        // The conf variable used to control combining group bys into a single reducer is false
        if (commonGroupByDestGroup.size() == 1 ||
            (qbp.getAggregationExprsForClause(firstDest).size() == 0 &&
            getGroupByForClause(qbp, firstDest).size() == 0) ||
            conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW) ||
            !conf.getBoolVar(HiveConf.ConfVars.HIVEMULTIGROUPBYSINGLEREDUCER)) {

          // Go over all the destination tables
          for (String dest : commonGroupByDestGroup) {
            curr = inputs.get(dest);

            if (qbp.getWhrForClause(dest) != null) {
              ASTNode whereExpr = qb.getParseInfo().getWhrForClause(dest);
              curr = genFilterPlan((ASTNode) whereExpr.getChild(0), qb, curr, aliasToOpInfo, false);
            }
            // Preserve operator before the GBY - we'll use it to resolve '*'
            Operator<?> gbySource = curr;

            if (qbp.getAggregationExprsForClause(dest).size() != 0
                || getGroupByForClause(qbp, dest).size() > 0) {
              // multiple distincts is not supported with skew in data
              if (conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW) &&
                  qbp.getDistinctFuncExprsForClause(dest).size() > 1) {
                throw new SemanticException(ErrorMsg.UNSUPPORTED_MULTIPLE_DISTINCTS.
                    getMsg());
              }
              // insert a select operator here used by the ColumnPruner to reduce
              // the data to shuffle
              curr = genSelectAllDesc(curr);
              // Check and transform group by *. This will only happen for select distinct *.
              // Here the "genSelectPlan" is being leveraged.
              // The main benefits are (1) remove virtual columns that should
              // not be included in the group by; (2) add the fully qualified column names to unParseTranslator
              // so that view is supported. The drawback is that an additional SEL op is added. If it is
              // not necessary, it will be removed by NonBlockingOpDeDupProc Optimizer because it will match
              // SEL%SEL% rule.
              ASTNode selExprList = qbp.getSelForClause(dest);
              if (selExprList.getToken().getType() == HiveParser.TOK_SELECTDI
                  && selExprList.getChildCount() == 1 && selExprList.getChild(0).getChildCount() == 1) {
                ASTNode node = (ASTNode) selExprList.getChild(0).getChild(0);
                if (node.getToken().getType() == HiveParser.TOK_ALLCOLREF) {
                  curr = genSelectPlan(dest, qb, curr, curr);
                  RowResolver rr = opParseCtx.get(curr).getRowResolver();
                  qbp.setSelExprForClause(dest, SemanticAnalyzer.genSelectDIAST(rr));
                }
              }
              if (conf.getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE)) {
                if (!conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) {
                  curr = genGroupByPlanMapAggrNoSkew(dest, qb, curr);
                } else {
                  curr = genGroupByPlanMapAggr2MR(dest, qb, curr);
                }
              } else if (conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) {
                curr = genGroupByPlan2MR(dest, qb, curr);
              } else {
                curr = genGroupByPlan1MR(dest, qb, curr);
              }
            }
            if (LOG.isDebugEnabled()) {
              LOG.debug("RR before GB " + opParseCtx.get(gbySource).getRowResolver()
                  + " after GB " + opParseCtx.get(curr).getRowResolver());
            }

            curr = genPostGroupByBodyPlan(curr, dest, qb, aliasToOpInfo, gbySource);
          }
        } else {
          curr = genGroupByPlan1ReduceMultiGBY(commonGroupByDestGroup, qb, input, aliasToOpInfo);
        }
      }
    }


    if (LOG.isDebugEnabled()) {
      LOG.debug("Created Body Plan for Query Block " + qb.getId());
    }

    return curr;
  }

  

  下面这些都是在生成执行计划的过程中会用到的方法,比较重要,这里只列出方法名,不再逐一展开分析:

genFilterPlan
genHavingPlan
genLimitMapRedPlan
genLimitPlan
genJoinKeys
genSelectPlan
genTablePlan
genFileSinkPlan
genJoinPlan
  

  评论这张
 
阅读(289)| 评论(0)

历史上的今天

在LOFTER的更多文章

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2018