Spark的運算元分類:

從大方向說,Spark運算元大致可以分為以下兩類:

(1)Transformation變換/轉換運算元:這種變換並不觸發提交作業,這種運算元是延遲執行的,也就是說從一個RDD轉換生成另一個RDD的轉換操作不是馬上執行,需要等到有Action操作的時候才會真正觸發。

(2)Action行動運算元:這類運算元會觸發SparkContext提交job作業,並將數據輸出到Spark系統。

從小方向說,Spark運算元大致可以分為以下三類:

(1)Value數據類型的Transformation運算元,這種變換並不觸發提交作業,針對處理的數據項是Value型的數據

(2)Key-Value 數據類型的Transformation運算元,這種變換並不觸發提交作業,針對處理的數據項是Key-Value型的數據對

(3)Action運算元:這類運算元會觸發SparkContext提交Job作業

(一)Value數據類型的Transformation運算元:

(1)輸入分區與輸出分區一對一型:

1.map運算元

2.flatMap運算元

3.mapPartitions運算元

4.mapPartitionsWithIndex運算元

(2)輸入分區與輸出分區多對一型

5.union運算元

6.cartesian運算元

(3)輸入分區與輸出分區多對多型

7.groupBy運算元、groupByKey運算元

(4)輸出分區是輸入分區子集類型

8.filter運算元

9.distinct運算元

10.subtract運算元

11.sample運算元

(5)Cache運算元

13.cache運算元

14.persist運算元

(二)Key-Value數據類型的Transformation運算元

(1)輸入分區與輸出分區一對一

15.mapValues運算元

(2)對單個RDD或者兩個RDD聚集

單個RDD聚集

16.combineByKey運算元

17.reduceByKey運算元

18.repartition運算元

兩個RDD聚集

19.cogroup運算元

(3)連接

20.join運算元

21.leftOutJoin和rightOutJoin運算元、fullOuterJoin運算元

(三)Action運算元

(1)無輸出

22.foreach運算元

(2)HDFS

23.saveAsTextFile運算元

24.saveAsObjectFile運算元

(3)Scala集合和數據類型

25.collect運算元

26.collectAsMap運算元

27.count,countByKey,CountByValue運算元

28.take、takeSample運算元

29.reduce運算元

30.aggregate運算元

31.zip、zipWithIndex運算元

Transformation:

1.map運算元

處理數據是一對一的關係,進入一條數據,出去的還是一條數據。map的輸入變換函數應用於RDD中所有的元素,而mapPartitions應用於所有分區。區別於mapPartitions主要在於調度粒度不同。如parallelize(1 to 10 ,3),map函數執行了10次,而mapPartitions函數執行了3次。

val infos: RDD[String] = sc.parallelize(Array[String]("hello spark","hello hdfs","hello HBase"))
val result: RDD[Array[String]] = infos.map(one => {
one.split(" ")
})
result.foreach(arr =>{arr.foreach(println)})

執行結果:

2.flatMap運算元

flatMap是一對多的關係,處理一條數據得到多條結果

將原來 RDD 中的每個元素通過函數 f 轉換為新的元素,並將生成的 RDD 的每個集合中的元素合併為一個集合。

val infos: RDD[String] = sc.makeRDD(Array[String]("hello spark","hello hdfs","hello MapReduce"))
val rdd1: RDD[String] = infos.flatMap(one => {
one.split(" ")
})
rdd1.foreach(println)

3.mapPartitions運算元

mapPartitions遍歷的是每一個分區中的數據,一個個分區的遍歷。獲 取 到 每 個 分 區 的 迭 代器,在 函 數 中 通 過 這 個 分 區 整 體 的 迭 代 器 對整 個 分 區 的 元 素 進 行 操 作,相對於map一條條處理數據,性能比較高,可獲取返回值。

可以通過函數f(iter) =>iter.filter(_>=3)對分區中所有的數據進行過濾,大於和等於3的數據保留,一個方塊代表一個RDD分區,含有1,2,3的分區過濾,只剩下元素3。

4.mapPartitionsWithIndex(function)運算元

拿到每個RDD中的分區,以及分區中的數據

val lines: RDD[String] = sc.textFile("./data/words",5)
val result: RDD[String] = lines.mapPartitionsWithIndex((index, iter) => {
val arr: ArrayBuffer[String] = ArrayBuffer[String]()
iter.foreach(one => {
// one.split(" ")
arr.append(s"partition = [$index] ,value = $one")
})
arr.iterator
}, true)
result.foreach(println)

5.union運算元

