目錄

天小天:(一)Spark Streaming 運算元梳理 — 簡單介紹streaming運行邏輯天小天:(二)Spark Streaming 運算元梳理 — flatMap和mapPartitions

天小天:(三)Spark Streaming 運算元梳理 — transform運算元

天小天:(四)Spark Streaming 運算元梳理 — Kafka createDirectStream

前言

本文主要介紹KafkaUtils.createDirectStream的實現過程,包括實現的結構及如何消費kafka數據。

例子

object DirectKafkaWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println(s"""
|Usage: DirectKafkaWordCount <brokers> <topics>
| <brokers> is a list of one or more Kafka brokers
| <topics> is a list of one or more kafka topics to consume from
|
""".stripMargin)
System.exit(1)
}

StreamingExamples.setStreamingLogLevels()

val Array(brokers, topics) = args

// Create context with 2 second batch interval
val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))

// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

// Get the lines, split them into words, count the words and print
val lines = messages.map(_.value)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.print()

// Start the computation
ssc.start()
ssc.awaitTermination()
}
}

這裡的例子是Spark源碼example中的例子,主要實現的是拉取Kafka數據,並計算work count的過程。

入參解釋

入參為

  • ssc: StreamingContext: Streaming上下文
  • locationStrategy:LocationStrategy:Kafka消費者的分布策略
  • consumerStrategy: ConsumerStrategy[K, V]:Kafka配置

一共三個參數,第一個和第三個參數比較好理解,只要使用過應該沒什麼問題。第二個參數可能不是很理解,這裡會詳細講解下LocationStrategy.

下面看下LocationStrategy類的源碼:

object LocationStrategies {
/**
* :: Experimental ::
* Use this only if your executors are on the same nodes as your Kafka brokers.
*/
@Experimental
def PreferBrokers: LocationStrategy =
org.apache.spark.streaming.kafka010.PreferBrokers

/**
* :: Experimental ::
* Use this in most cases, it will consistently distribute partitions across all executors.
*/
@Experimental
def PreferConsistent: LocationStrategy =
org.apache.spark.streaming.kafka010.PreferConsistent

/**
* :: Experimental ::
* Use this to place particular TopicPartitions on particular hosts if your load is uneven.
* Any TopicPartition not specified in the map will use a consistent location.
*/
@Experimental
def PreferFixed(hostMap: collection.Map[TopicPartition, String]): LocationStrategy =
new PreferFixed(new ju.HashMap[TopicPartition, String](hostMap.asJava))

/**
* :: Experimental ::
* Use this to place particular TopicPartitions on particular hosts if your load is uneven.
* Any TopicPartition not specified in the map will use a consistent location.
*/
@Experimental
def PreferFixed(hostMap: ju.Map[TopicPartition, String]): LocationStrategy =
new PreferFixed(hostMap)
}

這裡一共提供了三種位置策略,策略名和使用時機分別為:

  • PreferBrokers:當Spark集群和Kafka集群屬於同一組機器時使用;
  • PreferConsistent:最常用的策略,當Spark機器和Kafka機器不屬於同一組機器時使用;
  • PreferFixed:當數據分布不均衡,由用戶自行制定KafkaPartition和機器的關係。

從源碼中可以看到PreferFixed有兩個方法,區別只是一個入參是Java的Map,一個是Scala的Map,其他實現沒有區別。

總體邏輯

這部分,我們從整體看下createDirectStream是如何生成RDD並且消費Kafka消息的。

KafkaUtils邏輯

這裡我們調用的是createDirectStream方法,也就是用戶調用的方法。具體代碼如下:

def createDirectStream[K, V](
ssc: StreamingContext,
locationStrategy: LocationStrategy,
consumerStrategy: ConsumerStrategy[K, V],
perPartitionConfig: PerPartitionConfig
): InputDStream[ConsumerRecord[K, V]] = {
new DirectKafkaInputDStream[K, V](ssc, locationStrategy, consumerStrategy, perPartitionConfig)
}

入參中PerPartitionConfig是每個Partition的配置,默認的分區配置僅包含每個分區的消費速率,也就是反壓的配置。

實現邏輯就很簡單,只有實例化DStream的一個邏輯。

DirectKafkaInputDStream邏輯

這裡我們只講compute方法的邏輯,首先我們看下代碼。

override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
// 獲取本批次消費截止的Offset [1]
val untilOffsets = clamp(latestOffsets())
// 構造包含topic、partition、開始及結束offset類的集合
val offsetRanges = untilOffsets.map { case (tp, uo) => // tp:代表topic partition,uo代表:until offset的簡寫,意為本批次消費到哪個offset
val fo = currentOffsets(tp) // from offset,即當前offset
OffsetRange(tp.topic, tp.partition, fo, uo)
}

