(六)Spark Streaming 運算元梳理 — glom運算元
目錄
天小天:(一)Spark Streaming 運算元梳理 — 簡單介紹streaming運行邏輯天小天:(二)Spark Streaming 運算元梳理 — flatMap和mapPartitions天小天:(三)Spark Streaming 運算元梳理 — transform運算元
天小天:(四)Spark Streaming 運算元梳理 — Kafka createDirectStream天小天:(五)Spark Streaming 運算元梳理 — foreachRDD天小天:(六)Spark Streaming 運算元梳理 — glom運算元天小天:(七)Spark Streaming 運算元梳理 — repartition運算元
前言
本章主要講解glom
運算元的實現原理。
看例子
首先還是看一個簡單地例子。
package streaming
import org.apache.spark.{SparkConf, rdd}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
/**
* @date 2019/01/21
*/
object Api {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("api").setMaster("local[2]")
val rddQueue = new mutable.Queue[RDD[Int]]()
val ssc = new StreamingContext(sparkConf, Seconds(2))
// consume from rddQueue
val lines = ssc.queueStream(rddQueue)
// glom
lines.glom().map(x => for(i <- x) println("glom ==> " + i)).print()
ssc.start()
// produce to rddQueue
for (i <- 1 to 30) {
rddQueue.synchronized {
rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10)
}
Thread.sleep(1000)
}
ssc.stop()
}
}
這段代碼map
執行的結果為:
glom ==> 1
glom ==> 2
glom ==> 3
glom ==> 4
glom ==> 5
glom ==> 6
glom ==> 7
glom ==> 8
glom ==> 9
glom ==> 10
glom ==> 11
glom ==> 12
glom ==> 13
glom ==> 14
glom ==> 15
glom ==> 16
....
glom
的作用是:把某個批次的一個分區的元素從迭代器類型轉換為Array
類型。
源碼分析
DStream分析
先看下DStream中的源碼
/**
* Return a new DStream in which each RDD is generated by applying glom() to each RDD of
* this DStream. Applying glom() to an RDD coalesces all elements within each partition into
* an array.
*/
def glom(): DStream[Array[T]] = ssc.withScope {
new GlommedDStream(this)
}
很簡單,就是實例化了GlommedDStream
類。
GlommedDStream分析
package org.apache.spark.streaming.dstream
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, Time}
private[streaming]
class GlommedDStream[T: ClassTag](parent: DStream[T])
extends DStream[Array[T]](parent.ssc) {
override def dependencies: List[DStream[_]] = List(parent)
override def slideDuration: Duration = parent.slideDuration
override def compute(validTime: Time): Option[RDD[Array[T]]] = {
parent.getOrCompute(validTime).map(_.glom())
}
}
我們只看compute
方法的實現,就是執行RDD
的glom
方法。
RDD
分析
/**
* Return an RDD created by coalescing all elements within each partition into an array.
*/
def glom(): RDD[Array[T]] = withScope {
new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
}
上面是RDD
中glom
方法實現。主要的是實例化MapPartitionsRDD
。MapPartitionsRDD
類的在(二)Spark Streaming 運算元梳理 — flatMap和mapPartitions已經介紹過來,這裡就不在說了。
我們主要看下方法的第二個入參:(context, pid, iter) => Iterator(iter.toArray)
。這段代碼的作用是把每個partition的元素從Iterator
類型轉換為Array
類型之後再包裝到迭代器中返回。這段代碼也是glom
方法最核心的代碼。
注意
其實到上面已經講完了glom
的方法,其中有一點需要注意的是,由於glom
會把每個批次每個分區的數據從Iterator
類型轉換為Array
類型,所以如果每個分區的數據非常大的話會出現OOM的情況。
源碼地址
源碼地址
推薦閱讀: