目錄

天小天:(一)Spark Streaming 運算元梳理 — 簡單介紹streaming運行邏輯天小天:(二)Spark Streaming 運算元梳理 — flatMap和mapPartitions

天小天:(三)Spark Streaming 運算元梳理 — transform運算元

天小天:(四)Spark Streaming 運算元梳理 — Kafka createDirectStream天小天:(五)Spark Streaming 運算元梳理 — foreachRDD天小天:(六)Spark Streaming 運算元梳理 — glom運算元

天小天:(七)Spark Streaming 運算元梳理 — repartition運算元

前言

在上一章我們了解了Streaming的運行邏輯和幾個運算元。本章主要介紹下mapPartitions和flatMap的實現邏輯。

看代碼

首先看一下flatMap和mapPartitions代碼示例:

package streaming

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

/**
* @date 2019/01/21
*/
object Api {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("api").setMaster("local[2]")
val rddQueue = new mutable.Queue[RDD[Int]]()

val ssc = new StreamingContext(sparkConf, Seconds(2))
val lines = ssc.queueStream(rddQueue)

// flatMap & mapPartitions
val d = lines.flatMap(List(_, 20, 30, 40)).mapPartitions(iteratorAdd)

d.print()

ssc.start()
for (i <- 1 to 30) {
rddQueue.synchronized {
rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10)
}
Thread.sleep(1000)
}
ssc.stop()
}

def iteratorAdd(input: Iterator[Int]) : Iterator[String] = {
val output = ListBuffer[String]()
for (t <- input){
output += t.toString + " map"
}
output.iterator
}

}

上面的代碼flatMap的功能是對原始數據每個元素後面都增加20,30,40這三個元素;mapPartitions實現的功能是在每個元素後面都增加」 map「這個字元串。整體的輸出結果就是:

1 map
20 map
30 map
40 map
2 map
20 map
30 map
40 map
3 map
20 map
...

那麼接下來我們就詳細講解flatMap和mapPartitions是如何實現的。

flatMap

flatMap的功能是:將輸入的元素通過flatMap內的function轉換成Iterator(迭代器)後再輸出Iterator中每個元素。通俗說就是把元素利用function計算成一個Array後再把這個Array拍扁,變成一個元素一個元素的樣式輸出。 以上文代碼為例:

那麼接下來我們看一下源碼flatMap是如何實現的。

/**
* Return a new DStream by applying a function to all elements of this DStream,
* and then flattening the results
*/
def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] = ssc.withScope {
new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
}

上面代碼是入口,flatMapFunc為我們編寫的方法List(_,20,30,40)。這裡是實例化了FlatMappedDStream。接下來我們看下FlatMappedDStream的實現。

package org.apache.spark.streaming.dstream

import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, Time}

private[streaming]
class FlatMappedDStream[T: ClassTag, U: ClassTag](
parent: DStream[T],
flatMapFunc: T => TraversableOnce[U]
) extends DStream[U](parent.ssc) {

override def dependencies: List[DStream[_]] = List(parent)

override def slideDuration: Duration = parent.slideDuration

override def compute(validTime: Time): Option[RDD[U]] = {
// 實際執行的代碼
parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
}
}

我們主要看compute方法,這個是spark Streaming執行批次任務時執行的方法。 parent.getOrCompute(validTime)這段代碼我們在上一章已將分析過了,這裡返回的是Option[RDD]_的類型是RDD。到這裡我們可以知道,Streaming的flatMap執行的就是RDD的flatMap運算元。 接下來我們繼續看下RDD的flatMap是如何實現的,繼續看RDD的flatMap實現。

/**
* Return a new RDD by first applying a function to all elements of this
* RDD, and then flattening the results.
*/
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}

我們在上一章看過了map運算元的實現其實現的也是MapPartitionsRDD所以我們就不做深入理解MapPartitionsRDD是如何實現的。我們看下其與map運算元的區別,區別就在於實例化的第二個參數為(context, pid, iter) => iter.flatMap(cleanF)其中iter調用的是scala的flatMap方法。 那麼我們繼續看下scala的flatMap是如何實現的。