// 是否使用消費者緩存,這裡的邏輯會在講KafkaRDDIterator時說到。
val useConsumerCache = context.conf.getBoolean("spark.streaming.kafka.consumer.cache.enabled",
true)

// 生成kafkaRDD
val rdd = new KafkaRDD[K, V](context.sparkContext, executorKafkaParams, offsetRanges.toArray,
getPreferredHosts, useConsumerCache)

// Report the record number and metadata of this batch interval to InputInfoTracker.
// 本批次的一些信息,與業務處理關係不大,我們暫時略過
val description = offsetRanges.filter { offsetRange =>
// Dont display empty ranges.
offsetRange.fromOffset != offsetRange.untilOffset
}.map { offsetRange =>
s"topic: ${offsetRange.topic} partition: ${offsetRange.partition} " +
s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
}.mkString("
"
)
// Copy offsetRanges to immutable.List to prevent from being modified by the user
val metadata = Map(
"offsets" -> offsetRanges.toList,
StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
val inputInfo = StreamInputInfo(id, rdd.count, metadata)
ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

// 更新當前消費截止Offset,下個批次的from offset就是從這裡取
currentOffsets = untilOffsets
// 提交offset,具體實現接下來會說到 [2]
commitAll()
// 返回RDD
Some(rdd)
}

上面的代碼和注釋解釋了compute邏輯,有兩個地方需要補充。

  1. val untilOffsets = clamp(latestOffsets()):方法的第一個邏輯,也是這裡最複雜的邏輯,裡面涉及到反壓的部分邏輯。這裡不會展開介紹,如果想知道具體的原理,可以參考:Spark-Streaming反壓(back-pressure)
  2. commitAll():這裡提交Offset的前提是用戶在代碼邏輯中調用了commitAsync方法才會提交,否則這裡不會做任何事情。涉及到的方法及代碼及邏輯注釋如下:

/**
* Queue up offset ranges for commit to Kafka at a future time. Threadsafe.
* @param offsetRanges The maximum untilOffset for a given partition will be used at commit.
*/
def commitAsync(offsetRanges: Array[OffsetRange]): Unit = {
commitAsync(offsetRanges, null)
}

/**
* Queue up offset ranges for commit to Kafka at a future time. Threadsafe.
* @param offsetRanges The maximum untilOffset for a given partition will be used at commit.
* @param callback Only the most recently provided callback will be used at commit.
*/
def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit = {
commitCallback.set(callback)
// 把要提交Offset的offsetRanges寫入到commitQueue中
commitQueue.addAll(ju.Arrays.asList(offsetRanges: _*))
}

protected def commitAll(): Unit = {
val m = new ju.HashMap[TopicPartition, OffsetAndMetadata]()
// 獲取隊列中第一個OffsetRange,如果用戶沒有調用commitAsync方法,則隊列為空,之後的邏輯就會跳過,實際上不執行任何提交Offset的操作
var osr = commitQueue.poll()
while (null != osr) {
val tp = osr.topicPartition
val x = m.get(tp)
val offset = if (null == x) { osr.untilOffset } else { Math.max(x.offset, osr.untilOffset) }
// 把topic及對應的offset寫入到m中
m.put(tp, new OffsetAndMetadata(offset))
osr = commitQueue.poll()
}
if (!m.isEmpty) {
// 如果m不為空,則提交offset
consumer.commitAsync(m, commitCallback.get)
}
}

KafkaRDD邏輯

同樣我們這裡只關心compute方法邏輯,具體代碼及相應的注釋如下:

override def compute(thePart: Partition, context: TaskContext): Iterator[ConsumerRecord[K, V]] = {
// 獲取Partition信息,part中主要包含topic、partition、當前批次開始offset、當前批次結束offset
val part = thePart.asInstanceOf[KafkaRDDPartition]
// 檢驗開始offset是否小於等於結束offset
require(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
// 開始offset和結束offset是否一致。如果一致返回空的迭代器。
if (part.fromOffset == part.untilOffset) {
logInfo(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
s"skipping ${part.topic} ${part.partition}")
Iterator.empty
} else {
logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " +
s"offsets ${part.fromOffset} -> ${part.untilOffset}")
// 通過讀取配置,判斷這個topic是否為壓縮的topic。
// 如果是則返回CompactedKafkaRDDIterator迭代器,如果不是則返回KafkaRDDIterator迭代器。
if (compacted) {
new CompactedKafkaRDDIterator[K, V](
part,
context,
kafkaParams,
useConsumerCache,
pollTimeout,
cacheInitialCapacity,
cacheMaxCapacity,
cacheLoadFactor
)
} else {
new KafkaRDDIterator[K, V](
part,
context,
kafkaParams,
useConsumerCache,
pollTimeout,
cacheInitialCapacity,
cacheMaxCapacity,
cacheLoadFactor
)
}
}
}
}

