Cosmos是微软大数据的应用平台,内部使用不开源,最开始支撑Bing的后端,到后来扩展到整个微软。作为一个完整的系统,基本上Hadoop系统里面有的它也多多少少都有,从文件系统到执行环境,从interactive query到streaming,从data ingestion到workflow。很多的东西只在internal应用也就不便提及,然而很多核心技术其实也以论文的方式发表出来。因为是老东家的东西,又有签订保密协议,所以我会把比较主要的论文都列出来。这篇文章里的内容会严格的遵循这些论文里面提到的。至于这些论文之外的东西,就不方便在本文里说了。除此之外,系统很大,我个人工作的领域很小,了解的东西广度深度都有限,难免以偏概全,管中窥豹只能看到那一斑了。
下面是一些论文列表:
SCOPE: easy and efficient parallel processing of massive data sets. VLDB 2008
Incorporating partitioning and parallel plans into the SCOPE optimizer. ICDE 2010
SCOPE: parallel databases meet MapReduce. VLDB Journal 2012
Continuous Cloud-Scale Query Optimization and Processing. VLDB 2013
Apollo: Scalable and Coordinated Scheduling for Cloud-Scale Computing OSDI 2014
JetScope: Reliable and Interactive Analytics at Cloud Scale. VLDB 2015
StreamScope: Continuous Reliable Distributed Processing of Big Data Streams NSDI 2016
此外Dryad和Windows Azure Blob Store的论文也都有参考价值。这个论文列表并不全,有些不是特别重要的论文就不列出来了。本文的内容以2012年的VLDB Journal论文为基础。
Cosmos作为一个大数据解决方案,就像其他任何的大数据系统一样,有自己的存储,执行和语言系统。对应于Hadoop来说就是HDFS, Hadoop MapReduce,Hive/Pig. Cosmos的存储系统,很大程度上也基于了和Hadoop差不多的理念,都是从GFS来的。文件在这个文件系统里面称为Stream,而每个block则称为extent。文件系统的实现上,比起Hadoop更早的实现了HA,用的是Paxos协议。相对于Hadoop的文件系统更加的scalable。有关这个文件系统的详细情况,最为接近有参考价值的还是Windows Azure Blob Store的那篇论文,因为windows azure blob store是在Cosmos早期的一个codebase上开发出来的,所以存留了很多Cosmos的文件系统的设计。至于其他的,一则我不做文件系统,二则,也没有专门的文献公开的发布出来讨论这个文件系统,所以我就不详细谈了。
在Cosmos里面除去可以随便存stream以外,还有一种特殊的stream叫做structured stream。后者可以认为是cosmos team控制的文件格式。这个文件格式前后有若干个版本,其主要目的是在stream里面同时写入data 和metadata,而这些metadata则会帮助execution层面更好的做optimization,更快的处理数据。structured stream最重要的特性有如下几个:
- Partitioning,数据是被partition的,hash partition和range partiton都是支持的。
- Data Affinity,简单一点来说affinity对storage层面是一个hint,每个extent都有一个affinity id,系统会设法让至少一个copy的这些extent能够co-located越近越好。
- Indexing,无论是hash partition 还是range partition,每个partition都是有index的,类似于B+树,前面还会有bloom filter。
- Column Groups,系统提供了Column Group的概念,系统会把常见的column以column store的方式存储在一起。
- Stream Reference,这个功能简单的说一个新生成的stream可以指定用一个老的stream的partition info作为新的partition的info,这在实际应用中,比如每天都产生一个stream的场景下,对于很多query optimization比如说partition pruning很重要。
Cosmos的execution layer用的是Dryad,有关Dryad的情况我在之前的文章里面已经讲过了,欢迎大家参考这篇文章:大数据那些事(5):沉没的微软以及Dryad。
Cosmos上面跑的语言叫做SCOPE。这个语言很大程度上是PIG+SQL。它是一个data flow 的语言,然后通过提供一系列的命令对数据进行操作。如果熟悉spark的人应该也大概知道这种操作是什么样的。语言的命令分两种,一种是输入和输出:分别是EXTRACT/OUTPUT, 以及SSTREAM/OUTPUT TO SSTREAM。另外则是数据处理。数据处理提供了SQL的SELECT支持,同时还提供了三种extension:
- PROCESS,这相当于MAP
- REDUCE, 这个就是REDUCE
- COMBINE,这个不是HADOOP的COMBINE,是full outer join的支持。
要实现这些东西extenion,需要用C#对特定的接口进行开发,简单来说,PROCESS 的接口就是 foreach row do something, REDUCE是foreach group do something,而COMBINE则实打实的拿两个input channel进来做full outer join。所以从这个角度来说,extension其实比MapReduce更实在一些,做join也更自在一些。
SCOPE的另外一个特点是整个type系统完全的基于.NET,所以任何的时候用户都可以随时随地的用C#的函数而不受到任何影响,包括系统函数和用户自定义函数。能够非常灵活的运用各种C#的函数,是SCOPE非常强大的一个功能。其背后的实现机制是基于了C#的compiler as a service项目Roslyn。
SCOPE实现了对自定义aggregate函数的支持,SCOPE也实现了对UDO的支持,就是任意的C# class可以自然的成为stream的type,为此在structured stream里面就需要解决serialization和de-serialization的问题。
在这些语言特性之上SCOPE实现了view,streamset,macro等大量的语言特性的支持,鉴于文章里面并没有展开具体讨论,我就略过了。
我们注意到,Dryad是支持任意的DAG的执行的,但是SCOPE开放给用户的只是简单的MAP/REDUCE/JOIN的接口,最多只有两个input一个output。这并不是说SCOPE对Dryad的应用仅仅限于这些图的支持。相反的,SCOPE自身的SELECT语句背后产生的execution plan可以非常的复杂。选择性的对用户开放简单的图的支持,而系统内部优化产生的则可以是很复杂的图是SCOPE的一个特点。
SCOPE的optimizer的framework是基于SQL Server的cascading framework。但是rule则很不一样。SCOPE在query optimization上做了很多的工作。常见的比如说partition pruning。通过在optimization阶段的操作,对structured stream减少需要读的extent的数目。这种优化往往能够很迅速的提高工作效率。Common subexpression是另外一个比较经典的optimization。
SCOPE的optimizer里面比较重要的一个特点是去决定怎么样去partition 这个数据。举个简单的例子,如果说TABLE A JOIN B on KEY C=D AND E=F,然后GROUP BY 在 C上,那么系统会选择co-partition A和B在KEY C和D上,从而避免在GROUP BY的时候再做一次partition。当然实际情况复杂,如果C是gender的话,这个optimization就不好了。
SCOPE的另外一个特点是extensions都可以annotate,举个例子说,对PROCESS,如果我写了一个PROCESSOR,我可以告诉系统说这个PROCESSOR不会改变sorting order,这样optimizer就知道这个sorting property可以从图的下面propogate到上面来。
SCOPE也完成了通过看过去的job的运行情况来反馈进来帮助产生新的plan的工作,这个对于日积月累的routine的工作有很好的指导意义,但是对于突然之间数据可能有很大改变的系统则不好说了。
SCOPE的runtime是C++的code generation,这个就是经典的volcano execution model,每个operator实现open,next,close,然后从头上pull就可以了。execution的另外一个部分就是要决定哪些vertex被包成一个大的node在一台机器上执行。哪些则需要被分布到不同的机器上去。
COSMOS在14年前后花了很多时间去improve了它的scheduler,这个在发表的论文APOLLO里面有比较详细的叙述,介于我不是这方面的专家,也就不在这里不懂装懂的总结了。
转自:飞总聊IT