/** Creates a new iterator by applying a function to all values produced by this iterator
* and concatenating the results.
*
* @param f the function to apply on each element.
* @return the iterator resulting from applying the given iterator-valued function
* `f` to each value produced by this iterator and concatenating the results.
* @note Reuse: $consumesAndProducesIterator
*/
def flatMap[B](f: A => GenTraversableOnce[B]): Iterator[B] = new AbstractIterator[B] {
// 每一個元素生成的迭代器,本例中可能是 (1,20,30,40)
private var cur: Iterator[B] = empty
// 獲取下一個cur。其中self為flatMap上游的Iterator,所以這裡的self.next調用的是上游Iterator的next方法。
private def nextCur() { cur = f(self.next()).toIterator }
// 判斷是否還有下一個元素
def hasNext: Boolean = {
// Equivalent to cur.hasNext || self.hasNext && { nextCur(); hasNext }
// but slightly shorter bytecode (better JVM inlining!)
// 判斷cur是否有下一個元素,如果有則返回true
// 如果沒有則進入循環,並判斷self是否有下一個元素。如果沒有則則返回false,如果有在更新cur。在下一次循環判斷cur是否有下一個元素時會返回true退出循環,並返回true。
while (!cur.hasNext) {
if (!self.hasNext) return false
nextCur()
}
true
}
// 首先電泳hasNext方法判斷是否還有下一個元素,如果有則放回cur並且執行next
def next(): B = (if (hasNext) cur else empty).next()
}

回顧下我們Streaming的flatMap運算元的入參List(_,20,30,40)也就是scala的flatMap入參。那麼我們看下當調用next方法時返回的是什麼,詳細的看上面的代碼注釋進行理解。 到這裡flatMap就講解完了。其實我們可以理解為調用的解釋scala的flatMap方法。

mapPartitions

mapPartition實現的功能和實現方式和map很相像,所以這裡會結合map的代碼比對著看。 首先看下mapPartitions的代碼:

* Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDDs
* of this DStream. Applying mapPartitions() to an RDD applies a function to each partition
* of the RDD.
*/
def mapPartitions[U: ClassTag](
mapPartFunc: Iterator[T] => Iterator[U],
preservePartitioning: Boolean = false
): DStream[U] = ssc.withScope {
new MapPartitionedDStream(this, context.sparkContext.clean(mapPartFunc), preservePartitioning)
}

與其對比的map邏輯如下:

/** Return a new DStream by applying a function to all elements of this DStream. */
def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope {
new MappedDStream(this, context.sparkContext.clean(mapFunc))
}

恩,這裡看前者(mapPartitions)實例化的是MapPartitionedDStream對象,後者實例化的是MappedDStream對象。 我們繼續往下看,同樣先是mapPartitions

package org.apache.spark.streaming.dstream

import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, Time}

private[streaming]
class MapPartitionedDStream[T: ClassTag, U: ClassTag](
parent: DStream[T],
mapPartFunc: Iterator[T] => Iterator[U],
preservePartitioning: Boolean
) extends DStream[U](parent.ssc) {

override def dependencies: List[DStream[_]] = List(parent)

override def slideDuration: Duration = parent.slideDuration

override def compute(validTime: Time): Option[RDD[U]] = {
parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning))
}
}

接下來是map

package org.apache.spark.streaming.dstream

import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, Time}

private[streaming]
class MappedDStream[T: ClassTag, U: ClassTag] (
parent: DStream[T],
mapFunc: T => U
) extends DStream[U](parent.ssc) {

override def dependencies: List[DStream[_]] = List(parent)

override def slideDuration: Duration = parent.slideDuration

override def compute(validTime: Time): Option[RDD[U]] = {
parent.getOrCompute(validTime).map(_.map[U](mapFunc))
}
}

可以看到,兩者實現的區別主要在comput調用RDD運算元不同,前者是mapPartitions,後者是map。至於mapPartitions的第二個入參我們暫時不做深入研究,其實就是是否保持上游RDD的分區信息。

那麼我們就繼續看RDD的mapPartitionsmap有什麼區別。

首先是mapPartitions

/**
* Return a new RDD by applying a function to each partition of this RDD.
*
* `preservesPartitioning` indicates whether the input function preserves the partitioner, which
* should be `false` unless this is a pair RDD and the input function doesnt modify the keys.
*/
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
preservesPartitioning)
}

接下來是map

/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

到這裡我們就可以看到兩者的本質區別了,兩者到實例化MapPartitionsRDD。但是在第二個入參有所不同,前者是(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),後者是(context, pid, iter) => iter.map(cleanF)

仔細看cleanF出現的位置,前者是cleanedF(iter),後者是iter.map(cleanF)iter可以理解為Streaming的一個批次的所有元素。前者是用cleanF計算一個批次的所有元素,後者是先遍歷處一個批次的一個元素再用cleanF計算這一個元素。

到這裡我們就探明了mapPartitions的實現原理了,那麼什麼時候用mapPartitions那?從代碼上進行反推可能有兩種情況。

  1. 可以一次性操作一個批次所有iter的情況可以用。
  2. 對需要在遍歷一個批次數據前需要針對每個partition做一些初始化的情況,例如簡歷DB連接之類的操作。

到這裡針對mapPartitions的講解也就結束了。

總結

以上就是flatMap和mapPartitions兩個運算元的講解,如果還有什麼疑問可以提出,等我了解後再回答。

附上源代碼鏈接:github.com/youtNa/all-p


推薦閱讀:
相关文章