Spark源碼是利用Scala編寫,因此用Scala編寫Spark程序具有天然的優勢,但目前Java仍是主流語言,且Scala和Java程序都是運行在JVM上的。使用JDK8的Lamda expression和Scala的匿名函數很相似,因此案例編寫了Scala和Java版本。

java版本

package com.spark.core;
?
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
?
import java.util.Arrays;
?
public class WordCountLocalJava {
public static void main(String[] args) {
/**
* 編寫spark應用程序
* 第一步,創建sparkconf對象,設置spark應用的配置信息
* 使用setMaster可以設置spark程序要連接的spark集群的master節點的url
* 設置為local,則代表本地運行
*/
SparkConf conf = new SparkConf()
.setMaster("local[2]")
.setAppName("WordCountLocalJava");
?
/**
* 第二步,創建JavaContext對象
* 在spark中,SparkContext是spark所有功能的一個入口,無論是用java,scala還是python
* 都必須要有一個sparkcontext,它的主要作用包括初始化spark應用程序所需要的一些組件,
* 包括調度器(DAGScheduler,TaskScheduler),還會去spark master節點上註冊等
* 在spark中,編寫不同的spark應用程序,使用的sparkcontext是不同的,如果使用的是scala,
* 則是原生的sparkcontext對象,如果使用java,是javaContext對象等
*/
JavaSparkContext jsc = new JavaSparkContext(conf);
?
/**
* 第三步:要針對輸入源(HDFS文件,本地文件等),創建初始的RDD
* 輸入源中的數據會打散,分配到RDD的每個Partition,從而形成一個初始的分散式數據集
* sparkcontext中,用於根據文件類型的輸入源創建RDD的方法,叫做textfile()方法
* 在java中創建的普通RDD,都叫做JavaRDD
*/
JavaRDD<String> lines = jsc.textFile("data/wc.txt");
?
/**
* 第四步:對初始RDD進行transformation操作
* 通常操作會通過創建function,並配合RDD的map、flatmap等運算元來執行
* function如果比較簡單,則創建指定function的匿名內部類
* 如果比較複雜,則會單獨創建一個類,作為實現這個function介面的類
* 先將每一行拆分為單個的單詞
* FlatMapFunction有兩個泛型參數,分別代表了輸入和輸出
* 這裡輸入肯定是String,代表一行一行的文本,輸出也是String,
* FlatMap運算元的作用,是將RDD的每一個元素拆分成一個或者多個元素
*/
?
/**
* 在IDEA中使用JDK8的Lamda expression時,要注意調整language level
* 1、File --> Project Stucture ,在Project Structure中分別在project和model模塊選擇項目設置Lanugage level 8
* 2、File --> Settings --> Compiler --> Java Compiler設置Project bytecode version;
* 同時修改項目對應的Target bytecode version,確保配置的JDK的版本是1.8及以上
*
* JDK8的Lamda expression使用起來和Scala的匿名函數非常相似
*/
?
JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
?
// 在Java中JavaPairRDD和mapToPair配合使用,構造鍵值對,這裡使用了Scala的Tuple2數據類型
JavaPairRDD<String,Integer> wordpair = words.mapToPair(word -> new Tuple2<String,Integer>(word,1));
?
JavaPairRDD<String,Integer> count = wordpair.reduceByKey((x,y) -> x+y);
?
count.foreach(x -> System.out.println(x));

jsc.close();
}
}

scala版本

package com.spark.core
?
import org.apache.spark.{SparkConf, SparkContext}
?
object WordCountLocalScala {
def main(args: Array[String]): Unit = {
?
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName(getClass.getSimpleName)
?
val sc = new SparkContext(conf)
?
val lines = sc.textFile("data/wc.txt")
?
val words = lines.flatMap(line => line.split(" "))
?
val pair = words.map(word => (word,1))
?
val count = pair.reduceByKey((x,y) => x+y)
?
count.foreach(println(_))
?
}
}

推薦閱讀:

相关文章