摘要
      MapReduce计算框架作为腾讯Typhoon云计算平台的核心模块,已经广泛应用在广告、网页搜索等各种大数据处理中。本文主要介绍MapReduce计算框架的设计思想以及各个模块;并介绍框架的特点以及各种应用案例。
1 背景
    存储和处理TB级以上的大数据日益成为当前公司各主要业务部门的重要问题。对数据存储系统的高可靠性以及对数据处理的日益增加的及时性也使得公司对高效处理系统的需求与日俱增。为此,Typhoon(台风)云计算平台应运而生。
    Typhoon(台风)是由基础架构部开发的集分布式存储和分布式计算于一体的云计算平台。它由提供高可靠性的分布式文件系统XFS、分布式半结构化存储系统XCube,分布式计算框架MapReduce,以及机群调度系统Torca, 认证组件Taas等组件构成。与Hadoop不同,Typhoon平台早在09年设计之初就将集群管理,数据存储,和数据计算分离,在软件栈上实现松耦合的方式来实现。这意味着Typhoon平台可以有效结合不同的存储和计算模块,并利用Torca资源调度来取得更好的利用率和执行效率。Typhoon平台天生集成了XFS, XCube等存储系统,与此配套的MapReduce计算框架能充分利用这些系统所提供的特性发挥最大性能。
    作为核心组件之一的MapReduce计算框架,提供给用户易于编写运行在大型分布式集群环境下的应用程序,并通过Torca调度系统运行。与当前流行的Hadoop提供的计算框架不同,Typhoon平台的MapReduce计算框架是通过一组编程连接库(library)来实现,而不是通过提供服务(service)来实现的。应用程序经过编译连接后直接变成可运行在分布式集群环境下的二进制文件。此外,由于以编程链接库的方式来设计,第三方系统很容易将MapReduce作为基础来构建分布式数据处理平台,例如目前广泛应用在结构化日志处理的Dragger系统就是典型的例子。
    下面将具体介绍MapReduce计算框架(简称MapReduce框架)的设计与实现细节。
    2 Mapreduce计算框架的设计
    2.1设计思想
    MapReduce框架设计思想来源于传统单机系统中的多线程处理模型,它是这种模型在分布式操作系统(Torca)上的实践。
    在单机系统中,程序通过调用多线程API启动多个工作线程,工作线程从任务队列获取任务进行处理(如图1所示)。在Typhoon系统中,基于MapReduce框架实现的应用程序会被Torca调度到分布式集群上,并以一个Master和一组Worker相互协作的方式运行。Master承担了任务管理和调度的职责;而这组Worker则像单机中的多个线程一样承担了计算处理的职责;而类似于单机环境下的存储设备(如磁盘文件系统和数据库)则对应于Typhoon平台中的XFS和XCube(如图2所示)。


Figure 1 单机多线程模型



Figure 2 台风MapReduce框架模型


    从上图2可以看出,基于MapReduce框架的应用程序由三个逻辑模块组成:
    Controller客户端:用于提交应用程序、显示应用程序的执行进度以及返回执行过程中各种状态息;
    Master节点:负责对应用任务进行划分和调度,处理失败、超时等错误问题,同时也承担web页面的监控器的功能;
    Worker工作节点:负责执行由Master节点分配的各种计算任务。
    后面会简单介绍各个模块。
    2.2 框架特点
    Typhoon平台上MapReduce框架有如下的特点:
    1.  作业之间独立,Master各自管理独立的任务和计算节点,无耦合。
    2.  用户可以独立升级各自的MapReduce库进行特性更新,不受其他用户的影响,集群中可以有多个版本的MapReduce同时存在。
    3.  用户作业所需的资源可以各自按需设置,更有效地利用集群资源。
    4.  强大的作业实时展示功能,有助于用户分析作业运行状况。
    5.  利用Torca提供的进程级的容错机制,简化实现模型。
    6.  内嵌包括Text,RecordIO和SSTable,XCube等多种源的数据处理,满足绝大部分业务需求。用户可以扩展输入输出类型。
    3 模块介绍
    Controller
    Controller相当于作业的客户端。首先,它负责校验用户作业(Job)描述的完整性以及配置的合理性,验证作业指定的Mapper和Reduce类是否有注册等。在校验成功后,Controller向Torca启动一个进程,并将其角色指定为Master。Master进程启动后,Controller向Master发起RPC调用进行任务提交。之后,Controller周期请求Master获得作业的状态进度等信息,在终端上进行展示。
    Master
    Master启动后,会等待接收Controller提交的作业描述。对接收到的作业描述,进行输入切分,将文件或XCube表划分成一个个的任务(Task)。之后根据任务的情况启动特定数据Worker进行计算。Worker会通过RPC通信与Master保持通信,并返回处理结果或者获取新的task。当整个作业结束后,Masrer会通过RPC反馈给Controller,并自行退出。
    Worker
    Master所切分的Task会分为:Map类Task和Reduce类Task 。
    对于Map类Task,Worker首先按输入描述读取XFS文件或XCube表,并使用应用程序注册的Mapper类对每一条数据记录进行处理。当Map任务完成后,它的输出会注册在Worker内的子模块Shuffle Server,为下一步的Reduce任务提供Shuffle服务。
    对于Reduce类Task,Worker会先从Master获取已完成的Map任务列表,并到完成这些任务的Worker拉取本Reduce对应的数据分片(Shard)数据。当集齐全部Map的指定分片数据后,Worker会使用用户注册的Reducer类进行处理,并输出结果。
    Monitor
    Monitor是Master中的一个子模块,它提供本作业的状态展示功能,包括展示作业的整体信息,Worker状态信息,以及各个子任务的运行信息等(如图3)。这里以作业整体信息页面为例,从上向下依次有作业的数据量预估和时间信息;接着是MapReduce任务的进度信息;之后有作业中用户注册的各类计数器历史统计。其中,各种用户自定义的计数器对于分析用户作业的运行情况有极大的帮助。


