SparkStreaming實時寫入mysql或redis
需求:實時讀取終端字元,以空格作為分隔符,並計算字元的個數,寫入mysql或者redis。
1. 首先在終端輸入:nc -lk 8888
監聽8888埠輸入信息
2. 配置文件
IDEA項目中,在resources文件夾下創建application.conf,配置mysql的相關鏈接信息:
db.driver="com.mysql.jdbc.Driver"
db.url="jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8"
db.user="root"
db.password="123456"
3. 新建Jpool類,封裝redis連接池的生成函數,便於使用:
import redis.clients.jedis.JedisPool
object Jpool {
  // Single shared pool against the default local Redis; created lazily on first use.
  private lazy val jedisPool = new JedisPool("localhost")

  /** Borrow a Jedis connection from the pool. Callers must close() it to return it. */
  def getJedis = jedisPool.getResource
}
4. 實時接收終端信息,並將wordcount結果寫入mysql,mysql的table表需要提前創建:
import java.sql.DriverManager
import com.typesafe.config.ConfigFactory
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object StreamingMysql {
  // Quiet Spark's verbose INFO logging so the streaming output stays readable.
  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

  /**
   * Reads lines from localhost:8888, counts space-separated words per 2-second
   * batch, and accumulates the totals into the MySQL `wordcount` table
   * (columns: word, total — the table must exist beforehand).
   */
  def main(args: Array[String]): Unit = {
    val config = ConfigFactory.load()
    println(config.getString("db.url"))

    val conf = new SparkConf().setMaster("local[*]").setAppName("Word Count and Mysql")
    val ssc = new StreamingContext(conf, Seconds(2))

    val lines = ssc.socketTextStream("localhost", 8888)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCount = pairs.reduceByKey(_ + _)
    // Printed for debugging only; persistence below recomputes from the raw lines.
    wordCount.print()

    lines.foreachRDD { rdd =>
      val currentBatchResult = rdd.flatMap(_.split(" ")).map(w => (w, 1)).reduceByKey(_ + _)
      currentBatchResult.foreachPartition { part =>
        val url = config.getString("db.url")
        val user = config.getString("db.user")
        val password = config.getString("db.password")
        // One connection per partition; statements are prepared once here rather
        // than once per record, and everything is closed in finally so a failing
        // record cannot leak the connection (the original only closed on success).
        val conn = DriverManager.getConnection(url, user, password)
        val selectStmt = conn.prepareStatement("select * from wordcount where word = ?")
        val updateStmt = conn.prepareStatement("update wordcount set total = ? where word = ?")
        val insertStmt = conn.prepareStatement("insert into wordcount values(?,?)")
        try {
          part.foreach { case (word, count) =>
            selectStmt.setString(1, word)
            val rs = selectStmt.executeQuery()
            try {
              if (rs.next()) {
                // Word already tracked: add this batch's count to the stored total.
                updateStmt.setInt(1, rs.getInt("total") + count)
                updateStmt.setString(2, word)
                updateStmt.executeUpdate()
              } else {
                // First occurrence: insert a fresh row for this word.
                insertStmt.setString(1, word)
                insertStmt.setInt(2, count)
                insertStmt.executeUpdate()
              }
            } finally {
              rs.close()
            }
          }
        } finally {
          selectStmt.close()
          updateStmt.close()
          insertStmt.close()
          conn.close()
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
5. 實時接收終端信息,並將wordcount結果寫入redis:
import com.study.utils.Jpool
import com.typesafe.config.ConfigFactory
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf}
object StreamingRedis {
  // Quiet Spark's verbose INFO logging so the streaming output stays readable.
  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

  /**
   * Reads lines from localhost:8888, counts space-separated words per 2-second
   * batch, and accumulates totals into the Redis hash "wordcount" via HINCRBY.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("Word count and Redis")
    val ssc = new StreamingContext(conf, Seconds(2))

    val streams = ssc.socketTextStream("localhost", 8888)
    streams.foreachRDD { rdd =>
      val currentBatchResult = rdd.flatMap(_.split(" ")).map(w => (w, 1)).reduceByKey(_ + _)
      currentBatchResult.foreachPartition { part =>
        // One pooled connection per partition; close in finally so an exception
        // while writing cannot leak the connection out of the pool.
        val jedis = Jpool.getJedis
        try {
          part.foreach { case (word, count) =>
            // HINCRBY atomically adds this batch's count to the running total.
            jedis.hincrBy("wordcount", word, count)
          }
        } finally {
          jedis.close()
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
推薦閱讀: