Spark学习(四)RDD自定义分区和缓存

coding

一,简介

二,自定义分区规则

  2.1 普通的分组TopN实现

  2.2 自定义分区规则TopN实现

三,RDD的缓存

  3.1 RDD缓存简介

  3.2 RDD缓存方式

正文

一,简介

  在之前的文章中,我们知道RDD的有一个特征:就是一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。这个分配的规则我们是可以自己定制的。同时我们一直在讨论Spark快,快的方式有那些方面可以体现,RDD缓存就是其中的一个形式,这里将对这两者进行介绍。

二,自定义分区规则

  分组求TopN的方式有多种,这里进行简单的几种。这里尊卑一些数据:点击下载

  2.1 普通的分组TopN实现

  实现思路一:先对数据进行处理,然后聚合。最后进行分组排序。

package cn.edu360.sparkTwo

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object SubjectTopNone {

def main(args: Array[String]): Unit = {

val conf: SparkConf = new SparkConf().setAppName("SubjectTopNone").setMaster("local[4]")

val sc: SparkContext = new SparkContext(conf)

val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/sparkLearn/", 2)

// 对每一行数据进行整理

val sbToTeacherOne: RDD[((String, String), Int)] = lines.map(line => {

val words: Array[String] = line.split("/")

val teacher: String = words(3)

val subject: String = words(2).split("[.]")(0)

((subject, teacher), 1)

})

// 聚合,将学科和老师联合当做key

val reduced: RDD[((String, String), Int)] = sbToTeacherOne.reduceByKey(_+_)

//分组排序(按学科进行分组)

//[学科,该学科对应的老师的数据]

val grouped: RDD[(String, Iterable[((String, String), Int)])] = reduced.groupBy(_._1._1)

// 这里取出的是每一组的数据

// 为什么可以调用scala的sortby方法呢?因为一个学科的数据已经在一台机器上的一个scala集合里面了

// 弊端,调用scala的sortBy当数据量过大时,有内存溢出的缺陷

val result: RDD[(String, List[((String, String), Int)])] = grouped.mapValues(_.toList.sortBy(_._2).reverse.take(4))

println(result.collect.toBuffer)

}

}

  实现思路二:先对数据进行处理,然后聚合,然后对数据进行单学科过滤,最后进行排序,提交

package cn.edu360.sparkTwo

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object SubjectTopNtwo {

def main(args: Array[String]): Unit = {

val conf: SparkConf = new SparkConf().setAppName("SubjectTwo").setMaster("local[4]")

val sc: SparkContext = new SparkContext(conf)

val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/sparkLearn")

val sbToTeacherOne: RDD[((String, String), Int)] = lines.map(line => {

val words: Array[String] = line.split("/")

val teacher: String = words(3)

val subject: String = words(2).split("[.]")(0)

((subject, teacher), 1)

})

val reduced: RDD[((String, String), Int)] = sbToTeacherOne.reduceByKey(_+_)

// 获取所有学科

val subjects: Array[String] = reduced.map(_._1._1).distinct().collect()

// 对所有的reduce后的数据进行单学科过滤,在进行排序

for(sb <- subjects){

val filter: RDD[((String, String), Int)] = reduced.filter(_._1._1 == sb)

// 这里进行了多次提交

val result: Array[((String, String), Int)] = filter.sortBy(_._2, false).take(3)

print(result.toBuffer)

}

sc.stop()

}

}

  2.2 自定义分区规则TopN实现

  实现方式一:先对数据进行处理,然后聚合,而后对按照学科进行分区,然后对每一个分区进行排序

package cn.edu360.sparkTwo

import org.apache.spark.{Partitioner, SparkConf, SparkContext}

import org.apache.spark.rdd.RDD

import scala.collection.mutable

object SubjectTopNthree {

def main(args: Array[String]): Unit = {

val conf: SparkConf = new SparkConf().setAppName("SubjectTopNone").setMaster("local[4]")

val sc: SparkContext = new SparkContext(conf)

val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/sparkLearn/")

val sbToTeacherOne: RDD[((String, String), Int)] = lines.map(line => {

val words: Array[String] = line.split("/")

val teacher: String = words(3)

val subject: String = words(2).split("[.]")(0)

((subject, teacher), 1)

})

//聚合,将学科和老师联合当做key ---> 这里有一次shuffle

val reduced: RDD[((String, String), Int)] = sbToTeacherOne.reduceByKey(_+_)

//计算有多少学科

val subjects: Array[String] = reduced.map(_._1._1).distinct().collect()

//partitionBy按照指定的分区规则进行分区

//调用partitionBy时RDD的Key是(String, String) --->这里也有一次shuffle

val partioned: RDD[((String, String), Int)] = reduced.partitionBy(new SubPartitioner(subjects))

//如果一次拿出一个分区(可以操作一个分区中的数据了)

val sorted: RDD[((String, String), Int)] = partioned.mapPartitions(it => {

//将迭代器转换成list,然后排序,在转换成迭代器返回

it.toList.sortBy(_._2).reverse.take(3).iterator

})

val result: Array[((String, String), Int)] = sorted.collect()

print(result.toBuffer)

}

}