KafkaRDDIterator邏輯

我們先看KafkaRDDIterator的邏輯,因為KafkaRDDIteratorCompactedKafkaRDDIterator有相似處,並且我們經常使用的也是這個迭代器。

我們可以先簡單看下代碼結構,之後會對代碼詳細講解。

/**
* An iterator that fetches messages directly from Kafka for the offsets in partition.
* Uses a cached consumer where possible to take advantage of prefetching
*/
private class KafkaRDDIterator[K, V](
part: KafkaRDDPartition, // partition信息
context: TaskContext, // 上下文
kafkaParams: ju.Map[String, Object], // kafka的配置信息
useConsumerCache: Boolean, // 是否緩存kafka的consumer
pollTimeout: Long, // poll的超時時間
cacheInitialCapacity: Int, // map 初始容量
cacheMaxCapacity: Int, // map 最大容量
cacheLoadFactor: Float // map 負載因子
) extends Iterator[ConsumerRecord[K, V]] {
// 獲取消費者group id
val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]

context.addTaskCompletionListener(_ => closeIfNeeded())
// 獲取consumer,如果是使用緩存consumer則從緩存中取,如果未取到則創建新的consumer。
//詳細的解釋會在代碼段下面說明。
val consumer = if (useConsumerCache) {
CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor)
if (context.attemptNumber >= 1) {
// just in case the prior attempt failures were cache related
CachedKafkaConsumer.remove(groupId, part.topic, part.partition)
}
CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, kafkaParams)
} else {
CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, part.partition, kafkaParams)
}

// 向kafka請求的offset,初始化為開始offset
var requestOffset = part.fromOffset

def closeIfNeeded(): Unit = {
if (!useConsumerCache && consumer != null) {
consumer.close()
}
}

// 判斷是否還有下一個元素。
// 通過判斷請求的offset是否小於截止的offset,如果小於說明還有下一個元素,如果不小於則沒有下一個元素。
override def hasNext(): Boolean = requestOffset < part.untilOffset

// 獲取下一個元素,其中主要的邏輯是get方法獲取下一個記錄,取到之後把請求offset加一,並返回該條記錄。
//其中get方法會在代碼段候詳細說到。
override def next(): ConsumerRecord[K, V] = {
if (!hasNext) {
throw new ju.NoSuchElementException("Cant call getNext() once untilOffset has been reached")
}
val r = consumer.get(requestOffset, pollTimeout)
requestOffset += 1
r
}
}

這裡先說下入參useConsumerCachecacheInitialCapacitycacheMaxCapacitycacheLoadFactor這四個參數,這四個參數涉及到是否使用緩存consumer及緩存consumer的一些邏輯。

在實例化KafkaRDDIterator時就會執行初始化consumer的邏輯。這裡的consumer對應的類是CachedKafkaConsumer而並不是kafka的consumer。kafka的consumer是在CachedKafkaConsumer類中封裝的。這裡是需要注意的,否則之後可能混淆。我們把consumer初始化的邏輯單獨貼出來解釋。

val consumer = if (useConsumerCache) { // 使用緩存consumer的邏輯
// 初始化CachedKafkaConsumer,init的邏輯一會說。
CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor)
if (context.attemptNumber >= 1) {
// just in case the prior attempt failures were cache related
// 防止之前的失敗和緩存有關,所以在這裡清除對應緩存
CachedKafkaConsumer.remove(groupId, part.topic, part.partition)
}
// 獲取consumer,get的邏輯也會單獨講。
CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, kafkaParams)
} else {
// 不使用consumer緩存的方式實例化consumer
CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, part.partition, kafkaParams)
}

  • CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor) 涉及到的邏輯:

// Dont want to depend on guava, dont want a cleanup thread, use a simple LinkedHashMap
// 相當於一個static方法,在內存中只緩存一個cache實例。
private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] = null

