2014年,Spark开源生态系统得到了大幅增长,已成为大数据领域最人气的开源项目之一,活跃在Hortonworks、IBM、Cloudera、MapR和Pivotal等众多知名大数据公司,更拥有Spark SQL、Spark Streaming、MLlib、GraphX等多个相关项目。同时值得一提的是,Spark贡献者中有一半左右的中国人。
短短四年时间,Spark不仅发展为Apache基金会的顶级开源项目,更通过其高性能内存计算及其丰富的生态快速赢得几乎所有大数据处理用户。2015年1月10日,一场基于Spark的高性能应用实践盛宴由Databricks软件工程师连城、百度高级工程师甄鹏、百度架构师孙垚光、百度美国研发中心高级架构师刘少山四位专家联手打造。
Databricks软件工程师连城——Spark SQL 1.2的提升和新特性
谈及Spark SQL 1.2的提升和新特性,连城主要总结了4个方面——External data source API(外部数据源API)、列式内存存储加强(Enhanced in-memory columnar storage)、Parquet支持加强(Enhanced Parquet support)和Hive支持加强(Enhanced Hive support)。
External data source API
连城表示,因为在处理很多外部数据源中出现的扩展问题,Spark在1.2版本发布了External data source API。通过External data source API,Spark将不同的外部数据源抽象成一个关系表格,从而实现更贴近无缝的操作。
External data source API在支持了多种如JSON、Avro、CSV等简单格式的同时,还实现了Parquet、ORC等的智能支持;同时,通过这个API,开发者还可以使用JDBC将HBase这样的外部系统对接到Spark中。
连城表示,在1.2版本之前,开发者其实已经实现了各种各样外部数据源的支持,因此,对比更原生的支持一些外部数据源,External data source API的意义更在于针对相应数据源进行的特殊优化,主要包括Column pruning(列剪枝)和Pushing predicates to datasources(将predicates贴近数据源)两个方面:
Column pruning。主要包括纵横的两种剪枝。在列剪枝中,Column pruning可以完全忽视无需处理的字段,从而显著地减少IO。同时,在某些条件查询中,基于Parquet、ORC等智能格式写入时记录的统计信息(比如最大值、最小值等),扫描可以跳过大段的数据,从而省略了大量的磁盘扫描负载。
Pushing predicates to datasources。在更复杂的SQL查询中,让过滤条件维度尽可能的接近数据源,从而减少磁盘和网络IO,最终提高整体端到端的性能。
使用External data source API之前
使用External data source API之后
搭载了如Parquet和ORC这样的智能格式
连城表示,在Spark 1.2版本中,External data source API并没有实现预期中的功能,在Roadmap中,First class分片支持(First class partitioning support with partition pruning)、Data sink(insertion)API、将Hive作为外部数据源等。
Enhanced in-memory columnar storage
连城表示,不管Shark,还是Spark,内存缓存表的支持都是非常重要的一个特性。他表示,虽然在1.1和之前版本中的列式内存表的性能已然不错,但是还会出现一些问题:第一,大数据量下缓存超大体积表时(虽然不推荐,但不缺现实用例),会出现OOM等问题;第二,在列式存储中,像Parquet、ORC这种收集统计信息然后通过这些信息做partition skipping等操作在之前版本中并没有完全实现。这些问题在1.2版本中都得到了解决,本节,连城主要介绍了语义统一、缓存实体化、基于缓存共享的查询计划、Cache大表时的OOM问题、表格统计(Table statistics)等方面。
缓存实体化。SQLContext.cacheTable(“tbl”)默认使用eager模式,缓存实体化将自动进行,不会再等到表被使用或触发时,避免手动做“SELECT COUNT(*) FROM src;”。同时,新增了“CACHE [LAZY] TABLE tbl [AS SELECT …]”这样的DML。
语义统一。早期时候,SchemaRDD.cache()和SQLContext.cacheTable(“tbl”)这两个语义是不同的。其中,SQLContext.cacheTable会去建立一些列式存储格式相关优化,而SchemaRDD.cache()却以一行一个对象的模式进行。在1.2版本中,这两个操作已被统一,同时各种cache操作都将得到一个统一的内存表。
基于缓存共享的查询计划。两个得到相同结果的cache语句将共享同一份缓存数据。
避免Cache大表时的OOM问题。优化内存表的建立和访问,减少开销,进一步提升性能;在缓存大表时,引入batched column buffer builder,将每一列切成多个batch,从而避免了OOM。
表格统计。Table statistics,类似Parquet、ORC使用的技术,在1.2版本中主要实现了Predicate pushdown(实现更快的表格扫描)和Auto broadcast join(实现更快的表格join)。
最后,连城还详细介绍了一些关于加强Parquet和Hive支持的实现,以及Spark未来的一些工作。
百度基础架构部高级工程师甄鹏——Spark在百度开放云BMR中的实战分享
百度分布式计算团队从2011年开始持续关注Spark,并于2014年将Spark正式引入百度分布式计算生态系统中,在国内率先面向开发者及企业用户推出了支持Spark并兼容开源接口的大数据处理产品BMR(Baidu MapReduce)。在甄鹏的分享中,我们主要了解了百度Spark 应用现状、百度开放云BMR和Spark On BMR三个方面的内容。