// 自定义分区规则,需要继承Partitioner

class SubPartitioner(subs: Array[String]) extends Partitioner{

//相当于主构造器(new的时候回执行一次)

//用于存放规则的一个map

private val rules = new mutable.HashMap[String, Int]()

var i = 0

for(sub <- subs){

rules.put(sub, i)

i += 1

}

//返回分区的数量(下一个RDD有多少分区)

override def numPartitions: Int = subs.length

//根据传入的key计算分区标号

//key是一个元组(String, String)

override def getPartition(key: Any): Int = {

//获取学科名称

val s: String = key.asInstanceOf[(String, String)]._1

//根据规则计算分区编号

rules(s)

}

}

  实现方式二:上面的过程可以将聚合和分区操作进行合并,减少shuffle的次数

package cn.edu360.sparkTwo

import org.apache.spark.{Partitioner, SparkConf, SparkContext}

import org.apache.spark.rdd.RDD

import scala.collection.mutable

object SubjectTopNfour {

def main(args: Array[String]): Unit = {

val conf: SparkConf = new SparkConf().setAppName("SubjectTopNone").setMaster("local[4]")

val sc: SparkContext = new SparkContext(conf)

val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/sparkLearn/")

val sbToTeacherOne: RDD[((String, String), Int)] = lines.map(line => {

val words: Array[String] = line.split("/")

val teacher: String = words(3)

val subject: String = words(2).split("[.]")(0)

((subject, teacher), 1)

})

val subjects: Array[String] = sbToTeacherOne.map(_._1._1).distinct().collect()

// 在这里传入分区规则,即聚合时就分区

val reduced: RDD[((String, String), Int)] = sbToTeacherOne.reduceByKey(new SubPartinerTwo(subjects), _+_)

// 对每个分区进行排序

val result: RDD[((String, String), Int)] = reduced.mapPartitions(it => {

it.toList.sortBy(_._2).reverse.take(3).iterator

})

print(result.collect().toBuffer)

}

}

class SubPartinerTwo(subs: Array[String]) extends Partitioner{

private val rules = new mutable.HashMap[String, Int]()

var i = 0

for(sub <- subs){

rules.put(sub, i)

i += 1

}

override def numPartitions: Int = subs.length

override def getPartition(key: Any): Int = {

val subject: String = key.asInstanceOf[(String, String)]._1

rules(subject)

}

}

三,RDD的缓存

  3.1 RDD缓存简介

  Spark速度非常快的原因之一,就是在不同操作中可以在内存中持久化或缓存数据集。当持久化某个RDD后,每一个节点都将把计算的分片结果保存在内存中,并在对此RDD或衍生出的RDD进行的其他动作中重用。这使得后续的动作变得更加迅速。RDD相关的持久化和缓存,是Spark最重要的特征之一。可以说,缓存是Spark构建迭代式算法和快速交互式查询的关键。

  3.2 RDD缓存方式

  RDD通过persist方法或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。

  

  通过查看源码发现cache最终也是调用了persist方法,默认的存储级别都是仅在内存存储一份,Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的。

  

  缓存有可能丢失,或者存储存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。

  实例:

package cn.edu360.sparkTwo

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object SubjectTopNtwo {

def main(args: Array[String]): Unit = {

val conf: SparkConf = new SparkConf().setAppName("SubjectTwo").setMaster("local[4]")

val sc: SparkContext = new SparkContext(conf)

val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/sparkLearn")

val sbToTeacherOne: RDD[((String, String), Int)] = lines.map(line => {

val words: Array[String] = line.split("/")

val teacher: String = words(3)

val subject: String = words(2).split("[.]")(0)

((subject, teacher), 1)

})

val reduced: RDD[((String, String), Int)] = sbToTeacherOne.reduceByKey(_+_)

// 这里讲reduced的数据集到缓存中

val cached: RDD[((String, String), Int)] = cached.cache()

// 获取所有学科

val subjects: Array[String] = cached.map(_._1._1).distinct().collect()

// 对所有的reduce后的数据进行单学科过滤,在进行排序

for(sb <- subjects){

// 因为这里的多次提交和过滤,所以添加到缓存就有必要了

val filter: RDD[((String, String), Int)] = cached.filter(_._1._1 == sb)

// 这里进行了多次提交

val result: Array[((String, String), Int)] = filter.sortBy(_._2, false).take(3)

print(result.toBuffer)

}

sc.stop()

}

}

以上是 Spark学习(四)RDD自定义分区和缓存 的全部内容, 来源链接: utcz.com/z/509728.html

回到顶部