/** Must be called before get, once per JVM, to configure the cache. Further calls are ignored */
def init(
initialCapacity: Int,
maxCapacity: Int,
loadFactor: Float): Unit = CachedKafkaConsumer.synchronized { // 線程鎖
// 如果cache還沒有被初始化,則通過下面邏輯初始化。
// 從這裡我們可以看出,這個init方法就是初始化cache。
if (null == cache) {
logInfo(s"Initializing cache $initialCapacity $maxCapacity $loadFactor")
// 實例化LinkedHashMap
cache = new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]](
initialCapacity, loadFactor, true) {
// 重寫removeEldestEntry方法,保證緩存的consumer不會超出設置的最大緩存容量。
override def removeEldestEntry(
entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer[_, _]]): Boolean = {
// 如果Map大小大於設置的最大容量,則斷開最近最少使用的consumer連接,並刪除最近最少使用的consumer。
// 至於為什麼這個entry就是就是最近最少使用的,並且返回true就是刪除,可以自行學習下LinkedHashMap實現原理。
if (this.size > maxCapacity) {
try {
entry.getValue.consumer.close()
} catch {
case x: KafkaException =>
logError("Error closing oldest Kafka consumer", x)
}
true
} else {
false
}
}
}
}
}

上面的邏輯就是初始化cache的邏輯。其中cache用到了LinkedHashMap是因為這個類可以控制緩存consumer的個數。至於為什麼要控制consumer的緩存個數,筆者認為是由於consumer不只含有kafka的consumer的連接,還有一次poll的數據,所以為了控制cache佔用內存的大小所以設置了這個邏輯。

  • CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, kafkaParams)邏輯:

/**
* Get a cached consumer for groupId, assigned to topic and partition.
* If matching consumer doesnt already exist, will be created using kafkaParams.
*/
def get[K, V](
groupId: String,
topic: String,
partition: Int,
kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer[K, V] =
CachedKafkaConsumer.synchronized {// 線程鎖
// 以groupID,topic,partition作為key
val k = CacheKey(groupId, topic, partition)
// 從緩存中取consuemr
val v = cache.get(k)
if (null == v) {
// 當key沒有對應的consumer時,實例化consumer,並且寫入緩存中。
logInfo(s"Cache miss for $k")
logDebug(cache.keySet.toString)
val c = new CachedKafkaConsumer[K, V](groupId, topic, partition, kafkaParams)
cache.put(k, c)
c
} else {
// any given topicpartition should have a consistent key and value type
// 返回consuemr
v.asInstanceOf[CachedKafkaConsumer[K, V]]
}
}

  • CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, part.partition, kafkaParams)邏輯邏輯:

/**
* Get a fresh new instance, unassociated with the global cache.
* Caller is responsible for closing
*/
def getUncached[K, V](
groupId: String,
topic: String,
partition: Int,
kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer[K, V] =
// 實例化consumer,沒有緩存的邏輯
new CachedKafkaConsumer[K, V](groupId, topic, partition, kafkaParams)

至此初始化consumer的邏輯介紹完了。接下來我們看下next方法中的consumer.get(requestOffset, pollTimeout)實現,其中需要說明的是這裡的consumer對象和上文提到的consumer對象是同一個對象,都是CachedKafkaConsumer類:

// consumer 緩存的kafka數據,避免每次查詢都需要請求kafka
protected var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
// 下一個offset,初始化為-2
protected var nextOffset = -2L
/**
* Get the record for the given offset, waiting up to timeout ms if IO is necessary.
* Sequential forward access will use buffers, but random access will be horribly inefficient.
*/
def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
logDebug(s"Get $groupId $topic $partition nextOffset $nextOffset requested $offset")
// 判斷傳入的offset和consumer對象中記錄的offset是否相同。
// 這裡不相同的原因是nextOffset初始化為-2,必然會不相等。
if (offset != nextOffset) {
logInfo(s"Initial fetch for $groupId $topic $partition $offset")
// 消費到的offset定位到指定的offset,保證poll到的數據是從指定的offset開始的。
seek(offset)
// 拉取數據,更新buffer。具體邏輯見代碼段中poll方法。
poll(timeout)
}

if (!buffer.hasNext()) { poll(timeout) }
require(buffer.hasNext(),
s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
var record = buffer.next()
// 防止緩存中數據的offset和期望數據的offset不一致,所以作一個判斷。
// 如果不一致這重新定位,並更新buffer,重新獲取record。
if (record.offset != offset) {
logInfo(s"Buffer miss for $groupId $topic $partition $offset")
seek(offset)
poll(timeout)
require(buffer.hasNext(),
s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
record = buffer.next()
require(record.offset == offset,
s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset " +
s"got offset ${record.offset} instead. If this is a compacted topic, consider enabling " +
"spark.streaming.kafka.allowNonConsecutiveOffsets"
)
}

nextOffset = offset + 1
record
}

// 定位offser,此方法的consumer對象是kafka的consumer。
private def seek(offset: Long): Unit = {
logDebug(s"Seeking to $topicPartition $offset")
consumer.seek(topicPartition, offset)
}

// 從kafka拉取對應topic,partition的數據,並更新到buffer中。
private def poll(timeout: Long): Unit = {
// 拉取kafka中的數據
val p = consumer.poll(timeout)
// 過濾出指定topic和partition的數據
val r = p.records(topicPartition)
logDebug(s"Polled ${p.partitions()} ${r.size}")
// 數據更新到buffer中。
buffer = r.listIterator
}

到這裡,KafkaRDDIterator的代碼邏輯基本講完了。這裡關於緩存的邏輯會比較繞。這裡有一張圖解釋了bache與bache之間consumer的共享,及next與next之間buffer的共享關係。

CompactedKafkaRDDIterator邏輯

當消費的kafka topic的刪除邏輯是compact時才需要用到CompactedKafkaRDDIterator迭代器。並且要使用此迭代器需要把spark的spark.streaming.kafka.allowNonConsecutiveOffsets參數設置為true,此參數默認是false,默認用KafkaRDDIterator

先來看下此迭代器的代碼:

private class CompactedKafkaRDDIterator[K, V](
part: KafkaRDDPartition,
context: TaskContext,
kafkaParams: ju.Map[String, Object],
useConsumerCache: Boolean,
pollTimeout: Long,
cacheInitialCapacity: Int,
cacheMaxCapacity: Int,
cacheLoadFactor: Float
) extends KafkaRDDIterator[K, V](
part,
context,
kafkaParams,
useConsumerCache,
pollTimeout,
cacheInitialCapacity,
cacheMaxCapacity,
cacheLoadFactor
) {

consumer.compactedStart(part.fromOffset, pollTimeout)
// 在初始時直接獲取下一個record
private var nextRecord = consumer.compactedNext(pollTimeout)

private var okNext: Boolean = true
// 利用okNext決定是否還有下一個元素,初始為true
override def hasNext(): Boolean = okNext

override def next(): ConsumerRecord[K, V] = {
if (!hasNext) {
throw new ju.NoSuchElementException("Cant call getNext() once untilOffset has been reached")
}
// 獲取下一個record
val r = nextRecord

if (r.offset + 1 >= part.untilOffset) {
// 判斷record的下一個offset大於等於截止offset,
// 如果是的話就沒有下一個元素,將okNext置為false
okNext = false
} else {
// 更新下一個record,下一次執行next方法時可以省略獲取nextRecord的時間,
// 並且也可以提前知道是否還有下一個record。
nextRecord = consumer.compactedNext(pollTimeout)
if (nextRecord.offset >= part.untilOffset) {
// 如果nextRecord的offset大於截止offset,則把oKNext置為false,意為hasNext為false
okNext = false
// 撤銷本次的compactedNext邏輯。
consumer.compactedPrevious()
}
}
r
}
}

從上面可以看出,此類是繼承的KafkaRDDIterator,所以是否用consuemr緩存的邏輯是一樣的。這裡就不會再說一次了。這裡最主要邏輯是在consumer.compactedNext(pollTimeout)方法中,接下來我們看下此方法的實現:

/**
* Get the next record in the batch from a compacted topic.
* Assumes compactedStart has been called first, and ignores gaps.
*/
def compactedNext(timeout: Long): ConsumerRecord[K, V] = {
if (!buffer.hasNext()) {
// 如果buffer沒有next,則消費kafka數據,更新buffer。
poll(timeout)
}
require(buffer.hasNext(),
s"Failed to get records for compacted $groupId $topic $partition after polling for $timeout")
// 獲取buffer的下一個數據。
val record = buffer.next()
// offset + 1
nextOffset = record.offset + 1
record
}

從上面可以看出compactedNext的實現邏輯相對於KafkaRDDIterator中的get方法實現邏輯簡單很多。沒有對buffer獲取到的record的offset與nextOffset檢驗,直接就認為取到的數據是正確的。這樣做的原因是因為topic設置為compact刪除之後,offset就是不連續的了,因此無法判斷record的offset是否合理。

總結

至此,KafkaUtils.createDirectStream的邏輯就講解完了。核心思想和其他的運算元是一樣的,就是先實例化DStream,然後DStream實例化RDD,之後RDD實例化Iterator,最終通過Iterator的next方法來獲取數據。


推薦閱讀:
相关文章