图3 monitor展示作业运行的信息


    4 框架支持特性
    多数据源和多Mapper类的支持
    框架原生支持XFS文件和Xcube表,可以用于多种数据的计算处理。此外,框架也支持在一次作业中使用不同的Mapper类,对同一类型不同格式的数据进行处理。另外,用户还可以自定义自己的输入输出流处理接口,来处理特定格式的数据源。
    多种可选任务调度方案
    Master对于Task的下发,支持随机、本地优先、负载限制等多种调度方式。对于不同的数据源和数据分布都能有效地支持。
    随机调度:对于不支持位置信息的输入流或在输入分布具有集中性时,通常可以采用随机调度。
    本地优先调度:对于带位置信息且输入分布均匀的输入,可以将Task优先发给其所在机器的Worker,利用本地读取的特性,加快IO速度。
    负载限制调度:对于输入系统的节点间处理能力严重不均衡时,可以在调度时限制单输入节点上并发任务的上限,以避免给输入系统造成过大冲击。
    容错
      框架对Worker和Master均提供容错支持。框架能够将临时失败的Task完成的数据进行恢复,以及对不可恢复的Task进行重新下发,从而保证任务的运行完成。而单点存在的Master,则会定期在XFS记录CheckPoint,当进程被迁移时,利用CheckPoint进行状态的恢复,从而继续完成作业。
    本地资源加载VPATH
      框架允许用户自定义应用程序所要使用的本地资源(文件或者数据),并通过定义VPATH路径由框架直接读取使用,而无需通过flag传参的方式来使用。
   计数器Counter
     计数器counter是框架提供给用户对自身应用程序(Mapper/Reducer)进行统计的工具。通过计数器,可以对map或者reduce阶段进行各种统计并及时在monitor页面上进行反馈,帮助用户了解自身应用的情况。
    多种类型数据转换和比较支持
    ComparableStringEncoder/Decoder 用来完成各种类型数据和可比较字符串的相互转换,目的是对各种类型的数据进行比较。支持基本类型和复杂类型。
    全局Report机制
    MapReduce框架提供Report机制,将框架中类似启动Master失败,Split失败等信息集中展示到Controller端,帮助用户定位问题,同时用户还可以利用Report展示用户程序中的相关问题。
    基于BreakPad的远程调试
     用户提交的mapreduce在torca上运行时出现core时,用户很难调试。为此,MapReduce将breakpad集成进mapreduce框架, 当用户的程序core之后,框架利用breakpad提取mapreduce程序core时候的backtrace,并将他发送mapreduce master程序, 展示在monitor上,方便用户去调试定位问题。
    一致性检测
    框架提供mapreduce_consistency_check flag(默认值为false),开启后每一次MapReduce运算会得到一个md5校验码。用户通过对比运行两轮MapReduce job得到校验码来确定自己的代码中是否存在随机性。框架也可以用此功能进 行数据正确性校验。
    输入路径过滤Path Filter
    框架支持指定模式的文件作为输入,默认提供正则表达式匹配方法。用户也可以通过实现PathFilter类自定义文 件匹配模式,从而对输入文件进行过滤。
    5 框架应用现状
    目前台风MapReduce已经在很多业务中得到应用,包括:游戏用户数据分析,面向搜索广告(AFS)和面向内容广告(AFC)系统的日志分析、数据挖掘等模块,网页搜索的相关性计算和反作弊等众多业务。随着系统的成熟,业务作业运行的数量和集群规模也在不断增加。


    其中,单次作业处理数据量最大的是搜搜快照全量索引任务,处理快照总数据量超过1PB,在490个worker的情况下耗时约35个小时。其次是全网数据的锚文本反转任务,数据总量500TB, 处理父页面数为283亿,子页面数为2万亿,在900个Worker情况下耗时约为25小时。
    同时,我们采用Sort Benchmark中Terasort和MinuteSort作为测试框架性能的标准之一,并不断优化框架挑战性能极限。我们在5月份成功在943台服务器的集群上启动1800个Worker,耗时53s完成1T数据的排序。按照大赛记录,我们当时创造MinuteSort和Terasort两项世界记录,详细介绍见" Typhoon云计算之MapReduce: 挑战Google,yahoo的排序速度"。
    此外,由于台风MapReduce以程序库的形式存在,因此利用它可以方便地构建其它应用系统。例如目前被广泛应用的结构化日志处理系统(Dagger)就是基于台风MapReduce框架而构建的。
    6 总结
    本文主要介绍MapReduce计算框架的设计思想和特点,并简单介绍了各个模块的功能,最后介绍各种应用案例和应用情况。