KafkaUtils邏輯
這裡我們調用的是createDirectStream
方法,也就是用戶調用的方法。具體代碼如下:
def createDirectStream [ K , V ](
ssc : StreamingContext ,
locationStrategy : LocationStrategy ,
consumerStrategy : ConsumerStrategy [ K , V ],
perPartitionConfig : PerPartitionConfig
) : InputDStream [ ConsumerRecord [ K , V ]] = {
new DirectKafkaInputDStream [ K , V ]( ssc , locationStrategy , consumerStrategy , perPartitionConfig )
}
入參中PerPartitionConfig
是每個Partition
的配置,默認的分區配置僅包含每個分區的消費速率,也就是反壓的配置。
實現邏輯就很簡單,只有實例化DStream的一個邏輯。
DirectKafkaInputDStream邏輯
這裡我們只講compute
方法的邏輯,首先我們看下代碼。
override def compute ( validTime : Time ) : Option [ KafkaRDD [ K , V ]] = {
// 獲取本批次消費截止的Offset [1]
val untilOffsets = clamp ( latestOffsets ())
// 構造包含topic、partition、開始及結束offset類的集合
val offsetRanges = untilOffsets . map { case ( tp , uo ) => // tp:代表topic partition,uo代表:until offset的簡寫,意為本批次消費到哪個offset
val fo = currentOffsets ( tp ) // from offset,即當前offset
OffsetRange ( tp . topic , tp . partition , fo , uo )
}
// 是否使用消費者緩存,這裡的邏輯會在講KafkaRDDIterator時說到。
val useConsumerCache = context . conf . getBoolean ( "spark.streaming.kafka.consumer.cache.enabled" ,
true )
// 生成kafkaRDD
val rdd = new KafkaRDD [ K , V ]( context . sparkContext , executorKafkaParams , offsetRanges . toArray ,
getPreferredHosts , useConsumerCache )
// Report the record number and metadata of this batch interval to InputInfoTracker.
// 本批次的一些信息,與業務處理關係不大,我們暫時略過
val description = offsetRanges . filter { offsetRange =>
// Dont display empty ranges.
offsetRange . fromOffset != offsetRange . untilOffset
}. map { offsetRange =>
s"topic: ${ offsetRange . topic } partition: ${ offsetRange . partition } " +
s"offsets: ${ offsetRange . fromOffset } to ${ offsetRange . untilOffset } "
}. mkString ( "
" )
// Copy offsetRanges to immutable.List to prevent from being modified by the user
val metadata = Map (
"offsets" -> offsetRanges . toList ,
StreamInputInfo . METADATA_KEY_DESCRIPTION -> description )
val inputInfo = StreamInputInfo ( id , rdd . count , metadata )
ssc . scheduler . inputInfoTracker . reportInfo ( validTime , inputInfo )
// 更新當前消費截止Offset,下個批次的from offset就是從這裡取
currentOffsets = untilOffsets
// 提交offset,具體實現接下來會說到 [2]
commitAll ()
// 返回RDD
Some ( rdd )
}
上面的代碼和注釋解釋了compute
邏輯,有兩個地方需要補充。
val untilOffsets = clamp(latestOffsets())
:方法的第一個邏輯,也是這裡最複雜的邏輯,裡面涉及到反壓的部分邏輯。這裡不會展開介紹,如果想知道具體的原理,可以參考:Spark-Streaming反壓(back-pressure)
commitAll()
:這裡提交Offset的前提是用戶在代碼邏輯中調用了commitAsync
方法才會提交,否則這裡不會做任何事情。涉及到的方法及代碼及邏輯注釋如下:
/**
* Queue up offset ranges for commit to Kafka at a future time. Threadsafe.
* @param offsetRanges The maximum untilOffset for a given partition will be used at commit.
*/
def commitAsync ( offsetRanges : Array [ OffsetRange ]) : Unit = {
commitAsync ( offsetRanges , null )
}
/**
* Queue up offset ranges for commit to Kafka at a future time. Threadsafe.
* @param offsetRanges The maximum untilOffset for a given partition will be used at commit.
* @param callback Only the most recently provided callback will be used at commit.
*/
def commitAsync ( offsetRanges : Array [ OffsetRange ], callback : OffsetCommitCallback ) : Unit = {
commitCallback . set ( callback )
// 把要提交Offset的offsetRanges寫入到commitQueue中
commitQueue . addAll ( ju . Arrays . asList ( offsetRanges : _ * ))
}
protected def commitAll () : Unit = {
val m = new ju . HashMap [ TopicPartition , OffsetAndMetadata ]()
// 獲取隊列中第一個OffsetRange,如果用戶沒有調用commitAsync方法,則隊列為空,之後的邏輯就會跳過,實際上不執行任何提交Offset的操作
var osr = commitQueue . poll ()
while ( null != osr ) {
val tp = osr . topicPartition
val x = m . get ( tp )
val offset = if ( null == x ) { osr . untilOffset } else { Math . max ( x . offset , osr . untilOffset ) }
// 把topic及對應的offset寫入到m中
m . put ( tp , new OffsetAndMetadata ( offset ))
osr = commitQueue . poll ()
}
if (! m . isEmpty ) {
// 如果m不為空,則提交offset
consumer . commitAsync ( m , commitCallback . get )
}
}
KafkaRDD邏輯
同樣我們這裡只關心compute
方法邏輯,具體代碼及相應的注釋如下:
override def compute ( thePart : Partition , context : TaskContext ) : Iterator [ ConsumerRecord [ K , V ]] = {
// 獲取Partition信息,part中主要包含topic、partition、當前批次開始offset、當前批次結束offset
val part = thePart . asInstanceOf [ KafkaRDDPartition ]
// 檢驗開始offset是否小於等於結束offset
require ( part . fromOffset <= part . untilOffset , errBeginAfterEnd ( part ))
// 開始offset和結束offset是否一致。如果一致返回空的迭代器。
if ( part . fromOffset == part . untilOffset ) {
logInfo ( s"Beginning offset ${ part . fromOffset } is the same as ending offset " +
s"skipping ${ part . topic } ${ part . partition } " )
Iterator . empty
} else {
logInfo ( s"Computing topic ${ part . topic } , partition ${ part . partition } " +
s"offsets ${ part . fromOffset } -> ${ part . untilOffset } " )
// 通過讀取配置,判斷這個topic是否為壓縮的topic。
// 如果是則返回CompactedKafkaRDDIterator迭代器,如果不是則返回KafkaRDDIterator迭代器。
if ( compacted ) {
new CompactedKafkaRDDIterator [ K , V ](
part ,
context ,
kafkaParams ,
useConsumerCache ,
pollTimeout ,
cacheInitialCapacity ,
cacheMaxCapacity ,
cacheLoadFactor
)
} else {
new KafkaRDDIterator [ K , V ](
part ,
context ,
kafkaParams ,
useConsumerCache ,
pollTimeout ,
cacheInitialCapacity ,
cacheMaxCapacity ,
cacheLoadFactor
)
}
}
}
}
KafkaRDDIterator邏輯
我們先看KafkaRDDIterator
的邏輯,因為KafkaRDDIterator
和CompactedKafkaRDDIterator
有相似處,並且我們經常使用的也是這個迭代器。
我們可以先簡單看下代碼結構,之後會對代碼詳細講解。
/**
* An iterator that fetches messages directly from Kafka for the offsets in partition.
* Uses a cached consumer where possible to take advantage of prefetching
*/
private class KafkaRDDIterator [ K , V ](
part : KafkaRDDPartition , // partition信息
context : TaskContext , // 上下文
kafkaParams : ju.Map [ String , Object ], // kafka的配置信息
useConsumerCache : Boolean , // 是否緩存kafka的consumer
pollTimeout : Long , // poll的超時時間
cacheInitialCapacity : Int , // map 初始容量
cacheMaxCapacity : Int , // map 最大容量
cacheLoadFactor : Float // map 負載因子
) extends Iterator [ ConsumerRecord [ K , V ]] {
// 獲取消費者group id
val groupId = kafkaParams . get ( ConsumerConfig . GROUP_ID_CONFIG ). asInstanceOf [ String ]
context . addTaskCompletionListener ( _ => closeIfNeeded ())
// 獲取consumer,如果是使用緩存consumer則從緩存中取,如果未取到則創建新的consumer。
//詳細的解釋會在代碼段下面說明。
val consumer = if ( useConsumerCache ) {
CachedKafkaConsumer . init ( cacheInitialCapacity , cacheMaxCapacity , cacheLoadFactor )
if ( context . attemptNumber >= 1 ) {
// just in case the prior attempt failures were cache related
CachedKafkaConsumer . remove ( groupId , part . topic , part . partition )
}
CachedKafkaConsumer . get [ K , V ]( groupId , part . topic , part . partition , kafkaParams )
} else {
CachedKafkaConsumer . getUncached [ K , V ]( groupId , part . topic , part . partition , kafkaParams )
}
// 向kafka請求的offset,初始化為開始offset
var requestOffset = part . fromOffset
def closeIfNeeded () : Unit = {
if (! useConsumerCache && consumer != null ) {
consumer . close ()
}
}
// 判斷是否還有下一個元素。
// 通過判斷請求的offset是否小於截止的offset,如果小於說明還有下一個元素,如果不小於則沒有下一個元素。
override def hasNext () : Boolean = requestOffset < part . untilOffset
// 獲取下一個元素,其中主要的邏輯是get方法獲取下一個記錄,取到之後把請求offset加一,並返回該條記錄。
//其中get方法會在代碼段候詳細說到。
override def next () : ConsumerRecord [ K , V ] = {
if (! hasNext ) {
throw new ju . NoSuchElementException ( "Cant call getNext() once untilOffset has been reached" )
}
val r = consumer . get ( requestOffset , pollTimeout )
requestOffset += 1
r
}
}
這裡先說下入參useConsumerCache
、cacheInitialCapacity
、cacheMaxCapacity
和cacheLoadFactor
這四個參數,這四個參數涉及到是否使用緩存consumer及緩存consumer的一些邏輯。
在實例化KafkaRDDIterator
時就會執行初始化consumer
的邏輯。這裡的consumer
對應的類是CachedKafkaConsumer
而並不是kafka的consumer。kafka的consumer是在CachedKafkaConsumer
類中封裝的。這裡是需要注意的,否則之後可能混淆。我們把consumer
初始化的邏輯單獨貼出來解釋。
val consumer = if ( useConsumerCache ) { // 使用緩存consumer的邏輯
// 初始化CachedKafkaConsumer,init的邏輯一會說。
CachedKafkaConsumer . init ( cacheInitialCapacity , cacheMaxCapacity , cacheLoadFactor )
if ( context . attemptNumber >= 1 ) {
// just in case the prior attempt failures were cache related
// 防止之前的失敗和緩存有關,所以在這裡清除對應緩存
CachedKafkaConsumer . remove ( groupId , part . topic , part . partition )
}
// 獲取consumer,get的邏輯也會單獨講。
CachedKafkaConsumer . get [ K , V ]( groupId , part . topic , part . partition , kafkaParams )
} else {
// 不使用consumer緩存的方式實例化consumer
CachedKafkaConsumer . getUncached [ K , V ]( groupId , part . topic , part . partition , kafkaParams )
}
CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor)
涉及到的邏輯:
// Dont want to depend on guava, dont want a cleanup thread, use a simple LinkedHashMap
// 相當於一個static方法,在內存中只緩存一個cache實例。
private var cache : ju.LinkedHashMap [ CacheKey , CachedKafkaConsumer [ _ , _ ]] = null
/** Must be called before get, once per JVM, to configure the cache. Further calls are ignored */
def init (
initialCapacity : Int ,
maxCapacity : Int ,
loadFactor : Float ) : Unit = CachedKafkaConsumer . synchronized { // 線程鎖
// 如果cache還沒有被初始化,則通過下面邏輯初始化。
// 從這裡我們可以看出,這個init方法就是初始化cache。
if ( null == cache ) {
logInfo ( s"Initializing cache $initialCapacity $maxCapacity $loadFactor " )
// 實例化LinkedHashMap
cache = new ju . LinkedHashMap [ CacheKey , CachedKafkaConsumer [ _ , _ ]](
initialCapacity , loadFactor , true ) {
// 重寫removeEldestEntry方法,保證緩存的consumer不會超出設置的最大緩存容量。
override def removeEldestEntry (
entry : ju.Map.Entry [ CacheKey , CachedKafkaConsumer [ _ , _ ]]) : Boolean = {
// 如果Map大小大於設置的最大容量,則斷開最近最少使用的consumer連接,並刪除最近最少使用的consumer。
// 至於為什麼這個entry就是就是最近最少使用的,並且返回true就是刪除,可以自行學習下LinkedHashMap實現原理。
if ( this . size > maxCapacity ) {
try {
entry . getValue . consumer . close ()
} catch {
case x : KafkaException =>
logError ( "Error closing oldest Kafka consumer" , x )
}
true
} else {
false
}
}
}
}
}
上面的邏輯就是初始化cache的邏輯。其中cache用到了LinkedHashMap是因為這個類可以控制緩存consumer的個數。至於為什麼要控制consumer的緩存個數,筆者認為是由於consumer不只含有kafka的consumer的連接,還有一次poll的數據,所以為了控制cache佔用內存的大小所以設置了這個邏輯。
CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, kafkaParams)
邏輯:
/**
* Get a cached consumer for groupId, assigned to topic and partition.
* If matching consumer doesnt already exist, will be created using kafkaParams.
*/
def get [ K , V ](
groupId : String ,
topic : String ,
partition : Int ,
kafkaParams : ju.Map [ String , Object ]) : CachedKafkaConsumer [ K , V ] =
CachedKafkaConsumer . synchronized { // 線程鎖
// 以groupID,topic,partition作為key
val k = CacheKey ( groupId , topic , partition )
// 從緩存中取consuemr
val v = cache . get ( k )
if ( null == v ) {
// 當key沒有對應的consumer時,實例化consumer,並且寫入緩存中。
logInfo ( s"Cache miss for $k " )
logDebug ( cache . keySet . toString )
val c = new CachedKafkaConsumer [ K , V ]( groupId , topic , partition , kafkaParams )
cache . put ( k , c )
c
} else {
// any given topicpartition should have a consistent key and value type
// 返回consuemr
v . asInstanceOf [ CachedKafkaConsumer [ K , V ]]
}
}
CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, part.partition, kafkaParams)
邏輯邏輯:
/**
* Get a fresh new instance, unassociated with the global cache.
* Caller is responsible for closing
*/
def getUncached [ K , V ](
groupId : String ,
topic : String ,
partition : Int ,
kafkaParams : ju.Map [ String , Object ]) : CachedKafkaConsumer [ K , V ] =
// 實例化consumer,沒有緩存的邏輯
new CachedKafkaConsumer [ K , V ]( groupId , topic , partition , kafkaParams )
至此初始化consumer
的邏輯介紹完了。接下來我們看下next
方法中的consumer.get(requestOffset, pollTimeout)
實現,其中需要說明的是這裡的consumer
對象和上文提到的consumer
對象是同一個對象,都是CachedKafkaConsumer
類:
// consumer 緩存的kafka數據,避免每次查詢都需要請求kafka
protected var buffer = ju . Collections . emptyListIterator [ ConsumerRecord [ K , V ]]()
// 下一個offset,初始化為-2
protected var nextOffset = - 2L
/**
* Get the record for the given offset, waiting up to timeout ms if IO is necessary.
* Sequential forward access will use buffers, but random access will be horribly inefficient.
*/
def get ( offset : Long , timeout : Long ) : ConsumerRecord [ K , V ] = {
logDebug ( s"Get $groupId $topic $partition nextOffset $nextOffset requested $offset " )
// 判斷傳入的offset和consumer對象中記錄的offset是否相同。
// 這裡不相同的原因是nextOffset初始化為-2,必然會不相等。
if ( offset != nextOffset ) {
logInfo ( s"Initial fetch for $groupId $topic $partition $offset " )
// 消費到的offset定位到指定的offset,保證poll到的數據是從指定的offset開始的。
seek ( offset )
// 拉取數據,更新buffer。具體邏輯見代碼段中poll方法。
poll ( timeout )
}
if (! buffer . hasNext ()) { poll ( timeout ) }
require ( buffer . hasNext (),
s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout " )
var record = buffer . next ()
// 防止緩存中數據的offset和期望數據的offset不一致,所以作一個判斷。
// 如果不一致這重新定位,並更新buffer,重新獲取record。
if ( record . offset != offset ) {
logInfo ( s"Buffer miss for $groupId $topic $partition $offset " )
seek ( offset )
poll ( timeout )
require ( buffer . hasNext (),
s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout " )
record = buffer . next ()
require ( record . offset == offset ,
s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset " +
s"got offset ${ record . offset } instead. If this is a compacted topic, consider enabling " +
"spark.streaming.kafka.allowNonConsecutiveOffsets"
)
}
nextOffset = offset + 1
record
}
// 定位offser,此方法的consumer對象是kafka的consumer。
private def seek ( offset : Long ) : Unit = {
logDebug ( s"Seeking to $topicPartition $offset " )
consumer . seek ( topicPartition , offset )
}
// 從kafka拉取對應topic,partition的數據,並更新到buffer中。
private def poll ( timeout : Long ) : Unit = {
// 拉取kafka中的數據
val p = consumer . poll ( timeout )
// 過濾出指定topic和partition的數據
val r = p . records ( topicPartition )
logDebug ( s"Polled ${ p . partitions () } ${ r . size } " )
// 數據更新到buffer中。
buffer = r . listIterator
}
到這裡,KafkaRDDIterator
的代碼邏輯基本講完了。這裡關於緩存的邏輯會比較繞。這裡有一張圖解釋了bache與bache之間consumer的共享,及next與next之間buffer的共享關係。
CompactedKafkaRDDIterator邏輯
當消費的kafka topic的刪除邏輯是compact
時才需要用到CompactedKafkaRDDIterator
迭代器。並且要使用此迭代器需要把spark的spark.streaming.kafka.allowNonConsecutiveOffsets
參數設置為true
,此參數默認是false
,默認用KafkaRDDIterator
。
先來看下此迭代器的代碼:
private class CompactedKafkaRDDIterator [ K , V ](
part : KafkaRDDPartition ,
context : TaskContext ,
kafkaParams : ju.Map [ String , Object ],
useConsumerCache : Boolean ,
pollTimeout : Long ,
cacheInitialCapacity : Int ,
cacheMaxCapacity : Int ,
cacheLoadFactor : Float
) extends KafkaRDDIterator [ K , V ](
part ,
context ,
kafkaParams ,
useConsumerCache ,
pollTimeout ,
cacheInitialCapacity ,
cacheMaxCapacity ,
cacheLoadFactor
) {
consumer . compactedStart ( part . fromOffset , pollTimeout )
// 在初始時直接獲取下一個record
private var nextRecord = consumer . compactedNext ( pollTimeout )
private var okNext : Boolean = true
// 利用okNext決定是否還有下一個元素,初始為true
override def hasNext () : Boolean = okNext
override def next () : ConsumerRecord [ K , V ] = {
if (! hasNext ) {
throw new ju . NoSuchElementException ( "Cant call getNext() once untilOffset has been reached" )
}
// 獲取下一個record
val r = nextRecord
if ( r . offset + 1 >= part . untilOffset ) {
// 判斷record的下一個offset大於等於截止offset,
// 如果是的話就沒有下一個元素,將okNext置為false
okNext = false
} else {
// 更新下一個record,下一次執行next方法時可以省略獲取nextRecord的時間,
// 並且也可以提前知道是否還有下一個record。
nextRecord = consumer . compactedNext ( pollTimeout )
if ( nextRecord . offset >= part . untilOffset ) {
// 如果nextRecord的offset大於截止offset,則把oKNext置為false,意為hasNext為false
okNext = false
// 撤銷本次的compactedNext邏輯。
consumer . compactedPrevious ()
}
}
r
}
}
從上面可以看出,此類是繼承的KafkaRDDIterator
,所以是否用consuemr緩存的邏輯是一樣的。這裡就不會再說一次了。這裡最主要邏輯是在consumer.compactedNext(pollTimeout)
方法中,接下來我們看下此方法的實現:
/**
* Get the next record in the batch from a compacted topic.
* Assumes compactedStart has been called first, and ignores gaps.
*/
def compactedNext ( timeout : Long ) : ConsumerRecord [ K , V ] = {
if (! buffer . hasNext ()) {
// 如果buffer沒有next,則消費kafka數據,更新buffer。
poll ( timeout )
}
require ( buffer . hasNext (),
s"Failed to get records for compacted $groupId $topic $partition after polling for $timeout " )
// 獲取buffer的下一個數據。
val record = buffer . next ()
// offset + 1
nextOffset = record . offset + 1
record
}
從上面可以看出compactedNext
的實現邏輯相對於KafkaRDDIterator
中的get
方法實現邏輯簡單很多。沒有對buffer獲取到的record的offset與nextOffset
檢驗,直接就認為取到的數據是正確的。這樣做的原因是因為topic設置為compact
刪除之後,offset就是不連續的了,因此無法判斷record
的offset是否合理。
總結
至此,KafkaUtils.createDirectStream的邏輯就講解完了。核心思想和其他的運算元是一樣的,就是先實例化DStream,然後DStream實例化RDD,之後RDD實例化Iterator,最終通過Iterator的next方法來獲取數據。
推薦閱讀: