本系列內容適用範圍:
* 2018.11.02 update, Spark 2.4 全系列 √ (已發布:2.4.0) * 2018.02.28 update, Spark 2.3 全系列 √ (已發布:2.3.0 ~ 2.3.2) * 2017.07.11 update, Spark 2.2 全系列 √ (已發布:2.2.0 ~ 2.2.3)
閱讀本文前,請一定先閱讀 [Spark Streaming 實現思路與模塊概述](0.1 Spark Streaming 實現思路與模塊概述.md) 一文,其中概述了 Spark Streaming 的 4 大模塊的基本作用,有了全局概念後再看本文對 模塊 1 DAG 靜態定義 細節的解釋。
模塊 1 DAG 靜態定義
我們在前面的文章講過,Spark Streaming 的 模塊 1 DAG 靜態定義 要解決的問題就是如何把計算邏輯描述為一個 RDD DAG 的「模板」,在後面 Job 動態生成的時候,針對每個 batch,都將根據這個「模板」生成一個 RDD DAG 的實例。
在 Spark Streaming 裏,這個 RDD 「模板」對應的具體的類是 DStream,RDD DAG 「模板」對應的具體類是 DStreamGraph。
DStream
DStreamGraph
DStream 的全限定名是:org.apache.spark.streaming.dstream.DStream DStreamGraph 的全限定名是:org.apache.spark.streaming.DStreamGraph
本文涉及的類在 Spark Streaming 中的位置如上圖所示;下面詳解 DStream, DStreamGraph。
回想一下,RDD 的定義是一個只讀、分區的數據集(an RDD is a read-only, partitioned collection of records),而 DStream 又是 RDD 的模板,所以我們把 Dstream 也視同數據集。
an RDD is a read-only, partitioned collection of records
我們先看看定義在這個 DStream 數據集上的轉換(transformation)和 輸出(output)。
現在假設我們有一個 DStream 數據集 a:
val a = new DStream()
那麼通過 filter() 操作就可以從 a 生成一個新的 DStream 數據集 b:
filter()
a
b
val b = a.filter(func)
這裡能夠由已有的 DStream 產生新 DStream 的操作統稱 transformation。一些典型的 tansformation 包括 map(), filter(), reduce(), join() 等 。
map()
reduce()
join()
Transformation Meaning map(func) Return a new DStream by passing each element of the source DStream through a function func. flatMap(func) Similar to map, but each input item can be mapped to 0 or more output items. filter(func) Return a new DStream by selecting only the records of the source DStream on which func returns true. repartition(numPartitions) Changes the level of parallelism in this DStream by creating more or fewer partitions.
另一些不產生新 DStream 數據集,而是隻在已有 DStream 數據集上進行的操作和輸出,統稱為 output。比如 a.print()就不會產生新的數據集,而是隻是將 a 的內容列印出來,所以 print() 就是一種 output 操作。一些典型的 output 包括 print(), saveAsTextFiles(), saveAsHadoopFiles(), foreachRDD() 等。
a.print()
print()
saveAsTextFiles()
saveAsHadoopFiles()
foreachRDD()
print() Prints the first ten elements of every batch of data in a DStream on the driver node running the streaming application. This is useful for development and debugging. Python API This is called pprint() in the Python API. saveAsTextFiles(prefix, [suffix]) Save this DStreams contents as text files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". saveAsObjectFiles(prefix, [suffix]) Save this DStreams contents as SequenceFiles of serialized Java objects. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". Python API This is not available in the Python API. saveAsHadoopFiles(prefix, [suffix]) Save this DStreams contents as Hadoop files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". Python API This is not available in the Python API. foreachRDD(func) The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs.
我們看一下 Spark Streaming 官方的 quick example 的這段對 DStream DAG 的定義,注意看代碼中的注釋講解內容:
// ssc.socketTextStream() 將創建一個 SocketInputDStream;這個 InputDStream 的 SocketReceiver 將監聽本機 9999 埠 val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" ")) // DStream transformation val pairs = words.map(word => (word, 1)) // DStream transformation val wordCounts = pairs.reduceByKey(_ + _) // DStream transformation wordCounts.print() // DStream output
這裡我們找到 ssc.socketTextStream("localhost", 9999) 的源碼實現:
ssc.socketTextStream("localhost", 9999)
def socketStream[T: ClassTag]( hostname: String, port: Int, converter: (InputStream) => Iterator[T], storageLevel: StorageLevel) : ReceiverInputDStream[T] = { new SocketInputDStream[T](this, hostname, port, converter, storageLevel) }
也就是 ssc.socketTextStream() 將 new 出來一個 DStream 具體子類 SocketInputDStream 的實例。
ssc.socketTextStream()
new
SocketInputDStream
然後我們繼續找到下一行 lines.flatMap(_.split(" ")) 的源碼實現:
lines.flatMap(_.split(" "))
def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = ssc.withScope { new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc)) }
也就是 lines.flatMap(_.split(" ")) 將 new 出來一個 DStream 具體子類 FlatMappedDStream 的實例。
FlatMappedDStream
後面幾行也是如此,所以我們如果用 DStream DAG 圖來表示之前那段 quick example 的話,就是這個樣子:
也即,我們給出的那段代碼,用具體的實現來替換的話,結果如下:
val lines = new SocketInputDStream("localhost", 9999) // 類型是 SocketInputDStream
val words = new FlatMappedDStream(lines, _.split(" ")) // 類型是 FlatMappedDStream val pairs = new MappedDStream(words, word => (word, 1)) // 類型是 MappedDStream val wordCounts = new ShuffledDStream(pairs, _ + _) // 類型是 ShuffledDStream new ForeachDStream(wordCounts, cnt => cnt.print()) // 類型是 ForeachDStream
總結一下:
.flatMap()
FaltMappedDStream
.map()
MappedDStream
ForEachDStream
func
cnt => cnt.print()
我們將有另一篇文章具體對 DStream 所有 transformation 的列舉和分析,本文不展開。
上面我們看到的 SocketInputDStream, FlatMappedDStream, ForeachDStream 等都是 DStream 的具體子類。
ForeachDStream
DStream 的所有子類如下:
一會我們要對其這些 DStream 子類進行一個分類。
先再次回過頭來看一下 transformation 操作。當我們寫代碼 c = a.join(b), d = c.filter() 時, 它們的 DAG 邏輯關係是 a/b → c,c → d,但在 Spark Streaming 在進行物理記錄時卻是反向的 a/b ← c, c ← d,如下圖:
c = a.join(b), d = c.filter()
a/b → c,c → d
a/b ← c, c ← d
那物理上為什麼不順著 DAG 來正向記錄,卻用反向記錄?
這裡背後的原因是,在 Spark Core 的 RDD API 裏,RDD 的計算是被觸發了以後才進行 lazy 求值的,即當真正求 d 的值的時候,先計算上游 dependency c;而計算 c 則先進一步計算 c 的上游 dependency a 和 b。Spark Streaming 裏則與 RDD DAG 的反向表示保持了一致,對 DStream 也採用的反向表示。
d
c
所以,這裡 d 對 c 的引用,表達的是一個上游依賴(dependency)的關係;也就是說,不求值則已,一旦 d.print()這個 output 操作觸發了對 d 的求值,那麼就需要從 d 開始往上游進行追溯計算。
d.print()
具體的過程是,d.print() 將 new 一個 d 的一個下游 ForEachDStream x —— x 中記明瞭需要做的操作 func = print() —— 然後在每個 batch 動態生成 RDD 實例時,以 x 為根節點、進行一次 BFS(寬度優先遍歷),就可以快速得到需要進行實際計算的最小集合。如下圖所示,這個最小集合就是 {a, b, c, d}。
ForEachDStream x
x
func = print()
再看一個例子。如下圖所示,如果對 d, f 分別調用 print() 的 output 操作,那麼將在 d, f 的下游分別產生新的 DStream x, y,分別記錄了具體操作 func = print()。在每個 batch 動態生成 RDD 實例時,就會分別對 x 和 y 進行 BFS 遍歷,分別得到上游集合 {a,b,c,d} 和 {b,e,f}。作為對比,這裡我們不對 h 進行 print() 的 output 操作,所以 g, h 將得不到遍歷。
f
DStream x, y
y
e
h
g
通過以上分析,我們總結一下:
我們將在 (2) 中,由 output 操作新生成的 DStream 稱為 output stream。
最後,我們給出:
我們本節所描述的內容,用下圖就能夠總結了:
福利部分:
《幾百TJava和大數據資源下載》
資源下載?shimo.im 推薦閱讀: