目錄

天小天:(一)Spark Streaming 運算元梳理 — 簡單介紹streaming運行邏輯 天小天:(二)Spark Streaming 運算元梳理 — flatMap和mapPartitions

天小天:(三)Spark Streaming 運算元梳理 — transform運算元

天小天:(四)Spark Streaming 運算元梳理 — Kafka createDirectStream天小天:(五)Spark Streaming 運算元梳理 — foreachRDD天小天:(六)Spark Streaming 運算元梳理 — glom運算元

天小天:(七)Spark Streaming 運算元梳理 — repartition運算元

前言

本文主要講解foreachRDD運算元的實現,關於最佳使用方式在這裡不會講到,如果要了解可以查閱相關資料。

看例子

首先看一個簡單地foreachRDD的例子

package streaming

import org.apache.spark.{SparkConf, rdd}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

/**
* @date 2019/01/21
*/
object Api {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("api").setMaster("local[2]")
val rddQueue = new mutable.Queue[RDD[Int]]()

val ssc = new StreamingContext(sparkConf, Seconds(2))
// consume from rddQueue
val lines = ssc.queueStream(rddQueue)
// foreachRDD
lines.foreachRDD(rdd => {
val values = rdd.take(10)
for (value <- values) println("foreachRDD == " + value)
})

ssc.start()

// produce to rddQueue
for (i <- 1 to 30) {
rddQueue.synchronized {
rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10)
}
Thread.sleep(1000)
}
ssc.stop()
}

def iteratorAdd(input: Iterator[Int]) : Iterator[String] = {
val output = ListBuffer[String]()
for (t <- input){
output += t.toString + " map"
}
output.iterator
}

}

這個例子中foreachRDD的作用是從每個批次的RDD中取出前10個元素,並列印出來。

從這裡我們可以看出來,foreachRDD的作用是對每個批次的RDD做自定義操作。並且從這個的位置我們也可以看出,這個一個action運算元。

源碼實現

我們知道了foreachRDD的作用,接下來我們詳細看下是如何實現這個運算元的。

調用方法

首先我們看下用戶調用的方法可以有哪些。

/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* this DStream will be registered as an output stream and therefore materialized.
*/
def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
val cleanedF = context.sparkContext.clean(foreachFunc, false)
foreachRDD((r: RDD[T], _: Time) => cleanedF(r), displayInnerRDDOps = true)
}

/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* this DStream will be registered as an output stream and therefore materialized.
*/
def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit = ssc.withScope {
// because the DStream is reachable from the outer object here, and because
// DStreams cant be serialized with closures, we cant proactively check
// it for serializability and so we pass the optional false to SparkContext.clean
foreachRDD(foreachFunc, displayInnerRDDOps = true)
}

這兩個foreachRDD就是用戶可以調用的兩個方法。他們兩個的差別起始只有入參foreachFunc的入參是不同的。下面的多了一個Time。這個Time代表批次時間,具體用法在後面會說。

私有的foreachRDD

我們可以看到上面兩個方法都調用了foreachRDD方法,那麼這裡的方法是是什麼那?我們來看下此方法的代碼:

/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* this DStream will be registered as an output stream and therefore materialized.
* @param foreachFunc foreachRDD function
* @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
* in the `foreachFunc` to be displayed in the UI. If `false`, then
* only the scopes and callsites of `foreachRDD` will override those
* of the RDDs on the display.
*/
private def foreachRDD(
foreachFunc: (RDD[T], Time) => Unit,
displayInnerRDDOps: Boolean): Unit = {
new ForEachDStream(this,
context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}

首先解釋下入參:

  • foreachFunc:用戶定義的方法,可以包含RDDTime兩個參數;
  • displayInnerRDDOps:foreachFunc中的運算元是否顯示在UI中,這個和我們的業務邏輯關係不大,可以先不關注

我們可以看到這裡最終會實例化ForEachDStream對象。

到這裡,DStream的邏輯已經生成。

ForEachDStream實現

看下每個批次中RDD是如何處理的。由於foreachRDD所以我們這裡只關注generateJob方法。

override def generateJob(time: Time): Option[Job] = {
parent.getOrCompute(time) match {// 通過批次時間獲取RDD
case Some(rdd) => // 取到RDD
// 構造本批次job要執行的函數
val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
foreachFunc(rdd, time) // 把RDD和批次時間作為入參傳給用戶定義的方法
}
Some(new Job(time, jobFunc)) //返回Job
case None => None // 如果沒有取得,則放回None
}
}

從代碼和注釋已經簡單地解釋了,foreachRDD是如何工作的。其中有兩個地方要說下。

  1. foreachFunc(rdd, time) 這段代碼,用戶定義的方法入參沒有time則用戶不能對批次時間做處理,只有定義的方法同時包含rdd和time兩個參數用戶才能對time做處理。這也是上面調用方法中留下的問題的解釋。
  2. Some(new Job(time, jobFunc))這裡生成的Job最終會提交到Spark集羣上執行。之後的邏輯這裡不做詳解。

總結

至此foreachRDD就講解完了。從上面的代碼來看其實這裡的邏輯還是不難的。

  • 源碼github地址

推薦閱讀:

相關文章