旧游无处不堪寻
无寻处,惟有少年心
『数据密集型应用系统设计』读书笔记(十)

Web 和越来越多基于 HTTP/REST 的 API 使得请求/响应的交互模式变得普遍,但是这并不是系统的唯一类型。存在三种不同类型的系统:

  1. 在线服务(在线系统): 服务等待客户请求或指令的到达,当收到请求或指令时,服务试图尽可能快地处理它,并发回一个响应。响应时间通常是服务性能的主要衡量指标
  2. 批处理系统(离线系统): 批处理系统接收大量的输入数据,运行一个作业来处理数据,并产生输出数据。作业往往需要执行一段时间(从几分钟到几天),所以用户通常不会等待作业完成。批处理作业的主要性能衡量标准通常是吞吐量
  3. 流处理系统(近实时系统): 流处理介于在线与离线/批处理之间。与批处理系统类似,流处理系统处理输入并产生输出(而不是响应请求)。但是,流式作业在事件发生后不久即可对事件进行处理,而批处理作业则使用固定的一组输入数据进行操作。

批处理是构建可靠、可扩展与可维护应用的重要组成部分。例如著名的批处理算法 MapReduce 使得 Google 具有大规
模可扩展性能力。该算法随后在各种开源数据系统中被陆续实现,例如 Hadoop。

事实上,批处理是一种非常古老的计算形式。本章将介绍 MapReduce 和其他一些批处理算法和框架,并探讨它们在现代数据系统中的应用。

使用 Unix 工具进行批处理


假设有一个 Web 服务器,每次响应请求时都会在日志文件中追加一行记录。示例如下:

216.58.210.78 - - [27/Feb/2015:17:55:11 +oooo] "GET /css/typography.css HTTP/1.1"
200 3377 "http://martin.kleppmann.com/" "Mozilla/s.o (Macintosh; Intel Mac OS X
10 9 5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/40.0.2214.115
Safari/537.36"

解日志格式的定义如下:

$remote_addr - $remote_user [$time_local] "$request"
$status $body_bytes_sent "$http_referer" "$http_user_agent"

简单日志分析

可以在 Unix shell 中执行下列操作:

cat /var/log/nginx/access. log
awk '{print $7}'
sort
uniq -c
sort -r -n
head -n 5

像 bash 这样的 Unix shell 可以让我们轻松地将这些小程序组合成强大的数据处理作业。

MapReduce 与分布式文件系统


MapReduce 有点像分布在数千台机器上的 Unix 工具。和大多数 Unix 工具一样,运行 MapReduce 作业通常不会修改输入,除了生成输出外没有任何副作用。

Unix 工具使用 stdin 和 stdout 作为输入和输出,而 MapReduce 作业在分布式文件系统上读写文件。在 Hadoop 的MapReduce 实现中,该文件系统被称为 HDFS(Hadoop Distributed File System, 一个 Google 文件系统(GFS)的开源实现版本)。

HDFS 基于无共享原则。共享磁盘存储由集中式存储设备实现,而无共享方法不需要特殊硬件,只需要通过传统数据中心网络连接的计算机。
HDFS 会在每台机器上运行守护进程,开放一个网络服务以允许其他节点访问存储在该机器上的文件。名为 NameNode 的中央服务器会跟踪哪个文件块存储在哪台机器上。
HDFS 具有很好的扩展性,最大的 HDFS 集群运行在上万台机器上。

MapReduce 作业执行

MapReduce 是一个编程框架,可以使用它编写代码来处理 HDFS 等分布式文件系统中的大型数据集。
最简单的理解方法是参考本章前面的”简单日志分析”中的 Web 日志的示例:

  1. 读取一组输入文件,并将其分解成记录。上述示例中,每个记录都是日志中的一行
  2. 调用 mapper 函数从每个输入记录中提取一个键值对。前面的例子中,mapper 函数是 awk ‘{print $7}’ 它提取 URL ($7) 作为关键字,并将相应的值留为空
  3. 按关键字将所有的键值对排序。在日志示例中,这由第一个 sort 命令完成
  4. 调用 reducer 函数遍历排序后的键值对。如果同一个键出现多次,排序会使它们在列表中相邻,所以很容易组合这些值, 而不必在内存中保留过多状态。在前面的例子中,reducer 是由 uniq -c 命令实现的

