目錄天小天:(一)Spark Streaming 運算元梳理 — 簡單介紹streaming運行邏輯天小天:(二)Spark Streaming 運算元梳理 — flatMap和mapPartitions 天小天:(三)Spark Streaming 運算元梳理 — transform運算元天小天:(四)Spark Streaming 運算元梳理 — Kafka createDirectStream天小天:(五)Spark Streaming 運算元梳理 — foreachRDD天小天:(六)Spark Streaming 運算元梳理 — glom運算元 天小天:(七)Spark Streaming 運算元梳理 — repartition運算元
目錄
天小天:(三)Spark Streaming 運算元梳理 — transform運算元
天小天:(七)Spark Streaming 運算元梳理 — repartition運算元
在上一章我們了解了Streaming的運行邏輯和幾個運算元。本章主要介紹下mapPartitions和flatMap的實現邏輯。
首先看一下flatMap和mapPartitions代碼示例:
package streaming
import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable import scala.collection.mutable.ListBuffer
/** * @date 2019/01/21 */ object Api { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("api").setMaster("local[2]") val rddQueue = new mutable.Queue[RDD[Int]]()
val ssc = new StreamingContext(sparkConf, Seconds(2)) val lines = ssc.queueStream(rddQueue)
// flatMap & mapPartitions val d = lines.flatMap(List(_, 20, 30, 40)).mapPartitions(iteratorAdd)
d.print()
ssc.start() for (i <- 1 to 30) { rddQueue.synchronized { rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10) } Thread.sleep(1000) } ssc.stop() }
def iteratorAdd(input: Iterator[Int]) : Iterator[String] = { val output = ListBuffer[String]() for (t <- input){ output += t.toString + " map" } output.iterator }
}
上面的代碼flatMap的功能是對原始數據每個元素後面都增加20,30,40這三個元素;mapPartitions實現的功能是在每個元素後面都增加」 map「這個字元串。整體的輸出結果就是:
1 map 20 map 30 map 40 map 2 map 20 map 30 map 40 map 3 map 20 map ...
那麼接下來我們就詳細講解flatMap和mapPartitions是如何實現的。
flatMap的功能是:將輸入的元素通過flatMap內的function轉換成Iterator(迭代器)後再輸出Iterator中每個元素。通俗說就是把元素利用function計算成一個Array後再把這個Array拍扁,變成一個元素一個元素的樣式輸出。 以上文代碼為例:
那麼接下來我們看一下源碼flatMap是如何實現的。
/** * Return a new DStream by applying a function to all elements of this DStream, * and then flattening the results */ def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] = ssc.withScope { new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc)) }
上面代碼是入口,flatMapFunc為我們編寫的方法List(_,20,30,40)。這裡是實例化了FlatMappedDStream。接下來我們看下FlatMappedDStream的實現。
flatMapFunc
List(_,20,30,40)
FlatMappedDStream
package org.apache.spark.streaming.dstream
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Duration, Time}
private[streaming] class FlatMappedDStream[T: ClassTag, U: ClassTag]( parent: DStream[T], flatMapFunc: T => TraversableOnce[U] ) extends DStream[U](parent.ssc) {
override def dependencies: List[DStream[_]] = List(parent)
override def slideDuration: Duration = parent.slideDuration
override def compute(validTime: Time): Option[RDD[U]] = { // 實際執行的代碼 parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc)) } }
我們主要看compute方法,這個是spark Streaming執行批次任務時執行的方法。 parent.getOrCompute(validTime)這段代碼我們在上一章已將分析過了,這裡返回的是Option[RDD],_的類型是RDD。到這裡我們可以知道,Streaming的flatMap執行的就是RDD的flatMap運算元。 接下來我們繼續看下RDD的flatMap是如何實現的,繼續看RDD的flatMap實現。
compute
parent.getOrCompute(validTime)
Option[RDD]
_
RDD
flatMap
/** * Return a new RDD by first applying a function to all elements of this * RDD, and then flattening the results. */ def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF)) }
我們在上一章看過了map運算元的實現其實現的也是MapPartitionsRDD所以我們就不做深入理解MapPartitionsRDD是如何實現的。我們看下其與map運算元的區別,區別就在於實例化的第二個參數為(context, pid, iter) => iter.flatMap(cleanF)其中iter調用的是scala的flatMap方法。 那麼我們繼續看下scala的flatMap是如何實現的。
map
MapPartitionsRDD
(context, pid, iter) => iter.flatMap(cleanF)
iter
/** Creates a new iterator by applying a function to all values produced by this iterator * and concatenating the results. * * @param f the function to apply on each element. * @return the iterator resulting from applying the given iterator-valued function * `f` to each value produced by this iterator and concatenating the results. * @note Reuse: $consumesAndProducesIterator */ def flatMap[B](f: A => GenTraversableOnce[B]): Iterator[B] = new AbstractIterator[B] { // 每一個元素生成的迭代器,本例中可能是 (1,20,30,40) private var cur: Iterator[B] = empty // 獲取下一個cur。其中self為flatMap上游的Iterator,所以這裡的self.next調用的是上游Iterator的next方法。 private def nextCur() { cur = f(self.next()).toIterator } // 判斷是否還有下一個元素 def hasNext: Boolean = { // Equivalent to cur.hasNext || self.hasNext && { nextCur(); hasNext } // but slightly shorter bytecode (better JVM inlining!) // 判斷cur是否有下一個元素,如果有則返回true // 如果沒有則進入循環,並判斷self是否有下一個元素。如果沒有則則返回false,如果有在更新cur。在下一次循環判斷cur是否有下一個元素時會返回true退出循環,並返回true。 while (!cur.hasNext) { if (!self.hasNext) return false nextCur() } true } // 首先電泳hasNext方法判斷是否還有下一個元素,如果有則放回cur並且執行next def next(): B = (if (hasNext) cur else empty).next() }
回顧下我們Streaming的flatMap運算元的入參List(_,20,30,40)也就是scala的flatMap入參。那麼我們看下當調用next方法時返回的是什麼,詳細的看上面的代碼注釋進行理解。 到這裡flatMap就講解完了。其實我們可以理解為調用的解釋scala的flatMap方法。
next
mapPartition實現的功能和實現方式和map很相像,所以這裡會結合map的代碼比對著看。 首先看下mapPartitions的代碼:
* Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDDs * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition * of the RDD. */ def mapPartitions[U: ClassTag]( mapPartFunc: Iterator[T] => Iterator[U], preservePartitioning: Boolean = false ): DStream[U] = ssc.withScope { new MapPartitionedDStream(this, context.sparkContext.clean(mapPartFunc), preservePartitioning) }
與其對比的map邏輯如下:
/** Return a new DStream by applying a function to all elements of this DStream. */ def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope { new MappedDStream(this, context.sparkContext.clean(mapFunc)) }
恩,這裡看前者(mapPartitions)實例化的是MapPartitionedDStream對象,後者實例化的是MappedDStream對象。 我們繼續往下看,同樣先是mapPartitions:
mapPartitions
MapPartitionedDStream
MappedDStream
private[streaming] class MapPartitionedDStream[T: ClassTag, U: ClassTag]( parent: DStream[T], mapPartFunc: Iterator[T] => Iterator[U], preservePartitioning: Boolean ) extends DStream[U](parent.ssc) {
override def compute(validTime: Time): Option[RDD[U]] = { parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning)) } }
接下來是map
private[streaming] class MappedDStream[T: ClassTag, U: ClassTag] ( parent: DStream[T], mapFunc: T => U ) extends DStream[U](parent.ssc) {
override def compute(validTime: Time): Option[RDD[U]] = { parent.getOrCompute(validTime).map(_.map[U](mapFunc)) } }
可以看到,兩者實現的區別主要在comput調用RDD運算元不同,前者是mapPartitions,後者是map。至於mapPartitions的第二個入參我們暫時不做深入研究,其實就是是否保持上游RDD的分區信息。
那麼我們就繼續看RDD的mapPartitions與map有什麼區別。
首先是mapPartitions:
/** * Return a new RDD by applying a function to each partition of this RDD. * * `preservesPartitioning` indicates whether the input function preserves the partitioner, which * should be `false` unless this is a pair RDD and the input function doesnt modify the keys. */ def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = withScope { val cleanedF = sc.clean(f) new MapPartitionsRDD( this, (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter), preservesPartitioning) }
接下來是map:
/** * Return a new RDD by applying a function to all elements of this RDD. */ def map[U: ClassTag](f: T => U): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF)) }
到這裡我們就可以看到兩者的本質區別了,兩者到實例化MapPartitionsRDD。但是在第二個入參有所不同,前者是(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),後者是(context, pid, iter) => iter.map(cleanF)。
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter)
(context, pid, iter) => iter.map(cleanF)
仔細看cleanF出現的位置,前者是cleanedF(iter),後者是iter.map(cleanF)。iter可以理解為Streaming的一個批次的所有元素。前者是用cleanF計算一個批次的所有元素,後者是先遍歷處一個批次的一個元素再用cleanF計算這一個元素。
cleanF
cleanedF(iter)
iter.map(cleanF)
到這裡我們就探明了mapPartitions的實現原理了,那麼什麼時候用mapPartitions那?從代碼上進行反推可能有兩種情況。
到這裡針對mapPartitions的講解也就結束了。
以上就是flatMap和mapPartitions兩個運算元的講解,如果還有什麼疑問可以提出,等我了解後再回答。
附上源代碼鏈接:https://github.com/youtNa/all-practice/blob/master/spark-test/src/main/scala/streaming/Api.scala