首页 > 技术文章 > PDW中的Split Querying Process

biaobiaoqi 2013-08-29 11:49 原文

最近看了关于 SQL Server 的分布式处理方面的论文,觉得它提出的 Polybase 跟之前看过的 HadoopDB 有些神似,这里做个小总结(抽空再把 HadoopDB 的总结贴出来)。

不算翻译,只是挑出自己认为是重点的部分。详细情况,还请论文查阅原文,引用中有写明出处。文章末尾有我总结的 slides,可以辅助查阅。

由于缺乏实践经验,很多东西未必能理解其本质。如有其他观点,还请多指教。

当下的计划就是开始自己搭环境,实践起来!~

背景

商业应用中,越来越多的需要将结构化数据和非结构化数据存储、处理混合起来。 目前,已经有很多公司、产品致力于这部分的研究,微软发的这篇论文,也正是基于 PDW V2 的这一新功能提出的新的解决方案。

Polybase 简介:

是 SQL Server PDW V2 的一个新功能:通过使用 SQL 来管理和查询 hadoop 集群中的数据。 它同时能处理结构化和非结构化的数据,特点是结合了 HDFS 的外部表,使用基于开销的查询优化器来做分裂查询处理。

相关研究

  • sqoop:用于在 hadoop 和结构化数据 9 比如关系型数据库之间传输数据。

  • teradata& Asterdata& Greenplum& Vertica:通过外部表(external table)实现基于 SQL 的对 hadoop 中所存数据的操作。

  • Orable:基本机制也是建立外部表;另外还开发了用于加载 hadoop 的大数据到 Oracle 自家数据库的工具 OLH(Oracle loader for Hadoop)。

  • IBM、Netezza:使用 mapreduce 方法获取分布式环境下各个节点的数据执行处理。

  • Hadapt: (HadoopDB):HadoopDB 是来自耶鲁大学的创意,并商业化为 Hadapt 项目。这是首个提出使用类 SQL 语言、集合 Hadoop 系统实现对 RDBMS 的操作的想法。实现相对简单,源代码 3 千多行,在 Hadoop 中对其中的几个模块做了二次开发。

PDW

Polybase 是 PDW V2 的一个新 feature,那么,首先,让我们来看一下所谓的 PDW 是什么。

PDW 是一个基于 SQL Server 的 shared-nothing 的并行数据库系统。

PDW(Parallel Data Warehouse)架构图:

PDW 系统中的组件:

节点
  • control node:

类似于 Hadoop 中的 master 节点。运行着 PDW Engine,负责:查询语法分析,优化,生成分布式执行计划 DSQL,控制计划实施

  • compute node:

类似于 Hadoop 中的 slave 节点。数据存储和查询执行

DMS

Data Moving Service,起功能有:

  • 1.repartition rows of a table among SQL Server instances on PDW compute nodes

  • 2.针对 ODBC 的类型转换。

Polybase

Polybase 使用场景

如图

(a)中 PDW 与 Hadoop 一起完成了数据处理任务,并输出结果;

(b)中处理数据后,结果直接存储到 HDFS 中

充分利用 SQL Server PDW 的性能优势,特别是它的基于开销的并行查询优化器和执行引擎。

Polybase 的环境需求

  • 1.PDW 与 Hadoop 集群可以重叠,也可以分离。
  • 2.windows 和 linux 部署的 hadoop 集群都支持。
  • 3.支持所有标准的 HDFS 文件格式,包括文本、序列化文件、RCFiles。只要定义好对应的 Inputformat 和 outputFormat,所有定制文件格式也支持。

核心组件

  • 1.外部表:用于在 PDW 中实现对数据的操作语义。
  • 2.HDFS Bridge:DMS 中的组件,用于实现 PDW 节点域 Hadoop 的通信。
  • 3.基于开销的查询优化器:将常规 SQL 查询语句转化为 DSQL(分布式 SQL 查询语句),并结合集群状况(比如 Hadoop 和 PDW 集群规模,运行状况等),合理选择优化方式。

1.外部表

PDW 需要了解到 Hadoop 集群中数据的模型。于是就有了这个外部表。 实例如下:

创建集群:

1
2
3
4
5
CREATE HADOOP_CLUSTER GSL_CLUSTER

      WITH (namenode=‘hadoop-head’,namenode_port=9000,

      jobtracker=‘hadoop-head’,jobtracker_port=9010);

创建文件格式:

1
2
3
4
5
6
7
CREATE HADOOP_FILEFORMAT TEXT_FORMAT

  WITH (INPUT_FORMAT=‘polybase.TextInputFormat’,

  OUTPUT_FORMAT = ‘polybase.TextOutputFormat’,

  ROW_DELIMITER = '\n', COLUMN_DELIMITER = ‘|’);

根据集群和文件信息,创建外部表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
CREATE EXTERNAL TABLE hdfsCustomer

  ( c_custkey       bigint not null,   

    c_name          varchar(25) not null,

     .......
    )

WITH (LOCATION='/tpch1gb/customer.tbl',

FORMAT_OPTIONS (EXTERNAL_CLUSTER = GSL_CLUSTER,

EXTERNAL_FILEFORMAT = TEXT_FORMAT));

2.HDFS Bridge

结构如图:

HDFS shuffle 阶段:通过 DMS 从 hadoop 读取数据的阶段。 涉及到 hdfs 中的数据处理时,处理过程如下:

  • 1.跟 namenode 通信,获得 hdfs 中文件的信息

  • 2.hdfs 中文件信息 + DMS 实例个数 -> 每个 DMS 的输入文件(offset、长度) #力求负载均衡

  • 3.将 DMS 的输入文件信息传递给各个 DMS,DMS 通过 openRecordReader()方法构建 RecordReader 实例,直接与对应的 datanode 通信,获取数据,并转换为 ODBC 格式(有时候类型转换提前到 mapreduce 中以利用 hadoop 集群的计算能力)。读取过程中,使用了 buffer 机制提高效率。有时候数据会被提前到从 HDFS 中读出时执行,而不是到 DMS 中执行。这是为了充分利用 hadoop 集群的计算能力,节约 CPU 秘籍的 DMS shuffle 的计算。

写入 hadoop 的过程与此类似。

3.查询优化

  • 1.PDW Parser(在 PDW Engine 的进程中完成)。

  • 2.SQL Server Query Optimizer(在 control node 的 SQL Server 的进程中完成):使用 bottom-up 的方式进行查询优化,并在合理的位置插入数据迁移的操作符(用于分布式环境的数据迁移指令),:生成查询计划,存储在 Memo 数据结构(http://www.benjaminnevarez.com/2012/04/inside-the-query-optimizer-memo-structure/)中 。

  • 3.XML geneator(在 control node 的 SQL Server 的进程中完成)。接收 Memo,并转换格式,往下传递。

  • 4.Query Optimizer(在 PDW Engine 的进程中完成):根据 Memo 生成 DSQL。

  • 5.基于开销的查询优化:判定是否将 SQL 语句推送到 Hadoop 中执行。

    考虑外部表的样本数据的直方图、集群的规模等因素…选择最优优化方案。

样本数据处理:

定义对应外部表列的详细样本数据:

1
2
CREATE STATISTICS hdfsCustomerStats ON
hdfsCustomer (c_custkey);

对样本数据的处理的方式如下:

  • 1.通过 DMS 或者 map job 读取 sample 数据,
  • 2.分发到不同的 comute 节点的暂存表。
  • 3.每个节点分别计算直方图。
  • 4.汇总直方图,存储到 control node 数据库的 catalog 中

缺点是在此过程中没有利用好 hadoop 集群的计算能力。

语义兼容

涉及到 Java 和 SQL 以及之间的转换。包括这三个方面:

  • 数据类型的语义.
  • 表达式的语义
  • 异常处理机制

例如:”a+b”,其中 a,b 都为 null,SQL 结果为 NULL,而 Java 则会抛出 NullException。

处理原则是:能转化的类型则做好转化包装;不能转换的则标记为无法实现,仅限 PDW 实现。

举例:

1
2
3
   SELECT count (*) from Customer
  WHERE acctbal < 0
  GROUP BY nationkey

如图所示为处理过程

Polybase 的 MapReduce Join 实现

使用 distributed hash join 实现(只有 equi-join 能被在 mapreduce 中完成)

小表作为 build side ,并被物化(materialized)到 HDFS,大表作为 probe side。

在 Hadoop 的 Map 任务中:读取物化好的 build side 到内存,构成 hash table。

probe side 经过 hash 后对比 hash 表,做正确的链接。

为了让 build side 置于内存中,需要计算 build side 的大小、每个 task 拥有的内存大小,task 中执行其他操作需要的内存空间。 当然,build side 也可能被复制多分,以提高效率。

本文演示 slides 下载链接,点击获取

引用

推荐阅读