Spark Streaming連接Kafka入門教程

來自專欄 Spark2 人贊了文章

我的原創地址:

Spark Streaming連接Kafka入門教程?

dongkelun.com
圖標

前言

首先要安裝好kafka,這裡不做kafka安裝的介紹(這裡用的是ambari安裝的kafka),若想了解如何安裝可參考Kafka安裝啟動入門教程和centos7 ambari2.6.1.5+hdp2.6.4.0 大數據集群安裝部署,本文是Spark Streaming入門教程,只是簡單的介紹如何利用spark 連接kafka,並消費數據,由於博主也是才學,所以其中代碼以實現為主,可能並不是最好的實現方式。

1、對應依賴

根據kafka版本選擇對應的依賴,我的kafka版本為0.10.1,spark版本2.2.1,然後在maven倉庫找到對應的依賴。 (Kafka項目在版本0.8和0.10之間引入了新的消費者API,因此有兩個獨立的相應Spark Streaming軟體包可用)

<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.2.1</version></dependency>

我用的是sbt,對應的依賴:

"org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.2.1"

2、下載依賴

在命令行執行

sbt eclipse

(我用的是eclipse sbt,具體可看我的其他博客,具體命令根據自己的實際情況)

3、創建topic

創建測試用topic top1

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic top1

4、啟動程序

下好依賴之後,根據官方文檔提供的示例進行代碼測試 下面的代碼示例,主要實現spark 連接kafka,並將接收的數據列印出來,沒有實現複雜的功能。

package com.dkl.leanring.spark.kafkaimport org.apache.spark._import org.apache.spark.streaming._import org.apache.spark.streaming.StreamingContext._import org.apache.kafka.common.serialization.StringDeserializerimport org.apache.spark.streaming.kafka010._import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistentimport org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribeobject KafaDemo { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName("KafaDemo") //刷新時間設置為1秒 val ssc = new StreamingContext(conf, Seconds(1)) //消費者配置 val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "10.180.29.180:6667", //kafka集群地址 "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "group", //消費者組名 "auto.offset.reset" -> "latest", //latest自動重置偏移量為最新的偏移量 "enable.auto.commit" -> (false: java.lang.Boolean)) //如果是true,則這個消費者的偏移量會在後台自動提交 val topics = Array("top1") //消費主題,可以同時消費多個 //創建DStream,返回接收到的輸入數據 val stream = KafkaUtils.createDirectStream[String, String]( ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams)) //列印獲取到的數據,因為1秒刷新一次,所以數據長度大於0時才列印 stream.foreachRDD(f => { if (f.count > 0) f.foreach(f => println(f.value())) }) ssc.start(); ssc.awaitTermination(); }}

啟動上面的程序(本地eclipse啟動即可) 其中當auto.offset.reset為latest時,直面上看只消費程序啟動後產生的數據,若想獲取歷史數據,將auto.offset.reset改為earliest即可,下面是具體的含義解釋,該解釋來自:blog.csdn.net/lishuangz

  • earliest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
  • latest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據
  • none:topic各分區都存在已提交的offset時,從offset後開始消費;只要有一個分區不存在已提交的offset,則拋出異常

需要記住的要點

當在本地運行一個 Spark Streaming 程序的時候,不要使用 "local" 或者 "local[1]" 作為 master 的 URL 。這兩種方法中的任何一個都意味著只有一個線程將用於運行本地任務。如果你正在使用一個基於接收器(receiver)的輸入離散流(input DStream)(例如, sockets ,Kafka ,Flume 等),則該單獨的線程將用於運行接收器(receiver),而沒有留下任何的線程用於處理接收到的數據。因此,在本地運行時,總是用 "local[n]" 作為 master URL ,其中的 n > 運行接收器的數量。 將邏輯擴展到集群上去運行,分配給 Spark Streaming 應用程序的內核(core)的內核數必須大於接收器(receiver)的數量。否則系統將接收數據,但是無法處理它。

我一開始沒有看到官網提醒的這一點,將示例中的local[2]改為local,現在已經在代碼里改回local[2]了,但是下面的截圖沒有替換,注意下。

5、發送消息

運行producer

bin/kafka-console-producer.sh --broker-list ambari.master.com:6667 --topic top1

注意:ambari.master.com為ip的映射名,可以直接寫ip,但是不能寫localhost,否則遠程spark獲取不到,如果為localhost,則需要修改配置,具體怎麼改,可參考Kafka安裝啟動入門教程中第七條。 然後依次發送下面幾個消息

hadoopsparkkafka中文測試

6、結果

然後在eclipse console就可以看到對應的數據了。

hadoopsparkkafka中文測試

為了直觀的展示和理解,附上截圖:

發送消息

結果


推薦閱讀:
查看原文 >>
相关文章