package core.persist
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Notes on cache() and persist():
 * 1. cache() and persist() persist data per partition; both are lazy (transformation-like)
 *    and only take effect once an action triggers the job.
 * 2. The RDD returned by cache()/persist() can be assigned to a variable and reused, or the
 *    original RDD can be cached in place without assignment — both use the persisted data.
 * 3. When assigning to a variable, do not chain an action directly after cache()/persist():
 *    the variable would then hold the action's result instead of the cached RDD.
 * 4. Cached/persisted data is automatically cleared when the application finishes.
 */
object CacheAndPersist {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("cache")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("Error")
    val lines: RDD[String] = sc.textFile("./data/persistData.txt")
    // Equivalent alternative: lines.persist(StorageLevel.MEMORY_ONLY) — cache() is
    // shorthand for persisting at MEMORY_ONLY.
    val linesCache: RDD[String] = lines.cache()

    // First count(): the action that actually runs the job and materializes the cache.
    val startTime1: Long = System.currentTimeMillis()
    val count1: Long = linesCache.count()
    val endTime1: Long = System.currentTimeMillis()
    // BUGFIX: unit label was "mm" and punctuation differed from the second message;
    // both printouts now use the same ", time = ... ms" format.
    println("count1 = " + count1 + ", time = " + (endTime1 - startTime1) + "ms")

