1.Spark Shuffle調優

shuffle在spark的運算元中產生,也就是運行task的時候才會產生shuffle.

2.sortShuffleManager

spark shuffle的默認計算引擎叫sortshuffleManager,它負責shuffle過程的執行、計算和組件的處理,sortshuffleManager會將task進行shuffle操作時產生的臨時磁碟文件合併成一個磁碟文件,在下一個stage的shuffle read task拉取自己的數據時,只要根據索引讀取每個磁碟文件中的部分數據即可。

3.Shuffle當中可能遇到的問題

1. 數據量非常大,從其他各臺機器收集數據佔用大量網路。

2. 數據如何分類,即如何Partition,Hash、Sort等;

3. 負載均衡(數據傾斜),因為採用不同的Shuffle方式對數據不同的分類,而分類之後又要跑到具體的節點上計算,如果不恰當的話,很容易產生數據傾斜;

4. 網路傳輸效率,需要在壓縮和解壓縮之間做出權衡,序列化和反序列也是要考慮的問題;

說明:具體的Task進行計算的時候盡一切最大可能使得數據具備Process Locality的特性;退而求次是增加數據分片,減少每個Task處理的數據量。

4.參數

1. spark.shuffle.file.buffer(默認值為32K)

該參數是緩衝區的緩衝內存,如果可用的內存資源較為充足的話,可以將緩衝區的值設置大點,這樣會較少磁碟IO次數.,如果合理調節該參數,性能會提升1%~5%... 可以設置為64K。

2. spark.reducer.max.SizeFlight(默認為48M)

該參數是stage的每一個task就需要將上一個stage的計算結果中的所有相同key,從各個節點上通過網路都拉取到自己所在的節點上,然後進行key的聚合或連接等操作,如果合理調節該參數(增大),性能會提升1%~5%...

3. Saprk.shuffle.memoryFraction(默認20%)shuffle聚合內存的比例

該參數是數據根據不同的shuffle運算元將數據寫入內存結構中,內存結構達到閾值會溢出臨時文件,這個參數就是則是內存結構的閾值百分比的,不是內存結構的內存大小. 如果內存充足,而且很少使用持久化操作,建議調高這個比例,可以減少頻繁對磁碟進行IO操作,合理調節該參數可以將性能提升10%左右。

4. spark.shuffle.io.maxRetries拉取數據重試次數(默認3次)

該參數是stage的task向上一個stage的task計算結果拉取數據,也就是上面那個操作,有時候會因為網路異常原因,導致拉取失敗,失敗時候默認重新拉取三次,三次過還是失敗的話作業就執行失敗了,根據具體的業務可以考慮將默認值增大,這樣可以避免由於JVM的一些原因或者網路不穩定等因素導致的數據拉取失敗.也有助於提高spark作業的穩定性. 可以適當的提升重新拉取的次數,最大為60次.

5.spark.shuffle.io.retryWait(默認是5s)----- 重試間隔時間60s

是每次拉取數據的間隔時間... 建議加大間隔時長(比60s),以增加shuffle操作的穩定性

6. Spark Shuffle的種類

7. HashShuffle 合併機制

合併機制就是復用buffer,開啟合併機制的配置是spark.shuffle.consolidateFiles。該參數默認值為false,將其設置為true即可開啟優化機制。通常來說,如果我們使用HashShuffleManager,那麼都建議開啟這個選項。

8. spark.shuffle.sort.bypassMergeThreshold -- 200 SortShuffle bypass機制 200次

Executor的堆外內存調優

為了優化JVM和的讀寫速度,這裡的內核kernel與JVM共用同一個堆外內存。這樣當JVM中的Executor需要讀取數據時,將請求發送給Kernel,kernel將數據準備好,直接拉取到堆外內存中,那麼JVM中的Executor就可以直接去堆外內存中拉取數據即可,所以可以增加Executor的堆外內存大小,來提升Executor的讀寫效率。

Spark底層shuffle的傳輸方式是使用netty傳輸,netty在進行網路傳輸的過程會申請堆外內存(netty是零拷貝),所以使用了堆外內存。默認情況下,這個堆外內存上限默認是每一個executor的內存大小的10%;真正處理大數據的時候,這裡都會出現問題,導致spark作業反覆崩潰,無法運行;此時就會去調節這個參數,到至少1G(1024M),甚至說2G、4G。

executor在進行shuffle write,優先從自己本地關聯的mapOutPutWorker中獲取某份數據,如果本地block manager沒有的話,那麼會通過TransferService,去遠程連接其他節點上executor的block manager去獲取,嘗試建立遠程的網路連接,並且去拉取數據。頻繁創建對象讓JVM堆內存滿溢,進行垃圾回收。正好碰到那個exeuctor的JVM在垃圾回收。處於垃圾回過程中,所有的工作線程全部停止;相當於只要一旦進行垃圾回收,spark / executor停止工作,無法提供響應,spark默認的網路連接的超時時長是60s;如果卡住60s都無法建立連接的話,那麼這個task就失敗了。task失敗了就會出現shuffle file cannot find的錯誤。

那麼如何調節等待的時長呢?

在./spark-submit提交任務的腳本裡面添加:

--conf spark.core.connection.ack.wait.timeout=300

Executor由於內存不足或者堆外內存不足了,掛掉了,對應的Executor上面的block manager也掛掉了,找不到對應的shuffle map output文件,Reducer端不能夠拉取數據。

我們可以調節堆外內存的大小,如何調節?

在./spark-submit提交任務的腳本裡面添加

yarn下:

--conf spark.yarn.executor.memoryOverhead=2048 單位M

standalone下:

--conf spark.executor.memoryOverhead=2048單位M


推薦閱讀:
相關文章