union合併兩個RDD,兩個RDD必須是同種類型,不一定是K,V格式的RDD

val rdd1: RDD[String] = sc.parallelize(List[String]("zhangsan","lisi","wangwu","maliu"),3)
val rdd2: RDD[String] = sc.parallelize(List[String]("a","b","c","d"),4)
val unionRDD: RDD[String] = rdd1.union(rdd2)
unionRDD.foreach(println)

6.cartesian運算元

求笛卡爾積,該操作不會執行shuffle操作,但最好別用,容易觸發OOM

7.groupBy運算元

按照指定的規則,將數據分組

val rdd: RDD[(String, Double)] = sc.parallelize(List[(String,Double)](("zhangsan",66.5),("lisi",33.2),("zhangsan",66.7),("lisi",33.4),("zhangsan",66.8),("wangwu",29.8)))
val result: RDD[(Boolean, Iterable[(String, Double)])] = rdd.groupBy(one => {
one._2 > 34
})
result.foreach(println)

groupByKey運算元

根據key去將相同的key對應的value合併在一起(K,V)=>(K,[V])

val rdd: RDD[(String, Double)] = sc.parallelize(List[(String,Double)](("zhangsan",66.5),("lisi",33.2),("zhangsan",66.7),("lisi",33.4),("zhangsan",66.8),("wangwu",29.8)))
val rdd1: RDD[(String, Iterable[Double])] = rdd.groupByKey()
rdd1.foreach(info=>{
val name: String = info._1
val value: Iterable[Double] = info._2
val list: List[Double] = info._2.toList
println("name = "+name+",value ="+list)
})

8.filter運算元

過濾數據,返回true的數據會被留下

val infos: RDD[String] = sc.makeRDD(List[String]("hehe","hahha","zhangsan","lisi","wangwu"))
val result: RDD[String] = infos.filter(one => {
!one.equals("zhangsan")
})
result.foreach(println)

9.distinct運算元

distinct去重,有shuffle產生,內部實際是map+reduceByKey+map實現

val infos: RDD[String] = sc.parallelize(List[String]("a","a","b","a","b","c","c","d"),4)
val result: RDD[String] = infos.distinct()
result.foreach(println)

10.subtract運算元

取RDD的差集,subtract兩個RDD的類型要一致,結果RDD的分區數與subtract運算元前面的RDD分區數多的一致。

val rdd1 = sc.parallelize(List[String]("zhangsan","lisi","wangwu"),5)
val rdd2 = sc.parallelize(List[String]("zhangsan","lisi","maliu"),4)
val subtractRDD: RDD[String] = rdd1.subtract(rdd2)
subtractRDD.foreach(println)
println("subtractRDD partition length = "+subtractRDD.getNumPartitions)

11.sample運算元

sample隨機抽樣,參數sample(withReplacement:有無放回抽樣,fraction:抽樣的比例,seed:用於指定的隨機數生成器的種子)

有種子和無種子的區別:

有種子是隻要針對數據源一樣,都是指定相同的參數,那麼每次抽樣到的數據都是一樣的

沒有種子是針對同一個數據源,每次抽樣都是隨機抽樣

(12.13)cache運算元、persist運算元

package core.persist

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

/**
* cache()和persist()注意問題
* 1.cache()和persist()持久化單位是partition,cache()和persist()是懶執行運算元,需要action運算元觸發執行
* 2.對一個RDD使用cache或者persist之後可以賦值給一個變數,下次直接使用這個變數就是使用持久化的數據。
* 也可以直接對RDD進行cache或者persist,不賦值給一個變數
* 3.如果採用第二種方法賦值給變數的話,後面不能緊跟action運算元
* 4.cache()和persist()的數據在當前application執行完成之後會自動清除
*/
object CacheAndPersist {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local").setAppName("cache")
val sc: SparkContext = new SparkContext(conf)
sc.setLogLevel("Error")
val lines: RDD[String] = sc.textFile("./data/persistData.txt")

// val linescache: RDD[String] = lines.persist(StorageLevel.MEMORY_ONLY)
val linescache: RDD[String] = lines.cache()

val startTime1: Long = System.currentTimeMillis()
val count1: Long = linescache.count()
val endTime1: Long = System.currentTimeMillis()
println("count1 = "+count1+". time = "+(endTime1-startTime1) + "mm")

val starttime2: Long = System.currentTimeMillis()
val count2: Long = linescache.count()
val endTime2: Long = System.currentTimeMillis()
println("count2 = "+count2+", time = "+(endTime2-starttime2) + "ms")
sc.stop()
}
}

14.mapValues運算元

針對K,V格式的數據,只對Value做操作,Key保持不變

val infos: RDD[(String, String)] = sc.makeRDD(
List[(String,String)](
("zhangsan","18"),("lisi","20"),("wangwu","30")
))
val result: RDD[(String, String)] = infos.mapValues(s => {
s + " " + "zhangsan18"
})
result.foreach(println)
sc.stop()

15.flatMapValues運算元

(K,V)->(K,V),作用在K,V格式的RDD上,對一個Key的一個Value返回多個Value

val infos: RDD[(String, String)] = sc.makeRDD(
List[(String,String)](
("zhangsan","18"),("lisi","20"),("wangwu","30")
))
val transInfo: RDD[(String, String)] = infos.mapValues(s => {
s + " " + "zhangsan18"
})
// transInfo.foreach(println)
val result: RDD[(String, String)] = transInfo.flatMapValues(s => {
//按空格切分
s.split(" ")
})
result.foreach(println)
sc.stop()

16.combineByKey運算元

首先給RDD中每個分區中的每一個key一個初始值

其次在RDD每個分區內部相同的key聚合一次

再次在RDD不同的分區之間將相同的key結果聚合一次

val rdd1: RDD[(String, Int)] = sc.makeRDD(List[(String, Int)](
("zhangsan", 10), ("zhangsan", 20), ("wangwu", 30),
("lisi", 40), ("zhangsan", 50), ("lisi", 60),
("wangwu", 70), ("wangwu", 80), ("lisi", 90)
),3)
rdd1.mapPartitionsWithIndex((index,iter)=>{
val arr: ArrayBuffer[(String, Int)] = ArrayBuffer[(String,Int)]()
iter.foreach(tp=>{
arr.append(tp)
println("rdd1 partition index ="+index+".value ="+tp)
})
arr.iterator
}).count()
println("++++++++++++++++++++++++++++++++++++")
val result: RDD[(String, String)] = rdd1.combineByKey(v=>{v+"hello"}, (s:String, v)=>{s+"@"+v}, (s1:String, s2:String)=>{s1+"#"+s2})
result.foreach(println)

17.reduceByKey運算元

首先會根據key去分組,然後在每一組中將value聚合,作用在KV格式的RDD上

首先會根據key去分組,然後在每一組中將value聚合,作用在KV格式的RDD上
val infos: RDD[(String, Int)] = sc.parallelize(
List[(String,Int)](("zhangsan",1),("zhangsan",2),
("zhangsan",3),("lisi",100),("lisi",200)),5
)
val result: RDD[(String, Int)] = infos.reduceByKey((v1, v2)=>{v1+v2})
result.foreach(println)
sc.stop()

18.repartition運算元

重新分區,可以將RDD的分區增多或者減少,會產生shuffle,coalesc(num,true) = repartition(num)

val rdd1: RDD[String] = sc.parallelize(List[String](
"love1", "love2", "love3", "love4",
"love5", "love6", "love7", "love8",
"love9", "love10", "love11", "love12"
), 3)
val rdd2: RDD[String] = rdd1.mapPartitionsWithIndex((index, iter) => {
val list: ListBuffer[String] = ListBuffer[String]()
iter.foreach(one => {
list.append(s"rdd1 partition = [$index] ,value = [$one]")
})
list.iterator
}, true)
val rdd3: RDD[String] = rdd2.repartition(3)
val rdd4: RDD[String] = rdd3.mapPartitionsWithIndex((index, iter) => {
val arr: ArrayBuffer[String] = ArrayBuffer[String]()
iter.foreach(one => {
arr.append(s"rdd3 partition = [$index] ,value = [$one]")
})
arr.iterator
})
val results: Array[String] = rdd4.collect()
results.foreach(println)
sc.stop()

19.cogroup運算元

合併兩個RDD,生成一個新的RDD。分區數與分區數多個那個RDD保持一致

val rdd1 = sc.parallelize(List[(String,String)](("zhangsan","female"),("zhangsan","female1"),("lisi","male"),("wangwu","female"),("maliu","male")),3)
val rdd2 = sc.parallelize(List[(String,Int)](("zhangsan",18),("lisi",19),("lisi",190),("wangwu",20),("tianqi",21)),4)
val resultRDD: RDD[(String, (Iterable[String], Iterable[Int]))] = rdd1.cogroup(rdd2)

resultRDD.foreach(info=>{
val key = info._1
val value1: List[String] = info._2._1.toList
val value2: List[Int] = info._2._2.toList
println("key ="+key+",value"+value1+", value2 = "+value2)
})
println("resultRDD partition length ="+resultRDD.getNumPartitions)
sc.stop()