    // Second count(): reads from the cached partitions, so it should be noticeably faster.
    val startTime2: Long = System.currentTimeMillis()
    val count2: Long = linesCache.count()
    val endTime2: Long = System.currentTimeMillis()
    println("count2 = " + count2 + ", time = " + (endTime2 - startTime2) + "ms")
    sc.stop()
  }
}
14.mapValues運算元
針對K,V格式的數據,只對Value做操作,Key保持不變
val infos: RDD[(String, String)] = sc.makeRDD(
List[(String,String)](
("zhangsan","18"),("lisi","20"),("wangwu","30")
))
val result: RDD[(String, String)] = infos.mapValues(s => {
s + " " + "zhangsan18"
})
result.foreach(println)
sc.stop()
15.flatMapValues運算元
(K,V)->(K,V),作用在K,V格式的RDD上,對一個Key的一個Value返回多個Value
val infos: RDD[(String, String)] = sc.makeRDD(
List[(String,String)](
("zhangsan","18"),("lisi","20"),("wangwu","30")
))
val transInfo: RDD[(String, String)] = infos.mapValues(s => {
s + " " + "zhangsan18"
})
// transInfo.foreach(println)
val result: RDD[(String, String)] = transInfo.flatMapValues(s => {
//按空格切分
s.split(" ")
})
result.foreach(println)
sc.stop()
16.combineByKey運算元
首先給RDD中每個分區中的每一個key一個初始值
其次在RDD每個分區內部相同的key聚合一次
再次在RDD不同的分區之間將相同的key結果聚合一次
val rdd1: RDD[(String, Int)] = sc.makeRDD(List[(String, Int)](
("zhangsan", 10), ("zhangsan", 20), ("wangwu", 30),
("lisi", 40), ("zhangsan", 50), ("lisi", 60),
("wangwu", 70), ("wangwu", 80), ("lisi", 90)
),3)
rdd1.mapPartitionsWithIndex((index,iter)=>{
val arr: ArrayBuffer[(String, Int)] = ArrayBuffer[(String,Int)]()
iter.foreach(tp=>{
arr.append(tp)
println("rdd1 partition index ="+index+".value ="+tp)
})
arr.iterator
}).count()
println("++++++++++++++++++++++++++++++++++++")
val result: RDD[(String, String)] = rdd1.combineByKey(v=>{v+"hello"}, (s:String, v)=>{s+"@"+v}, (s1:String, s2:String)=>{s1+"#"+s2})
result.foreach(println)
17.reduceByKey運算元
首先會根據key去分組,然後在每一組中將value聚合,作用在KV格式的RDD上
val infos: RDD[(String, Int)] = sc.parallelize(
List[(String,Int)](("zhangsan",1),("zhangsan",2),
("zhangsan",3),("lisi",100),("lisi",200)),5
)
val result: RDD[(String, Int)] = infos.reduceByKey((v1, v2)=>{v1+v2})
result.foreach(println)
sc.stop()
18.repartition運算元
重新分區,可以將RDD的分區增多或者減少,會產生shuffle,coalesce(num,true) = repartition(num)
val rdd1: RDD[String] = sc.parallelize(List[String](
"love1", "love2", "love3", "love4",
"love5", "love6", "love7", "love8",
"love9", "love10", "love11", "love12"
), 3)
val rdd2: RDD[String] = rdd1.mapPartitionsWithIndex((index, iter) => {
val list: ListBuffer[String] = ListBuffer[String]()
iter.foreach(one => {
list.append(s"rdd1 partition = [$index] ,value = [$one]")
})
list.iterator
}, true)
val rdd3: RDD[String] = rdd2.repartition(3)
val rdd4: RDD[String] = rdd3.mapPartitionsWithIndex((index, iter) => {
val arr: ArrayBuffer[String] = ArrayBuffer[String]()
iter.foreach(one => {
arr.append(s"rdd3 partition = [$index] ,value = [$one]")
})
arr.iterator
})
val results: Array[String] = rdd4.collect()
results.foreach(println)
sc.stop()
19.cogroup運算元
合併兩個RDD,生成一個新的RDD。分區數與分區數多的那個RDD保持一致
val rdd1 = sc.parallelize(List[(String,String)](("zhangsan","female"),("zhangsan","female1"),("lisi","male"),("wangwu","female"),("maliu","male")),3)
val rdd2 = sc.parallelize(List[(String,Int)](("zhangsan",18),("lisi",19),("lisi",190),("wangwu",20),("tianqi",21)),4)
val resultRDD: RDD[(String, (Iterable[String], Iterable[Int]))] = rdd1.cogroup(rdd2)
resultRDD.foreach(info=>{
val key = info._1
val value1: List[String] = info._2._1.toList
val value2: List[Int] = info._2._2.toList
println("key ="+key+",value"+value1+", value2 = "+value2)
})
println("resultRDD partition length ="+resultRDD.getNumPartitions)
sc.stop()
20.join運算元
會產生shuffle,(K,V)格式的RDD和(K,V)格式的RDD按照相同的K,join得到(K,(V,W))格式的數據,分區數按照大的來。
val nameRDD: RDD[(String, String)] = sc.parallelize(List[(String,String)](("zhangsan","female"),("lisi","male"),("wangwu","female")),3)
val scoreRDD: RDD[(String, Int)] = sc.parallelize(List[(String,Int)](("zhangsan",18),("lisi",19),("wangwu",20)),2)
val joinRDD: RDD[(String, (String, Int))] = nameRDD.join(scoreRDD)
println(joinRDD.getNumPartitions)
joinRDD.foreach(println)
21.leftOuterJoin運算元、rightOuterJoin運算元、fullOuterJoin運算元
leftOuterJoin(K,V)格式的RDD和(K,V)格式的RDD,使用leftOuterJoin結合,以左邊的RDD出現的key為主 ,得到(K,(V,Option(W)))
val nameRDD: RDD[(String, String)] = sc.parallelize(
List[(String,String)](("zhangsan","female"),
("lisi","male"),("wangwu","female"),("maliu","male")
))
val scoreRDD: RDD[(String, Int)] = sc.parallelize(
List[(String,Int)](("zhangsan",22),("lisi",19),
("wangwu",20),("tianqi",21)
))
val leftOutJoin: RDD[(String, (String, Option[Int]))] = nameRDD.leftOuterJoin(scoreRDD)
leftOutJoin.foreach(println)
sc.stop()
rightOuterJoin(K,V)格式的RDD和(K,W)格式的RDD使用rightOuterJoin結合以右邊的RDD出現的key為主,得到(K,(Option(V),W))
val nameRDD: RDD[(String, String)] = sc.parallelize(
List[(String,String)](("zhangsan","female"),("lisi","male")
,("wangwu","female"),("maliu","male")),3
)
val scoreRDD: RDD[(String, Int)] = sc.parallelize(
List[(String,Int)](("zhangsan",18),("lisi",19),
("wangwu",20),("tianqi",21)),4
)
val rightOuterJoin: RDD[(String, (Option[String], Int))] = nameRDD.rightOuterJoin(scoreRDD)
rightOuterJoin.foreach(println)
println("rightOuterJoin RDD partiotion length = "+rightOuterJoin.getNumPartitions)
sc.stop()
fullOuterJoin運算元(K,V)格式的RDD和(K,V)格式的RDD,使用fullOuterJoin結合是以兩邊的RDD出現的key為主,得到(K,(Option(V),Option(W)))
val nameRDD: RDD[(String, String)] = sc.parallelize(List[(String,String)](("zhangsan","female"),("lisi","male"),("wangwu","female"),("maliu","male")),3)
val ageRDD: RDD[(String, Int)] = sc.parallelize(List[(String,Int)](("zhangsan",18),("lisi",16),("wangwu",20),("tianqi",21)),4)
val fullOuterJoin: RDD[(String, (Option[String], Option[Int]))] = nameRDD.fullOuterJoin(ageRDD)
fullOuterJoin.foreach(println)
println("fullOuterJoin RDD partition length = "+fullOuterJoin.getNumPartitions)
sc.stop()
22.intersection運算元
intersection取兩個RDD的交集,兩個RDD的類型要一致,結果RDD的分區數要與兩個父RDD多的那個一致
val rdd1: RDD[String] = sc.parallelize(List[String]("zhangsan","lisi","wangwu"),5)
val rdd2: RDD[String] = sc.parallelize(List[String]("zhangsan","lisi","maliu"),4)
val result: RDD[String] = rdd1.intersection(rdd2)
result.foreach(println)
println("intersection partition length = "+ result.getNumPartitions)
sc.stop()
23.foreach運算元
foreach遍歷RDD中的每一個元素
val lines: RDD[String] = sc.textFile("./data/words")
lines.foreach(println)
24.saveAsTextFile運算元
將DataSet中的元素以文本的形式寫入本地文件系統或者HDFS中,Spark將會對每個元素調用toString方法,將數據元素轉換成文本文件中的一行數據,若將文件保存在本地文件系統,那麼只會保存在executor所在機器的本地目錄
val infos: RDD[String] = sc.parallelize(List[String]("a","b","c","e","f","g"),4)
infos.saveAsTextFile("./data/infos")
25.saveAsObjectFile運算元
將數據集中元素以ObjectFile形式寫入本地文件系統或者HDFS中
infos.saveAsObjectFile("./data/infosObject")
26.collect運算元
collect回收運算元,會將結果回收到Driver端,如果結果比較大,就不要回收,這樣的話會造成Driver端的OOM
val lines: RDD[String] = sc.textFile("./data/words")
sc.setLogLevel("Error")
val result: Array[String] = lines.collect()
result.foreach(println)
27.collectAsMap運算元
將K、V格式的RDD回收到Driver端作為Map使用
val weightInfos: RDD[(String, Double)] = sc.parallelize(
List[(String,Double)](new Tuple2("zhangsan",99),
new Tuple2("lisi",78.6),
new Tuple2("wangwu",122.2323)
)
)
val stringToDouble: collection.Map[String, Double] = weightInfos.collectAsMap()
stringToDouble.foreach(tp=>{
println(tp._1+"**************"+tp._2)
})
sc.stop()
28.count運算元
count統計RDD共有多少行數據
val lines: RDD[String] = sc.textFile("./data/sampleData.txt")
val result: Long = lines.count()
println(result)
sc.stop()
直接給出結果行數
29.countByKey運算元、countByValue運算元
countByKey統計相同的key出現的個數
val rdd: RDD[(String, Integer)] = sc.makeRDD(List[(String,Integer)](
("a",1),("a",100),("a",1000),("b",2),("b",200),("c",3),("c",4),("d",122)
))
val result: collection.Map[String, Long] = rdd.countByKey()
result.foreach(println)
countByValue統計RDD中相同的Value出現的次數,不要求數據必須為RDD格式
val rdd = sc.makeRDD(List[(String,Integer)](
("a",1),("a",1),("a",1000),("b",2),("b",200),("c",3),("c",3)
))
val result: collection.Map[(String, Integer), Long] = rdd.countByValue()
result.foreach(println)
30、take、takeSample運算元
take取出RDD中的前N個元素
val lines: RDD[String] = sc.textFile("./data/words")
val array: Array[String] = lines.take(3)
array.foreach(println)
takeSample(withReplacement,num,seed),隨機抽樣將數據結果拿回Driver端使用,返回Array,
withReplacement:有無放回抽樣,num:抽樣的條數,seed:種子
val lines: RDD[String] = sc.textFile("./data/words")
val result: Array[String] = lines.takeSample(false,3,10)
result.foreach(println)
31、reduce運算元
val rdd: RDD[Int] = sc.makeRDD(Array[Int](1,2,3,4,5))
val result: Int = rdd.reduce((v1, v2) => { v1 + v2 })
//直接得到結果
println(result)
32.aggregateByKey運算元----transformation類運算元
首先是給定RDD的每一個分區一個初始值,然後RDD中每一個分區中按照相同的key,結合初始值去合併,最後RDD之間相同的key聚合
val rdd1: RDD[(String, Int)] = sc.makeRDD(List[(String, Int)](
("zhangsan", 10), ("zhangsan", 20), ("wangwu", 30),
("lisi", 40), ("zhangsan", 50), ("lisi", 60),
("wangwu", 70), ("wangwu", 80), ("lisi", 90)
), 3)
rdd1.mapPartitionsWithIndex((index,iter)=>{
val arr: ArrayBuffer[(String, Int)] = ArrayBuffer[(String,Int)]()
iter.foreach(tp=>{
arr.append(tp)
println("rdd1 partition index ="+index+", value ="+tp)
})
arr.iterator
}).count()
val result: RDD[(String, String)] =
rdd1.aggregateByKey("hello")(
(s, v)=>{s+"~"+v}, (s1, s2)=>{s1+"#"+s2}
)
result.foreach(println)
mapPartitionsWithIndex注釋掉執行結果:
33.zip運算元 ---Transformation類運算元
將兩個RDD合成一個K,V格式的RDD,分區數要相同,每個分區中的元素個數必須相同
val rdd1: RDD[String] = sc.parallelize(List[String]("a","b","c","d"),2)
val rdd2: RDD[Int] = sc.parallelize(List[Int](1,2,3,4),2)
val result: RDD[(String, Int)] = rdd1.zip(rdd2)
result.foreach(println)
34、zipWithIndex運算元---Transformation類運算元
val rdd1 = sc.parallelize(List[String]("a","b","c"),2)
val rdd2 = sc.parallelize(List[Int](1,2,3),numSlices = 2)
val result: RDD[(String, Long)] = rdd1.zipWithIndex()
val result2: RDD[(Int, Long)] = rdd2.zipWithIndex()
result.foreach(println)
result2.foreach(println)
推薦閱讀: