需求:實時讀取終端字元,以空格作為分隔符,並計算字元的個數,寫入mysql或者redis。

1. 首先在終端輸入:nc -lk 8888

監聽8888埠輸入信息

2. 配置文件

IDEA項目中,在resources文件夾下創建application.conf,配置mysql的相關鏈接信息:

db.driver="com.mysql.jdbc.Driver"
db.url="jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8"
db.user="root"
db.password="123456"

3. 新建Jpool類,封裝redis連接池的生成函數,便於使用:

import redis.clients.jedis.JedisPool

object Jpool {

  // Lazily-created pool of Jedis connections to the local redis server
  // (default port, since only the host is given).
  private lazy val jedisPool = new JedisPool("localhost")

  /**
   * Borrow a Jedis connection from the shared pool.
   * The caller is responsible for calling close() on it,
   * which returns the connection to the pool.
   */
  def getJedis = jedisPool.getResource
}

4. 實時接收終端信息,並將wordcount結果寫入mysql,mysql的table表需要提前創建:

import java.sql.DriverManager

import com.typesafe.config.ConfigFactory
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingMysql {
  // Silence Spark's verbose INFO logging.
  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

  /**
   * Reads lines from a socket (localhost:8888), splits them on spaces,
   * counts the words of each 2-second batch, and accumulates the totals
   * in a MySQL table `wordcount` (columns: word, total).
   * The table must be created beforehand.
   */
  def main(args: Array[String]): Unit = {
    val config = ConfigFactory.load()

    // Resolve JDBC settings once on the driver and capture plain Strings in
    // the closures below, rather than capturing the Config object itself.
    val url = config.getString("db.url")
    val user = config.getString("db.user")
    val password = config.getString("db.password")
    println(url)

    val conf = new SparkConf().setMaster("local[*]").setAppName("Word Count and Mysql")
    val ssc = new StreamingContext(conf, Seconds(2))

    val lines = ssc.socketTextStream("localhost", 8888)

    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCount = pairs.reduceByKey(_ + _)

    wordCount.print()

    // Reuse the already-computed wordCount DStream. The original recomputed
    // the whole split/map/reduceByKey pipeline from `lines` a second time
    // inside foreachRDD, doing the word count twice per batch.
    wordCount.foreachRDD(rdd => {
      rdd.foreachPartition(part => {
        // One connection and one set of prepared statements per partition;
        // the original re-prepared all three statements for every record.
        val conn = DriverManager.getConnection(url, user, password)
        try {
          val pts = conn.prepareStatement("select * from wordcount where word = ?")
          val ptu = conn.prepareStatement("update wordcount set total = ? where word = ?")
          val pti = conn.prepareStatement("insert into wordcount values(?,?)")
          try {
            part.foreach { case (word, count) =>
              pts.setString(1, word)
              val rs = pts.executeQuery()
              try {
                if (rs.next()) {
                  // Word already stored: add this batch's count to the total.
                  ptu.setInt(1, rs.getInt("total") + count)
                  ptu.setString(2, word)
                  ptu.executeUpdate()
                } else {
                  // First occurrence of this word: insert a new row.
                  pti.setString(1, word)
                  pti.setInt(2, count)
                  pti.executeUpdate()
                }
              } finally rs.close()
            }
          } finally {
            pts.close()
            ptu.close()
            pti.close()
          }
        } finally conn.close() // original leaked the connection on any exception
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

5. 實時接收終端信息,並將wordcount結果寫入redis:

import com.study.utils.Jpool
import com.typesafe.config.ConfigFactory
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf}

object StreamingRedis {
  // Silence Spark's verbose INFO logging.
  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

  /**
   * Reads lines from a socket (localhost:8888), counts the words of each
   * 2-second batch and accumulates the totals in the redis hash "wordcount"
   * via HINCRBY (field = word, value = running total).
   */
  def main(args: Array[String]): Unit = {
    // Loaded as in the original; not read here, but kept so the application
    // still fails fast if application.conf is missing/invalid.
    val config = ConfigFactory.load()

    val conf = new SparkConf().setMaster("local[*]").setAppName("Word count and Redis")
    val ssc = new StreamingContext(conf, Seconds(2))

    val streams = ssc.socketTextStream("localhost", 8888)

    streams.foreachRDD(rdd => {
      val current_batch_result = rdd.flatMap(_.split(" ")).map(w => (w, 1)).reduceByKey(_ + _)
      current_batch_result.foreachPartition(part => {
        // One pooled connection per partition; close() returns it to the pool.
        val jedis = Jpool.getJedis
        try {
          part.foreach { case (word, count) =>
            jedis.hincrBy("wordcount", word, count)
          }
        } finally jedis.close() // original leaked the connection if hincrBy threw
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

推薦閱讀:

相關文章