20.join運算元

會產生shuffle,(K,V)格式的RDD和(K,V)格式的RDD按照相同的K,join得到(K,(V,W))格式的數據,分區數按照大的來。

val nameRDD: RDD[(String, String)] = sc.parallelize(List[(String,String)](("zhangsan","female"),("lisi","male"),("wangwu","female")),3)
val scoreRDD: RDD[(String, Int)] = sc.parallelize(List[(String,Int)](("zhangsan",18),("lisi",19),("wangwu",20)),2)
val joinRDD: RDD[(String, (String, Int))] = nameRDD.join(scoreRDD)
println(joinRDD.getNumPartitions)
joinRDD.foreach(println)

21.leftOutJoin、rightOutJoin運算元、fullOuterJoin運算元

leftOuterJoin(K,V)格式的RDD和(K,V)格式的RDD,使用leftOuterJoin結合,以左邊的RDD出現的key為主 ,得到(K,(V,Option(W)))

val nameRDD: RDD[(String, String)] = sc.parallelize(
List[(String,String)](("zhangsan","female"),
("lisi","male"),("wangwu","female"),("maliu","male")
))
val scoreRDD: RDD[(String, Int)] = sc.parallelize(
List[(String,Int)](("zhangsan",22),("lisi",19),
("wangwu",20),("tianqi",21)
))
val leftOutJoin: RDD[(String, (String, Option[Int]))] = nameRDD.leftOuterJoin(scoreRDD)
leftOutJoin.foreach(println)
sc.stop()

rightOuterJoin(K,V)格式的RDD和(K,W)格式的RDD使用rightOuterJoin結合以右邊的RDD出現的key為主,得到(K,(Option(V),W))

val nameRDD: RDD[(String, String)] = sc.parallelize(
List[(String,String)](("zhangsan","female"),("lisi","male")
,("wangwu","female"),("maliu","male")),3
)
val scoreRDD: RDD[(String, Int)] = sc.parallelize(
List[(String,Int)](("zhangsan",18),("lisi",19),
("wangwu",20),("tianqi",21)),4
)
val rightOuterJoin: RDD[(String, (Option[String], Int))] = nameRDD.rightOuterJoin(scoreRDD)
rightOuterJoin.foreach(println)
println("rightOuterJoin RDD partiotion length = "+rightOuterJoin.getNumPartitions)
sc.stop()

fullOuterJoin運算元(K,,V)格式的RDD和(K,V)格式的RDD,使用fullOuterJoin結合是以兩邊的RDD出現的key為主,得到(K(Option(V),Option(W)))

val nameRDD: RDD[(String, String)] = sc.parallelize(List[(String,String)](("zhangsan","female"),("lisi","male"),("wangwu","female"),("maliu","male")),3)
val ageRDD: RDD[(String, Int)] = sc.parallelize(List[(String,Int)](("zhangsan",18),("lisi",16),("wangwu",20),("tianqi",21)),4)
val fullOuterJoin: RDD[(String, (Option[String], Option[Int]))] = nameRDD.fullOuterJoin(ageRDD)
fullOuterJoin.foreach(println)
println("fullOuterJoin RDD partition length = "+fullOuterJoin.getNumPartitions)
sc.stop()

22.intersection運算元

intersection取兩個RDD的交集,兩個RDD的類型要一致,結果RDD的分區數要與兩個父RDD多的那個一致

val rdd1: RDD[String] = sc.parallelize(List[String]("zhangsan","lisi","wangwu"),5)
val rdd2: RDD[String] = sc.parallelize(List[String]("zhangsan","lisi","maliu"),4)
val result: RDD[String] = rdd1.intersection(rdd2)
result.foreach(println)
println("intersection partition length = "+ result.getNumPartitions)
sc.stop()

23.foreach運算元

foreach遍歷RDD中的每一個元素

val lines: RDD[String] = sc.textFile("./data/words") lines.foreach(println)

24.saveAsTextFile運算元

將DataSet中的元素以文本的形式寫入本地文件系統或者HDFS中,Spark將會對每個元素調用toString方法,將數據元素轉換成文本文件中的一行數據,若將文件保存在本地文件系統,那麼只會保存在executor所在機器的本地目錄

val infos: RDD[String] = sc.parallelize(List[String]("a","b","c","e","f","g"),4)

infos.saveAsTextFile("./data/infos")

25.saveAsObjectFile運算元

將數據集中元素以ObjectFile形式寫入本地文件系統或者HDFS中

