RDD: Explanation, Creation, and Operations


RDD Explained


An RDD (Resilient Distributed Dataset) is the most basic data abstraction in Spark. It represents a distributed collection and supports distributed operations.

Creating an RDD

The data in an RDD can come from two sources: a local collection or an external data source.
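A minimal sketch of both creation paths (the object name DemoCreateRDD is an illustrative placeholder; the file path reuses the one from the examples below):

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object DemoCreateRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("DemoCreateRDD").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    // 1. From a local collection: parallelize distributes a Scala collection across partitions
    val fromCollection: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5))

    // 2. From an external data source: textFile reads a text file, one record per line
    val fromFile: RDD[String] = sc.textFile("spark/data/words.txt")

    println(fromCollection.count())
    println(fromFile.count())
  }
}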

RDD Operations

Categories

Transformation operators

map

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo03Map {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo03Map").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    // Read the file
    val linesRDD: RDD[String] = sc.textFile("spark/data/words.txt")

    // Flatten the lines into individual words
    val flatRDD: RDD[String] = linesRDD.flatMap(_.split(","))

    // Group by word
    val groupRDD: RDD[(String, Iterable[String])] = flatRDD.groupBy(w => w)

    // Aggregate: count the occurrences of each word
    val wordsRDD: RDD[String] = groupRDD.map(kv => {
      val key: String = kv._1
      val words: Iterable[String] = kv._2
      key + "," + words.size
    })

    // Group + aggregate with reduceByKey
    val mapRDD1: RDD[(String, Int)] = flatRDD.map((_, 1))
    val words1: RDD[(String, Int)] = mapRDD1.reduceByKey(_ + _)

    // Group + aggregate with groupByKey
    val mapRDD2: RDD[(String, Int)] = flatRDD.map((_, 1))
    val words2: RDD[(String, Iterable[Int])] = mapRDD2.groupByKey()
    val wordSum: RDD[(String, Int)] = words2.mapValues(_.size)
    wordSum.foreach(println)

    // Print the results
    wordsRDD.foreach(println)
    words1.foreach(println)
  }
}

flatMap (flattens the data)

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo04FlatMap {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Demo04FlatMap").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val linesRDD: RDD[String] = sc.parallelize(List("java,scala,python", "map,java,scala"))

    // Flatten: each line is split into words and the resulting lists are concatenated
    val flatRDD: RDD[String] = linesRDD.flatMap(_.split(","))
    flatRDD.foreach(println)
  }
}

mapPartitions

The difference between map and mapPartitions (see the sketch after the example below):

1) map: processes one record at a time.

2) mapPartitions: processes one whole partition at a time; the function receives an iterator over the partition's records.

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo05MapPartition {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Demo05MapPartition").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    // Read the file into 3 partitions
    val stuRDD: RDD[String] = sc.textFile("spark/data/words.txt", 3)

    stuRDD.mapPartitions(iter => {
      // Executed once per partition
      println("map partition")
      // Process the partition's records through the iterator
      iter.map(line => line.split(",")(1))
    }).foreach(println)
  }
}
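A common reason to prefer mapPartitions over map is setup cost: anything created inside the map function is created once per record, while inside mapPartitions it is created once per partition. A minimal sketch, with a plain string standing in for an expensive per-partition resource such as a connection (the object name and data are illustrative only):

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object DemoMapVsMapPartitions {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DemoMapVsMapPartitions").setMaster("local")
    val sc = new SparkContext(conf)

    val data: RDD[String] = sc.parallelize(List("java", "scala", "python", "spark"), 2)

    // map: the function body runs once per record
    data.map(word => {
      val setup = "per-record setup" // would be repeated for every record
      setup + " -> " + word
    }).foreach(println)

    // mapPartitions: the function body runs once per partition and receives an iterator
    data.mapPartitions(iter => {
      val setup = "per-partition setup" // runs only once for the whole partition
      iter.map(word => setup + " -> " + word)
    }).foreach(println)
  }
}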

filter (filtering)

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo06Filter {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Demo06Filter").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val linesRDD: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5))

    // filter is a transformation: keep only the odd numbers
    linesRDD.filter(kv => {
      kv % 2 == 1
    }).foreach(println)
  }
}

sample (sampling)

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo07Sample {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Demo07Sample").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    /**
     * sample: draws a sample from the data
     * withReplacement: whether a sampled record is put back, i.e. sampling with replacement;
     *                  true means a record can be drawn more than once, so the sample may contain duplicates
     * fraction: how much to sample, a Double between 0 and 1, e.g. 0.3 means roughly 30%
     */
    val stuRDD: RDD[String] = sc.textFile("spark/data/students.txt", 3)
    stuRDD.sample(withReplacement = true, 0.1).foreach(println)
  }
}

union (concatenates two RDDs with the same structure)

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo08Union {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Demo08Union").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    /**
     * union: concatenates two RDDs with the same structure
     */
    val lineRDD1: RDD[String] = sc.parallelize(List("java,scala", "data,python"))
    val lineRDD2: RDD[String] = sc.parallelize(List("spark,scala", "java,python"))

    println(lineRDD1.getNumPartitions)

    // union does not shuffle; here the result's partition count is the sum of the two inputs'
    val unionRDD: RDD[String] = lineRDD1.union(lineRDD2)
    println(unionRDD.getNumPartitions)

    unionRDD.foreach(println)
  }
}

mapPartitionsWithIndex

    // mapPartitionsWithIndex is also a transformation
    // It additionally passes the partition index to the function,
    // so you can choose which partitions to process
    // (stuRDD is the 3-partition RDD read in the mapPartitions example above)
    stuRDD.mapPartitionsWithIndex((index, iter) => {
      println("Current partition: " + index)
      // Process the partition's records
      iter.map(line => line.split(",")(1))
    }).foreach(println)

join (joins records that share the same key; the data must be in (K, V) format)

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo09Join {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Demo09Join").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    // Build K-V format RDDs
    val tuple2RDD1: RDD[(String, String)] = sc.parallelize(List(("001", "张三"), "002" -> "小红", "003" -> "小明"))
    val tuple2RDD2: RDD[(String, Int)] = sc.parallelize(List(("001", 20), "002" -> 22, "003" -> 21))
    val tuple2RDD3: RDD[(String, String)] = sc.parallelize(List(("001", "男"), "002" -> "女"))

    // Join the two RDDs on the key
    val joinRDD: RDD[(String, (String, Int))] = tuple2RDD1.join(tuple2RDD2)

    joinRDD.map(kv => {
      val i: String = kv._1
      val j: String = kv._2._1
      val k: Int = kv._2._2
      i + "," + j + "," + k
    }).foreach(println)

    // Second way: pattern matching
    joinRDD.map {
      case (id: String, (name: String, age: Int)) => id + "*" + name + "*" + age
    }.foreach(println)

    // leftOuterJoin keeps every key of the left RDD; missing right-side values become None
    val leftJoinRDD: RDD[(String, (String, Option[String]))] = tuple2RDD1.leftOuterJoin(tuple2RDD3)

    leftJoinRDD.map {
      // the key has a match on the right
      case (id: String, (name: String, Some(gender))) =>
        id + "*" + name + "*" + gender
      // the key has no match on the right
      case (id: String, (name: String, None)) =>
        id + "*" + name + "*" + "_"
    }.foreach(println)
  }
}

groupByKey (groups K-V data by key)

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo10GroupByKey {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Demo10GroupByKey").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    /**
     * groupBy: group by whatever field you specify
     */
    // Count the number of students per class
    val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")

    linesRDD.groupBy(word => word.split(",")(4))
      .map(kv => {
        val key = kv._1
        val wordsCnt = kv._2.size
        key + "," + wordsCnt
      }).foreach(println)

    // groupByKey needs a K-V RDD, so first map each line to (class, line)
    val linesMap: RDD[(String, String)] = linesRDD.map(lines => (lines.split(",")(4), lines))

    // Group by key
    linesMap.groupByKey()
      .map(lines => {
        val key = lines._1
        val wordsCnt: Int = lines._2.size
        key + "," + wordsCnt
      }).foreach(println)
  }
}

reduceByKey

reduceByKey takes an aggregation function.

It first groups the data by key and then aggregates within each group (usually a sum, but it can also be an operation such as max or min).

It plays the same role as the combiner in MapReduce:

it can pre-aggregate on the map side, reducing the amount of data transferred during the shuffle and thereby improving efficiency.

Compared with groupByKey it is more efficient, but less flexible; the aggregation must be safe to apply repeatedly (an idempotent operation):

y = f(x), f(y) = f(f(x)) = y

Difference between reduceByKey and groupByKey:

reduceByKey: performs map-side pre-aggregation.

groupByKey: performs no pre-aggregation.

When it does not affect the business logic, prefer reduceByKey.

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo11ReduceByKey {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Demo11ReduceByKey").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")

    // Count the number of students per class with groupByKey
    linesRDD.map(lines => (lines.split(",")(4), lines))
      .groupByKey()
      .map(kv => {
        val key = kv._1
        val cnt = kv._2.size
        key + "," + cnt
      }).foreach(println)

    /**
     * reduceByKey takes an aggregation function
     * It first groups the data by key, then aggregates within each group
     * (usually a sum, but it can also be max, min, etc.)
     * It plays the same role as the combiner in MapReduce:
     * map-side pre-aggregation reduces the data shuffled and improves efficiency
     * Compared with groupByKey it is more efficient but less flexible
     * The function must be safe to apply repeatedly: y = f(x), f(y) = f(f(x)) = y
     */
    linesRDD.map(lines => (lines.split(",")(4), 1))
      .reduceByKey(_ + _)
      .foreach(println)
  }
}
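As noted above, the reduce function does not have to be addition. A minimal sketch using math.max to find the oldest student per class, assuming the same students.txt layout as the other examples (class in column 4, age in column 2):

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object DemoReduceByKeyMax {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DemoReduceByKeyMax").setMaster("local")
    val sc = new SparkContext(conf)

    val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")

    // Build (class, age) pairs, then keep the maximum age per class
    linesRDD.map(line => (line.split(",")(4), line.split(",")(2).toInt))
      .reduceByKey((a, b) => math.max(a, b))
      .foreach(println)
  }
}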

sortBy / sortByKey (sorting, ascending by default)

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo12Sort {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Demo12Sort").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")

    /**
     * sortBy: transformation
     * sorts by whatever key you specify, ascending by default
     *
     * sortByKey: transformation
     * must be applied to a K-V RDD; sorts directly by the key, ascending by default
     */
    linesRDD.sortBy(lines => lines.split(",")(2), ascending = false) // sort by age, descending
      .take(10) // take is an action: returns the first 10 records as an Array
      .foreach(println)

    val mapRDD: RDD[(String, String)] = linesRDD.map(l => (l.split(",")(2), l))
    mapRDD.sortByKey(ascending = false)
      .take(10)
      .foreach(println)
  }
}
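Note that the example above sorts the age column as a string. If a numeric order is wanted, the sort key can be converted first; a minimal sketch, assuming column 2 of students.txt is an integer age:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object DemoSortNumeric {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DemoSortNumeric").setMaster("local")
    val sc = new SparkContext(conf)

    val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")

    // Convert the age to Int so that 9 sorts before 10 (a string sort would put "10" before "9")
    linesRDD.sortBy(line => line.split(",")(2).toInt, ascending = false)
      .take(10)
      .foreach(println)
  }
}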

mapValues

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo13MapValue {
  def main(args: Array[String]): Unit = {
    /**
     * mapValues: transformation
     * must be applied to a K-V RDD
     * takes a function f and applies it to the value of every record, leaving the key unchanged
     * the number of records does not change either
     */
    val conf: SparkConf = new SparkConf().setAppName("Demo13MapValue").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val linesRDD: RDD[(String, Int)] = sc.parallelize(List(("zs", 10), ("zzw", 34), ("lm", 18)))

    linesRDD.mapValues(lines => lines * 2)
      .foreach(println)
  }
}

Action operators

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo14Action {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("****").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")

    // take: returns the first n records, similar to limit
    linesRDD.take(10)
      .foreach(println) // this foreach is Array's foreach, not the RDD action

    // count: returns the number of records in the RDD
    println(linesRDD.count())

    // collect: pulls the RDD back to the driver as a Scala Array
    // watch the data size: this can easily cause an OOM
    val collectRDD: Array[String] = linesRDD.collect()
    collectRDD.take(10)
      .foreach(println)

    // reduce: global aggregation
    // roughly: select sum(age) from student
    val i = linesRDD.map(lines => lines.split(",")(2).toInt)
      .reduce(_ + _)
    println(i)

    // saveAsTextFile: writes the RDD out as text files
    // (the output directory must not already exist)
    linesRDD.sample(withReplacement = false, 0.2)
      .saveAsTextFile("spark/data/save")
  }
}
