大數據去重本身很蛋疼,針對個別數據去重更是不可理喻但是spark的Structured Streaming就很容易能實現這個功能。聽浪尖給你娓娓道來~

數據從採集到最終處理結束是會存在一條數據在某一個點被重複接收處理的情況。如 kafka支持的是至少一次寫語義,也即是當寫數據到kafka的時候,有些記錄可能重複,例如如果消息已經被broker接收並寫入文件但是並沒有應答,這時生產者向kafka重發一個消息,就可能重複。由於kafka的至少一次的寫語義,structured streaming不能避免這種類型數據重複。所以一旦寫入成功,可以假設structured Streaming的查詢輸出是以至少一次語義寫入kafka的。一個可行去除重複記錄的解決方案是數據中引入一個primary(unique)key,這樣就可以在讀取數據的時候實行去重。

structured streaming是可以使用事件中的唯一標識符對數據流中的記錄進行重複數據刪除。這與使用唯一標識符列的靜態重複數據刪除完全相同。該查詢將存儲來自先前記錄的一定量的數據,以便可以過濾重複的記錄。與聚合類似,可以使用帶有或不帶有watermark 的重複數據刪除功能。

A),帶watermark:如果重複記錄可能到達的時間有上限,則可以在事件時間列上定義watermark,並使用guid和事件時間列進行重複數據刪除。

B),不帶watermark:由於重複記錄可能到達時間沒有界限,所以查詢將來自所有過去記錄的數據存儲為狀態。

源代碼,已測試通過~

package bigdata.spark.StructuredStreaming.KafkaSourceOperator

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.get_json_object
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

object KafkaDropDuplicate {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]")
.set("yarn.resourcemanager.hostname", "mt-mdh.local")
.set("spark.executor.instances","2")
.set("spark.default.parallelism","4")
.set("spark.sql.shuffle.partitions","4")
.setJars(List("/opt/sparkjar/bigdata.jar"
,"/opt/jars/spark-streaming-kafka-0-10_2.11-2.3.1.jar"
,"/opt/jars/kafka-clients-0.10.2.2.jar"
,"/opt/jars/kafka_2.11-0.10.2.2.jar"
,"/opt/jars/spark-sql-kafka-0-10_2.11-2.0.2.jar"))

val spark = SparkSession
.builder
.appName("StructuredKafkaWordCount")
.config(sparkConf)
.getOrCreate()
import spark.implicits._

val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","mt-mdh.local:9093")
.option("subscribe", "jsontest")
.load()
val words = df.selectExpr("CAST(value AS STRING)")

val fruit = words.select(
get_json_object($"value", "$.time").alias("timestamp").cast("long")
, get_json_object($"value", "$.fruit").alias("fruit"))

val fruitCast = fruit
.select(fruit("timestamp")
.cast("timestamp"),fruit("fruit"))
.withWatermark("timestamp", "10 Seconds")
.dropDuplicates("fruit")
.groupBy("fruit").count()

fruitCast.writeStream
.outputMode(OutputMode.Complete())
.format("console")
.trigger(Trigger.ProcessingTime(5000))
.option("truncate","false")
.start()
.awaitTermination()
}
}

瘋狂的2019,歡迎和570位球友一起學習~

知識星球

推薦閱讀:
相关文章