这四个步骤可以由一个 MapReduce 作业执行。步骤 2 (map)和 4 (reduce)是用户编写自定义数据处理的代码。步骤 1 (将文件分解成记录)由输入格式解析器处理。步骤 3 中的排序步骤 sort 隐含在 MapReduce 中,无需用户编写,mapper 的输出始终会在排序之后再传递给 reducer。

要创建 MapReduce 作业, 需要实现两个回调函数,即 mapperreducer,其行为如下:

  • Mapper: 每个输入记录都会调用一次 mapper 程序,其任务是从输入记录中提取关键字和值。对于每个输入,它可以生成任意数量的键值对
  • Reducer: MapReduce 框架使用由 mapper 生成的键值对,收集属于同一个关键字的所有值,并使用迭代器调用 reducer 以使用该值的集合

MapReduce 与 Unix 命令管道的主要区别在于它可以跨多台机器并行执行计算,而不必编写代码来指示如何并行化

在分布式计算中可以使用标准的 Unix 工具作为 mapper 和 reducer,但更为常见的是用传统编程语言实现的函数。在 Hadoop MapReduce 中,mapper 和 reducer 都是实现特定接口的 Java 类

MapReduce 工作流

单个 MapReduce 作业可以解决的问题范围有限。将 MapReduce 作业链接到工作流中是非常普遍的,这样,作业的输出将成为下一个作业的输入。
链接方式的 MapReduce 作业并不像 Unix 命令流水线,而更像是一系列命令,其中每个命令的输出被写入临时文件,下一个命令从临时文件中读取

只有当作业成功完成时,批处理作业的输出才会被视为有效。MapReduce 会丢弃失败作业的部分输出。因此,工作流中的一个作业只有在先前的作业(即生成其输入目录的作业)成功完成时才能开始。

为了处理这些作业执行之间的依赖关系,已经开发了各种 Hadoop 的工作流调度器,例如 Oozie、Azkaban、Luigi、Airflow 和 Pinball。良好的工具支持对于管理如此复杂的数据流非常重要。

Hadoop 的各种高级工具如 Pig、Hive,支持设置多个 MapReduce 阶段的工作流,这些不同的阶段会被自动链接起来。

批处理工作流的输出

批处理即不是事务处理,也不是分析,批处理过程的输出通常不是报告,而是其他类型的数据结构。

  1. 生成搜索索引: Google 最初使用 MapReduce 的目的是为其搜索引擎建立索引
  2. 批处理输出键值: 批处理的另一个常见用途是构建机器学习系统如分类器、推荐系统,这些批量作业的输出通常是某种数据库

Hadoop 经常被用于实现 ETL 过程,来自事务处理系统的数据以某种原始形式转储到分布式文件系统中,然后编写 MapReduce 作业进行数据清理,将其转换为关系表单,并将其导入数据仓库以进行分析。

超越 MapReduce


使用原始的 MapReduce API 来实现一个复杂的处理任务是相当困难和费力的,因此创建了各种高级编程模型如 Pig、Hive 等。然而,MapReduce 执行模型本身也存在一些问题,例如性能,中间状态实体化等。为了解决 MapReduce 的这些问题,出现了一些分布式批处理的新的执行引擎,其中最著名的是 Spark、Flink。它们的设计方式有很多不同之处,但有一个共同点: 它们把整个工作流作为一个作业来处理,而不是把它分解成独立的子作业

高级 API 和语言

自 MapReduce 流行以来,分布式批处理的执行引擎已经逐渐成熟。如前所述,由于手工编写 MapReduce 作业太过耗时费力,因此 Hive、Pig 等高级语言和 API 变得非常流行。Spark 和 Flink 也包含他们自己的高级数据流 API。