前言

本文隶属于专栏《1000个问题搞定大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!

本专栏目录结构和参考文献请见1000个问题搞定大数据技术体系


目录

Spark RDD 论文详解(一)摘要和介绍

Spark RDD 论文详解(二)RDDs

Spark RDD 论文详解(三)Spark 编程接口

Spark RDD 论文详解(四)表达 RDDs

Spark RDD 论文详解(五)实现

Spark RDD 论文详解(六)评估

Spark RDD 论文详解(七)讨论

Spark RDD 论文详解(八)相关工作和结尾


思维导图

在这里插入图片描述

正文

3、Spark 编程接口

原文翻译

Spark 使用 Scala 语言实现了抽象的 RDD,Scala 是建立在 java VM 上的静态类型函数式编程语言。

我们选择 Scala 是因为它结合了简洁(很方便进行交互式使用)与高效(由于它的静态类型)

然而,并不是说 RDD 的抽象需要函数式语言来实现。

开发员需要写 Driver 程序来使用 Spark,就像上面图二展示的,这个 Driver 程序连接了集群中的 Workers。

Driver 端程序定义了一系列的 RDDs 并且调用了 RDD 的 action 操作。

Driver 端的 Spark 程序同时也会跟踪 RDDs 之间的的血缘关系。

Workers 是可以将 RDD 分区数据存储在内存中的长期存活的进程。

在 2.2.1 小节中的日志挖掘例子中,我们提到,用户提供给 RDD 操作比如 map 以参数作为这个操作的闭包(说白了就是函数)。

Scala 将这些函数看作一个 java 对象,这些对象是可以序列化的,并且可以通过网络传输传输到其他的机器节点上的。

Scala 将函数中的变量看作一个对象中的变量。

比如,我们可以写一段这样的代码:

var x = 5; rdd.map(_ + x)

来达到给这个 RDD 每一个元素加上 5 的目的。

RDDs 是被一元素类型参数化的静态类型对象,比如,RDD[Int] 表示一个类型为整数的 RDD。

然而,我们很多例子中的 RDD 都会省去这个类型,这个是因为 Scala 支持类型推断

虽然我们用 Scala 实现 RDD 的方法很简单,但是我们需要处理用反射实现的闭包对象相关的工作,我们还需要做很多的工作使得 Spark 可以用 Scala 的解释器,这个我们在 5.2 小节中会讨论到。

尽管如此,我们是不需要修改 Scala 的编译器的。


3.1 Spark 中 RDD 的操作

原文翻译

表二中列举了 Spark 中 RDD 常用的 transformationsactions 操作,且描述了每一个方法的签名以及类型。

我们需要记住 transformations 是用来定义一个新的 RDD 的 lazy 操作,而 actions 是真正触发一个能返回结果或者将结果写到文件系统中的计算。

需要注意的是,一些操作比如 join 只适合用于 key-value 类型的 RDDs。

我们取的函数的名称和 scala 或者其他函数式编程语言的函数名是一致的。

比如,map 是一个 one-to-one 的映射操作,而 flatMap 的每一个输入值会对应一个或者更多的输出值(有点像 MapReduce 中的 map)

除了这些操作,用户可以通过 persist 操作来请求缓存 RDD。

另外,用户可以获取 RDD 的分区顺序,这个命令由 Partitioner 类表示,并且可以根据它来对另一个数据集进行分区。

像 groupByKey、reduceByKey 以及 sort 等操作都是经过了 hash 或者 range 分区后的 RDD。
在这里插入图片描述

表二:Spark 中 RDD 常用的 transformations 和 actions 操作。Seq[T] 表示元素类型为 T 的一个序列。


3.2 举例应用

我们用两个迭代式的应用:线性回归和 PageRank 来补充 2.2.1 提到的数据挖掘的例子。

稍后也会展示下如何控制 RDD 的分区以达到提升性能的目的。

3.2.1 线性回归

原文翻译

很多的机器学习算法一般都是迭代式的计算,因为它们需要跑迭代的优化程序(比如梯度下降)来达到最大化功能。