infos.saveAsObjectFile("./data/infosObject")

26.collect運算元

collect回收運算元,會將結果回收到Driver端,如果結果比較大,就不要回收,這樣的話會造成Driver端的OOM

val lines: RDD[String] = sc.textFile("./data/words")
sc.setLogLevel("Error")
val result: Array[String] = lines.collect()
result.foreach(println)

27.collectAsMap運算元

將K、V格式的RDD回收到Driver端作為Map使用

val weightInfos: RDD[(String, Double)] = sc.parallelize(
List[(String,Double)](new Tuple2("zhangsan",99),
new Tuple2("lisi",78.6),
new Tuple2("wangwu",122.2323)
)
)
val stringToDouble: collection.Map[String, Double] = weightInfos.collectAsMap()
stringToDouble.foreach(tp=>{
println(tp._1+"**************"+tp._2)
})
sc.stop()

28.count運算元

count統計RDD共有多少行數據

val lines: RDD[String] = sc.textFile("./data/sampleData.txt")

val result: Long = lines.count()

println(result)

sc.stop()

直接給出結果行數

29.countByKey運算元、countByValue運算元

countByKey統計相同的key出現的個數

val rdd: RDD[(String, Integer)] = sc.makeRDD(List[(String,Integer)](
("a",1),("a",100),("a",1000),("b",2),("b",200),("c",3),("c",4),("d",122)
))
val result: collection.Map[String, Long] = rdd.countByKey()
result.foreach(println)

countByValue統計RDD中相同的Value出現的次數,不要求數據必須為RDD格式

val rdd = sc.makeRDD(List[(String,Integer)](
("a",1),("a",1),("a",1000),("b",2),("b",200),("c",3),("c",3)
))
val result: collection.Map[(String, Integer), Long] = rdd.countByValue()
result.foreach(println)

30、take、takeSample運算元

take取出RDD中的前N個元素

val lines: RDD[String] = sc.textFile("./data/words")

val array: Array[String] = lines.take(3)

array.foreach(println)

takeSapmle(withReplacement,num,seed),隨機抽樣將數據結果拿回Driver端使用,返回Array,

withReplacement:有無放回抽樣,num:抽樣的條數,seed:種子

val lines: RDD[String] = sc.textFile("./data/words")

val result: Array[String] = lines.takeSample(false,3,10)

result.foreach(println)

31、reduce運算元

val rdd: RDD[Int] = sc.makeRDD(Array[Int](1,2,3,4,5))

val result: Int = rdd.reduce((v1, v2) => { v1 + v2 })

//直接得到結果

println(result) }

32.Aggregate運算元----transformation類運算元

首先是給定RDD的每一個分區一個初始值,然後RDD中每一個分區中按照相同的key,結合初始值去合併,最後RDD之間相同的key聚合

val rdd1: RDD[(String, Int)] = sc.makeRDD(List[(String, Int)](
("zhangsan", 10), ("zhangsan", 20), ("wangwu", 30),
("lisi", 40), ("zhangsan", 50), ("lisi", 60),
("wangwu", 70), ("wangwu", 80), ("lisi", 90)
), 3)
rdd1.mapPartitionsWithIndex((index,iter)=>{
val arr: ArrayBuffer[(String, Int)] = ArrayBuffer[(String,Int)]()
iter.foreach(tp=>{
arr.append(tp)
println("rdd1 partition index ="+index+", value ="+tp)
})
arr.iterator
}).count()
val result: RDD[(String, String)] =
rdd1.aggregateByKey("hello")(
(s, v)=>{s+"~"+v}, (s1, s2)=>{s1+"#"+s2}
)
result.foreach(println)

mapPartitionsWithIndex注釋掉執行結果:

33.zip運算元 ---Transformation類運算元

將兩個RDD合成一個K,V格式的RDD,分區數要相同,每個分區中的元素必須相同

val rdd1: RDD[String] = sc.parallelize(List[String]("a","b","c","d"),2)
val rdd2: RDD[Int] = sc.parallelize(List[Int](1,2,3,4),2)
val result: RDD[(String, Int)] = rdd1.zip(rdd2)
result.foreach(println)

33、zipWithIndex運算元---Transformation類運算元

val rdd1 = sc.parallelize(List[String]("a","b","c"),2)
val rdd2 = sc.parallelize(List[Int](1,2,3),numSlices = 2)
val result: RDD[(String, Long)] = rdd1.zipWithIndex()
val result2: RDD[(Int, Long)] = rdd2.zipWithIndex()
result.foreach(println)
result2.foreach(println)


推薦閱讀:
相關文章