?一,編程環境

以下為Mac系統上單機版Spark練習編程環境的配置方法。

注意:僅配置練習環境無需安裝Hadoop,無需安裝Scala。

1,安裝Java8

注意避免安裝其它版本的jdk,否則會有不兼容問題。

oracle.com/technetwork/

2,下載spark並解壓spark.apache.org/downlo解壓到以下路徑:Users/yourname/ProgramFiles/spark-2.4.3-bin-hadoop2.73,配置spark環境vim ~/.bashrc插入下面兩條語句

export SPARK_HOME=/Users/yourname/ProgramFiles/spark-2.4.3-bin-hadoop2.7
export PATH=$PATH:$SPARK_HOME/bin

4,配置jupyter支持

若未有安裝jupyter可以下載Anaconda安裝之。使用toree可以安裝jupyter環境下的Apache Toree-Scala內核,以便在jupyter環境下運行Spark。

pip install toree
jupyter toree install --spark_home=Users/yourname/ProgramFiles/spark-2.4.3-bin-hadoop2.7

二,運行Spark

Spark可以通過以下一些方式運行。

1,通過spark-shell進入Spark互動式環境,使用Scala語言。2,通過spark-submit提交Spark應用程序進行批處理。這種方式可以提交Scala或Java語言編寫的代碼編譯後生成的jar包,也可以直接提交Python腳本。3,通過pyspark進入pyspark互動式環境,使用Python語言。這種方式可以指定jupyter或者ipython為交互環境。4,通過zepplin notebook互動式執行。

zepplin是jupyter notebook的apache對應產品。

5,安裝Apache Toree-Scala內核。可以在jupyter 中運行spark-shell。

使用spark-shell運行時,還可以添加兩個常用的兩個參數。

一個是master指定使用何種分布類型。第二個是jars指定依賴的jar包。

#local本地模式運行,默認使用4個邏輯CPU內核
spark-shell

#local本地模式運行,使用全部內核,添加 code.jar到classpath
spark-shell --master local[*] --jars code.jar

#local本地模式運行,使用4個內核
spark-shell --master local[4]

#standalone模式連接集群,指定url和埠號
spark-shell --master spark://master:7077

#客戶端模式連接YARN集群,Driver運行在本地,方便查看日誌,調試時推薦使用。
spark-shell --master yarn-client

#集群模式連接YARN集群,Driver運行在集群,本地機器計算和通信壓力小,批量任務時推薦使用。
spark-shell --master yarn-cluster

#提交scala寫的任務
./bin/spark-submit --class org.apache.spark.examples.SparkPi
--master yarn
--deploy-mode cluster
--driver-memory 4g
--executor-memory 2g
--executor-cores 1
--queue thequeue
examples/jars/spark-examples*.jar 10

#提交python寫的任務
spark-submit --master yarn
--executor-memory 6G
--driver-memory 6G
--deploy-mode cluster
--num-executors 600
--conf spark.yarn.maxAppAttempts=1
--executor-cores 1
--conf spark.default.parallelism=2000
--conf spark.task.maxFailures=10
--conf spark.stage.maxConsecutiveAttempts=10
test.py

三,創建RDD

創建RDD的基本方式有兩種,第一種是使用textFile載入本地或者集群文件系統中的數據。第二種是使用parallelize方法將Driver中的數據結構並行化成RDD。

1,textFile

2,parallelize(或makeRDD)

四,常用Action操作

Action操作將觸發基於RDD依賴關係的計算。

1,collect

2,take

3,takeSample

4,first

5,count

6,reduce

7,foreach

8,coutByKey

9,saveAsFile

五,常用Transformation操作

Transformation轉換操作具有懶惰執行的特性,它只指定新的RDD和其父RDD的依賴關係,只有當Action操作觸發到該依賴的時候,它才被計算。

1,map

2,filter

3,flatMap

4,sample

5,distinct

6,subtract

7,union

8,intersection

9,cartesian

10,sortBy

11,pipe

六,常用PairRDD轉換操作

PairRDD指的是數據為Tuple2數據類型的RDD,其每個數據的第一個元素被當做key,第二個元素被當做value。

1,reduceByKey

2,groupByKey

3,sortByKey

4,join

5,leftOuterJoin

6,rightOuterJoin

7,cogroup

8,subtractByKey

9,foldByKey

七,持久化操作

如果一個RDD被多個任務用作中間量,那麼對其進行cache,緩存到內存中會對加快計算非常有幫助。

聲明對一個RDD進行cache後,該RDD不會被立即緩存,而是等到它第一次因為某個Action操作觸發後被計算出來時才進行緩存。可以使用persist明確指定存儲級別,常用的存儲級別是MEMORY_ONLY和MEMORY_AND_DISK。

1,cache

2,persist

八,共享變數

當Spark集群在許多節點上運行一個函數時,默認情況下會把這個函數涉及到的對象在每個節點生成一個副本。但是,有時候需要在不同節點或者節點和Driver之間共享變數。

Spark提供兩種類型的共享變數,廣播變數和累加器。

廣播變數是不可變變數,實現在不同節點不同任務之間共享數據。廣播變數在每個節點上緩存一個只讀的變數,而不是為每個task生成一個副本,可以減少數據的傳輸。

累加器主要用於不同節點和Driver之間共享變數,只能實現計數或者累加功能。累加器的值只有在Driver上是可讀的,在節點上只能執行add操作。

1,broadcast

2,Accumulator

九,分區操作

分區操作包括改變分區方式,以及和分區相關的一些轉換操作。

1,coalesce

2,repartition

3,partitionBy

4,mapPartitions

5,mapPartitionsWithIndex

6,foreachPartitions

7,aggregate

8,aggregateByKey

推薦閱讀:

30分鐘理解Spark的基本原理?

mp.weixin.qq.com

3小時Scala入門?

mp.weixin.qq.com

公眾號後台回復關鍵字:"RDD"獲取本文全部代碼。


推薦閱讀:
相关文章