他们将数据存放在内存中以达到很快的速度。

作为一个例子,下面的程序实现了线性回归,一个能找到最佳区分两种点集(垃圾邮件以及非垃圾邮件)的超平面 w 的常用的分类算法。

这个算法用了梯度下降的方法:一个随机的值作为 w 的初始值,每次迭代都会将含有 w 的方法应用到每一个数据点然后累加得到梯度值,然后将 w 往改善结果的方向移动。

val points = spark.textFile(...).map.(parsePoint).persist()

var w = // 随机的初始向量

for (i <- 1 to ITERATIONS) {
 val gradient = points.map{ p => p.x * (1/(1+exp (-p.y*(w dot p.x)))-1) * p.y }.reduce((a, b)=> a+b) 
  w -= gradient
}

一开始我们定义一个叫 points 的 RDD,这个 RDD 从一个文本文件中经过 map 将每一行转换为 Point 对象。

然后我们重复对 points 进行 map 和 reduce 操作计算出每一步的梯度值。

在迭代之间我们将 points 存放在内存中可以使得性能提高 20 倍,我们将会在 6.1 节中讨论。


线性回归简介

线性回归( Linear Regression )是利用线性回归方程(函数)对一个或多个自变量和因变量之间关系进行建模的一种回归分析方法。

线性回归问题属于监督学习范畴,要求训练数据集中的数据具有明确的目标。

若回归分析中只包含一个自变量和一个因变量,且二者的关系可用一条直线近似表示,那么这种回归分析被称为一元线性回归分析,或一元线性拟合。

如果回归分析中包含两个或两个以上的自变量,且因变量和自变量之间是线性关系,则称为多元线性回归分析。

通常初学者对于线性回归较难理解,若换个角度来解释,其实线性回归问题就是线性拟合问题。

以一元线性回归为例,就是研究如何确定一条直线来近似地表示描述训练集中所有数据的空间分布

类似地,对于二元回归问题,采用一个平面来拟合,若多元回归问题,采用超平面来拟合

线性拟合中用于拟合的直线、平面、超平面都对应着线性方程,因此线性回归问题转换为求解相应线性方程(预测函数)的问题。

用户可以通过求解出的线性方程对数据的目标值进行预测。

实践

参考我的博客——使用 Spark MLlib 实现线性回归


3.2.2 PageRank

原文翻译

在 PageRank 中数据共享更加复杂。

如果一个文档引用另一个文档,那被引用的文档的排名值(rank)需要加上引用的文档发送过来的贡献值,当然这个过程是个迭代的过程。

在每一次迭代中,每一个文档都会发送 r/n 的贡献值给它的邻居,其中 r 表示这个文档的排名值,n 表示这个文档的邻居数量。

然后更新文档的排名值为,这个表达式值表示这个文档收到的贡献值,N 表示所有的文档的数量,我们可以用如下的 Spark 代码来表达 PageRank:

// 加载图作为  (URL, outlinks) 对的RDD
val links = spark.textFile(...).map(...).persist()
    var ranks = // (URL, rank) 对的RDD
    for (i<- 1 to ITERATIONS) {
      // 构建 (targetURL, float) 对的RDD
      // 伴随着每个页面的贡献
      val contribs = links.join(ranks).flatMap {
        (ur1, (links, rank))
        => links.map(dest => (dest, rank / links.size))
      }
      // 按 URL 汇总贡献并获得新排名
      ranks = contribs.reduceByKey((x, y) => x + y).mapValues(sum => a / N + (1 - a) * sum)
    }

其中 links 表示(URL , outlinks)键值对。

这个程序的 RDD 的血缘关系图如图三。

在每一次迭代中我们都是根据上一次迭代的 contribs 和 ranks 以及原始不变的 links 数据集来创建一个新的 ranks 数据集。

随着迭代次数的变多这张图会变的越长,这个是这个图比较有意思的特点。

如果这个 job 的迭代次数很多的话,那么备份一些版本的 ranks 来达到减少从错误中恢复出来的时间是很有必要的,用户可以调用标记为 RELIABLE 的 persist 函数来达到这个目的。

需要注意的是,links 是不需要备份的,因为它的分区数据可以快速的从重新计算输入文件中对应的数据块而得到,这个数据集一般会比 ranks 数据集大上很多倍,因为每一个文档会有很多的连接但只会有一个排名值,所以利用 RDD 的血缘关系来恢复数据肯定比 checkpoint 内存中的数据快很多(因为数据量太大)。

最后,我们可以控制 RDDs 的分区方式来优化 PageRank 中的节点通讯。

如果我们事先为 links 指定一个分区方式(比如,根据 link 的 url 来 hash 分区,就是将相同的 url 发送到同一个节点中),然后我们对 ranks 进行相同的分区方式,这样就可以保证 links 和 ranks 之间的 join 不需要机器节点之间的通讯(因为相同的 url 都在同一个机器节点了,那么相对应的 rank 和 link 肯定也是在同一个机器节点了)。

我们也可以自定义分区器来实现将一组页面 url 放到一起(比如按照 url 的 domain 进行分区)。

以上两种优化方式都可以通过在定义 links 的时候调用 partitionBy 来实现:

links = spark.textFile(...).map(...).partitionBy(myPartFunc).persist()

在调用了 partitionBy 后,links 和 ranks 之间的 join 操作会自动的在 link 所在的机器进行每一个 URL 的贡献值的聚合计算,然后在相同的机器计算新的排名值,然后计算出来的新的 ranks 在相同的机器和 links 进行 join。

这种在迭代之间进行数据一致分区是像 Pregel 这种框架中的主要的优化计算方式。

RDDs 使得用户可以直接自己来实现这种优化机制。


PageRank 算法简介

PageRank 算法即网页排名算法,是 Google 创始人拉里佩奇和谢尔盖・布林于 1997 年构建早期的搜索系统原型时提出的链接分析算法。

自从 Google 在商业上获得巨大成功后,该算法引起了研究者们的广泛关注,目前很多重要的链接分析算法都是在 PageRank 算法基础上衍生出来的。

PageRank 算法是 Google 用来标识网页等级的重要依据,是 Google 衡量一个网站的好坏的唯一标准。

对网页进行排名需要量化的依据,因此 PageRank 算法对每一个网页进行计算后会得到一个在 0 到 10 范围内的值,即该网页的 PageRank 值,简称 PR 值。

PR 值越高说明网页越受欢迎,越重要。

PageRank算法的核心步骤如下:

  • 第 1 步,初始化

PageRank 算法基于两个假设:数量假设和质量假设。

首先通过链接关系构建 Web 图(网络中每页面对应 Web 图中的一个顶点,若网页 A 中包含一条指向 B 的链接,则 Web 图中存在一条由顶点 A 指向顶点 B 的边);

然后为 Web 图中的每个顶点设置初始的 PR 值(通常设定每个顶点的初始 PR 值为 1 / N ,其中 N 为网络中网页的个数)。

  • 第 2 步,迭代计算。

首先假设一个用户在访问某网页时,其将跳转到该网页上各超链接页面的概率相同。

例如网页 A 链向网页 B 、 C 、 D ,所以根据假设,用户从 A 跳转到 B 、 C 、 D 的概率各为 1 / 3 。

PageRank 算法在每一轮迭代计算的过程中,将每个网页当前的 PR 值平均分配到该网页指向的超链接页面上,这样每个网页便获得了相应的权值;

再将新权值求和,即得到该网页的新 PR 值。

当每个网页的 PR 值都获得更新后,就完成了一轮迭代。

  • 第 3 步,结束。

随着每一轮的迭代计算,网页的 PR 值会不断得到更新。

当迭代达到一定次数,或者每个网页的 PR 值固定不变,再或者 PR 值收敛至某一范围内时,算法停止。

算法停止时每个网页的 PR 值就是该网页最终的 PR 值。

实践

参考我的博客——使用 Spark GraphX 实现 PageRank 算法

上一篇 下一篇