目錄

天小天:(一)Spark Streaming 運算元梳理 — 簡單介紹streaming運行邏輯天小天:(二)Spark Streaming 運算元梳理 — flatMap和mapPartitions

天小天:(三)Spark Streaming 運算元梳理 — transform運算元

天小天:(四)Spark Streaming 運算元梳理 — Kafka createDirectStream天小天:(五)Spark Streaming 運算元梳理 — foreachRDD天小天:(六)Spark Streaming 運算元梳理 — glom運算元

天小天:(七)Spark Streaming 運算元梳理 — repartition運算元

前言

本章主要講解glom運算元的實現原理。

看例子

首先還是看一個簡單地例子。

package streaming

import org.apache.spark.{SparkConf, rdd}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

/**
* @date 2019/01/21
*/
object Api {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("api").setMaster("local[2]")
val rddQueue = new mutable.Queue[RDD[Int]]()

val ssc = new StreamingContext(sparkConf, Seconds(2))
// consume from rddQueue
val lines = ssc.queueStream(rddQueue)

// glom
lines.glom().map(x => for(i <- x) println("glom ==> " + i)).print()

ssc.start()

// produce to rddQueue
for (i <- 1 to 30) {
rddQueue.synchronized {
rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10)
}
Thread.sleep(1000)
}
ssc.stop()
}
}

這段代碼map執行的結果為:

glom ==> 1
glom ==> 2
glom ==> 3
glom ==> 4
glom ==> 5
glom ==> 6
glom ==> 7
glom ==> 8
glom ==> 9
glom ==> 10
glom ==> 11
glom ==> 12
glom ==> 13
glom ==> 14
glom ==> 15
glom ==> 16
....

glom的作用是:把某個批次的一個分區的元素從迭代器類型轉換為Array類型。

源碼分析

DStream分析

先看下DStream中的源碼

/**
* Return a new DStream in which each RDD is generated by applying glom() to each RDD of
* this DStream. Applying glom() to an RDD coalesces all elements within each partition into
* an array.
*/
def glom(): DStream[Array[T]] = ssc.withScope {
new GlommedDStream(this)
}

很簡單,就是實例化了GlommedDStream類。

GlommedDStream分析

package org.apache.spark.streaming.dstream

import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, Time}

private[streaming]
class GlommedDStream[T: ClassTag](parent: DStream[T])
extends DStream[Array[T]](parent.ssc) {

override def dependencies: List[DStream[_]] = List(parent)

override def slideDuration: Duration = parent.slideDuration

override def compute(validTime: Time): Option[RDD[Array[T]]] = {
parent.getOrCompute(validTime).map(_.glom())
}
}

我們只看compute方法的實現,就是執行RDDglom方法。

RDD分析

/**
* Return an RDD created by coalescing all elements within each partition into an array.
*/
def glom(): RDD[Array[T]] = withScope {
new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
}

上面是RDDglom方法實現。主要的是實例化MapPartitionsRDDMapPartitionsRDD類的在(二)Spark Streaming 運算元梳理 — flatMap和mapPartitions已經介紹過來,這裡就不在說了。

我們主要看下方法的第二個入參:(context, pid, iter) => Iterator(iter.toArray)。這段代碼的作用是把每個partition的元素從Iterator類型轉換為Array類型之後再包裝到迭代器中返回。這段代碼也是glom方法最核心的代碼。

注意

其實到上面已經講完了glom的方法,其中有一點需要注意的是,由於glom會把每個批次每個分區的數據從Iterator類型轉換為Array類型,所以如果每個分區的數據非常大的話會出現OOM的情況。

源碼地址

源碼地址


推薦閱讀:
相关文章