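In the previous article, we took a high-level look at message queues from the angle of usage and strategy. Starting with this article, we dig into the source code and analyze in detail how Kafka actually implements a distributed message queue. Our analysis begins on the Producer side.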
Starting with Kafka 0.8.2, a new Java client API was introduced (KafkaProducer shipped in 0.8.2 and KafkaConsumer in 0.9), replacing the older Scala API. This series covers only the Java API.
The figure below is an architecture diagram of the Producer side, put together after working through the source code:
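As discussed in the previous article, a Producer can send messages either synchronously or asynchronously. In earlier Kafka client implementations, the two were implemented separately. In 0.9, synchronous sending is actually built on top of asynchronous sending, through the following interface: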
public class KafkaProducer<K, V> implements Producer<K, V> {
    ...
    // asynchronous send interface
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        ...
    }
}
To send synchronously, just call get() on the returned Future.
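The sync-over-async idea can be shown without Kafka. The sketch below is not KafkaProducer's code. `MiniProducer` is a hypothetical stand-in: its `send` hands the work to a background executor and returns a `Future`, and `sendSync` simply blocks on that `Future`. The `"offset-"` result string and the 5-second timeout are illustrative assumptions.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for KafkaProducer: send() is asynchronous and returns a Future.
class MiniProducer implements AutoCloseable {
    // single background thread standing in for the producer's I/O thread
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor();

    // Asynchronous send: returns immediately; the simulated "send" runs on ioThread.
    Future<String> send(String record) {
        return ioThread.submit(() -> "offset-" + record);
    }

    // Synchronous send built on the asynchronous one: block on the Future.
    String sendSync(String record) throws Exception {
        return send(record).get(5, TimeUnit.SECONDS);
    }

    @Override
    public void close() {
        ioThread.shutdown();
    }
}
```

The timeout on `get()` is a design choice. A plain `get()` would block indefinitely if the background thread never completed the future.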
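As the figure shows, asynchronous sending works like this. On send, KafkaProducer puts the message into a local message queue, the RecordAccumulator. A background thread, the Sender, loops continuously and sends the queued messages to the Kafka cluster.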
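Here is a minimal sketch of this split between producer threads and a background thread. It uses a `LinkedBlockingQueue` as a stand-in for RecordAccumulator and a plain list as a stand-in for the Kafka cluster. `AccumulatorSketch` and its methods are hypothetical. The real RecordAccumulator groups records into per-partition batches, which this sketch omits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Producer threads append to a local queue (stand-in for RecordAccumulator);
// one background thread (stand-in for Sender) drains it in a loop.
class AccumulatorSketch {
    private final BlockingQueue<String> accumulator = new LinkedBlockingQueue<>();
    final List<String> delivered = new CopyOnWriteArrayList<>(); // simulated cluster
    private volatile boolean running = true;
    private final Thread sender = new Thread(this::senderLoop, "sender");

    AccumulatorSketch() {
        sender.setDaemon(true);
        sender.start();
    }

    // Called from any producer thread: enqueue and return immediately.
    void send(String record) {
        accumulator.add(record);
    }

    private void senderLoop() {
        List<String> batch = new ArrayList<>();
        // keep going until close() was called AND everything queued has been sent
        while (running || !accumulator.isEmpty()) {
            try {
                String first = accumulator.poll(10, TimeUnit.MILLISECONDS);
                if (first == null) continue;
                batch.add(first);
                accumulator.drainTo(batch);   // take whatever else is already queued
                delivered.addAll(batch);      // simulated network send of one batch
                batch.clear();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Stop the loop (after the queue is drained) and wait for the thread to exit.
    void close() throws InterruptedException {
        running = false;
        sender.join();
    }
}
```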
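This design has one prerequisite: both KafkaProducer and Sender need the cluster's configuration, that is, its Metadata. Metadata is what the previous article described as the mapping between Topic/Partition and brokers. For every partition of every topic, the client must know the corresponding broker list, which broker is the leader, and which brokers are followers.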
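So the figure contains two data flows:
Metadata flow: the Sender thread fetches metadata from the cluster and updates Metadata, and KafkaProducer reads Metadata before sending.
Message flow (B1, B2, B3): this one is straightforward, so we won't go into it here.
This article focuses on the Metadata flow. The message flow will be covered in detail in later articles.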
As the figure shows, Metadata is read by multiple producer threads and updated by a single sender thread, so it must be thread-safe.
The official Kafka documentation also states that KafkaProducer is thread-safe and can be called from multiple threads:
The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.
Consistent with this, the code below shows that all of Metadata's public methods are synchronized:
public final class Metadata {
    ...
    public synchronized Cluster fetch() {
        return this.cluster;
    }

    public synchronized long timeToNextUpdate(long nowMs) {
        ...
    }

    public synchronized int requestUpdate() {
        ...
    }
    ...
}
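The code below lists Metadata's main data structures: one Cluster object plus a set of state variables. The Cluster object holds the cluster configuration, and the state variables control how Metadata is updated.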
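public final class Metadata {
    ...
    private final long refreshBackoffMs;      // backoff before the next update attempt, which matters mainly after a failed update (it plays a fairly minor role in the code)
    private final long metadataExpireMs;      // key value: how often to refresh. Set from metadata.max.age.ms, whose documented default is 300000 ms (5 minutes)
    private int version;                      // incremented by 1 on every successful update; mainly used as the loop condition while waiting in a while loop
    private long lastRefreshMs;               // time of the last update attempt (failed attempts included)
    private long lastSuccessfulRefreshMs;     // time of the last successful update (equal to lastRefreshMs if every update succeeds; otherwise lastSuccessfulRefreshMs < lastRefreshMs)
    private Cluster cluster;                  // cluster configuration
    private boolean needUpdate;               // whether a forced refresh has been requested
    ...
}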
public final class Cluster {
    ...
    private final List<Node> nodes;   // a Node is a broker
    private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;   // mapping from Topic/Partition to its broker list
    private final Map<String, List<PartitionInfo>> partitionsByTopic;
    private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
    private final Map<Integer, List<PartitionInfo>> partitionsByNode;
    private final Map<Integer, Node> nodesById;
}
public class PartitionInfo {
    private final String topic;
    private final int partition;
    private final Node leader;
    private final Node[] replicas;
    private final Node[] inSyncReplicas;
}
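To make the role of these maps concrete, here is a simplified, hypothetical model. The names `MiniNode`, `MiniPartitionInfo` and `MiniCluster` are for illustration only. The model builds the per-topic and per-node indexes from a flat partition list. It assumes, as a simplification, that a partition counts as "available" when its leader is known and that `partitionsByNode` is keyed by the leader's id. `partitionsForTopic` returns null for an unknown topic, which is the condition waitOnMetadata checks further down.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified Node: only the broker id is kept.
class MiniNode {
    final int id;
    MiniNode(int id) { this.id = id; }
}

// Hypothetical, simplified PartitionInfo: replicas/ISR omitted.
class MiniPartitionInfo {
    final String topic;
    final int partition;
    final MiniNode leader; // null when the leader is currently unknown
    MiniPartitionInfo(String topic, int partition, MiniNode leader) {
        this.topic = topic;
        this.partition = partition;
        this.leader = leader;
    }
}

// Hypothetical, simplified Cluster: derives the lookup maps once, at construction.
class MiniCluster {
    final Map<String, List<MiniPartitionInfo>> partitionsByTopic = new HashMap<>();
    final Map<String, List<MiniPartitionInfo>> availablePartitionsByTopic = new HashMap<>();
    final Map<Integer, List<MiniPartitionInfo>> partitionsByNode = new HashMap<>();

    MiniCluster(List<MiniPartitionInfo> partitions) {
        for (MiniPartitionInfo p : partitions) {
            partitionsByTopic.computeIfAbsent(p.topic, t -> new ArrayList<>()).add(p);
            if (p.leader != null) {
                // assumption: "available" means the leader is known
                availablePartitionsByTopic.computeIfAbsent(p.topic, t -> new ArrayList<>()).add(p);
                partitionsByNode.computeIfAbsent(p.leader.id, n -> new ArrayList<>()).add(p);
            }
        }
    }

    // null for an unknown topic, mirroring the null check in waitOnMetadata
    List<MiniPartitionInfo> partitionsForTopic(String topic) {
        return partitionsByTopic.get(topic);
    }
}
```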
Below is the source of send. Before sending, it first reads the metadata. If the metadata cannot be obtained, send blocks until a timeout and then throws a TimeoutException.
//KafkaProducer
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    try {
        // if the topic's metadata is unavailable, block here until it arrives or an exception is thrown
        long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);
        ...
        // metadata obtained: continue with the send logic below
    } catch (...) {
        ...
    }
}
//KafkaProducer
private long waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {
    if (!this.metadata.containsTopic(topic))
        this.metadata.add(topic);

    if (metadata.fetch().partitionsForTopic(topic) != null)
        return 0;   // the topic's metadata is already available: return immediately

    long begin = time.milliseconds();
    long remainingWaitMs = maxWaitMs;
    // topic metadata missing: keep looping and waiting until it arrives, or throw TimeoutException
    while (metadata.fetch().partitionsForTopic(topic) == null) {
        log.trace("Requesting metadata update for topic {}.", topic);
        int version = metadata.requestUpdate();           // sets needUpdate to true
        sender.wakeup();                                  // wake up the sender
        metadata.awaitUpdate(version, remainingWaitMs);   // the key Metadata function
        long elapsed = time.milliseconds() - begin;
        if (elapsed >= maxWaitMs)
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        if (metadata.fetch().unauthorizedTopics().contains(topic))
            throw new TopicAuthorizationException(topic);
        remainingWaitMs = maxWaitMs - elapsed;
    }
    return time.milliseconds() - begin;
}
//Metadata
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
    if (maxWaitMs < 0) {
        throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
    }
    long begin = System.currentTimeMillis();
    long remainingWaitMs = maxWaitMs;
    // version is incremented once the Sender has successfully updated the metadata; until then, keep waiting
    while (this.version <= lastVersion) {
        if (remainingWaitMs != 0)
            wait(remainingWaitMs);   // Object.wait(), used together with synchronized
        long elapsed = System.currentTimeMillis() - begin;
        if (elapsed >= maxWaitMs)    // waited longer than the maximum allowed time
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        remainingWaitMs = maxWaitMs - elapsed;
    }
}
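Summary: as the code above shows, a producer waiting for metadata loops on 2 conditions:
(1) while (metadata.fetch().partitionsForTopic(topic) == null)
(2) while (this.version <= lastVersion)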
Where there is a wait, there is a notify: the notify is issued when the Sender updates Metadata.
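The wait/notify handshake can be reproduced in a few lines. The sketch below is a simplified, hypothetical `VersionedMetadata`, not Kafka's class. `requestUpdate` returns the current version, `update` increments the version and calls `notifyAll()`, and `awaitUpdate` waits until the version moves past the one the caller saw. On timeout it throws `IllegalStateException`, a stand-in for Kafka's own unchecked TimeoutException. Unlike the code above, it checks the deadline before each `wait` rather than after it.

```java
// Hypothetical, simplified version of the version-counter handshake between
// producer threads (awaitUpdate) and the single updater thread (update).
class VersionedMetadata {
    private int version = 0;
    private boolean needUpdate = false;
    private String cluster = null; // stand-in for the Cluster object

    // Mark an update as needed and return the version the caller must wait past.
    synchronized int requestUpdate() {
        needUpdate = true;
        return version;
    }

    synchronized boolean updateRequested() { return needUpdate; }

    synchronized String fetch() { return cluster; }

    // Called by the updater thread after a successful fetch.
    synchronized void update(String newCluster) {
        needUpdate = false;
        version += 1;
        cluster = newCluster;
        notifyAll(); // wake every thread blocked in awaitUpdate
    }

    // Block until version > lastVersion, or fail after maxWaitMs.
    synchronized void awaitUpdate(int lastVersion, long maxWaitMs) throws InterruptedException {
        long begin = System.currentTimeMillis();
        while (version <= lastVersion) { // re-checked after every wakeup, including spurious ones
            long elapsed = System.currentTimeMillis() - begin;
            if (elapsed >= maxWaitMs)
                throw new IllegalStateException("Failed to update metadata after " + maxWaitMs + " ms.");
            wait(maxWaitMs - elapsed); // releases the monitor while waiting
        }
    }
}
```

Waiting on a version number rather than on a boolean flag means a waiter cannot miss an update that lands between `requestUpdate()` and `awaitUpdate()`. If the version has already moved on, the loop exits without waiting at all.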
Below is KafkaProducer's constructor. It shows that the Sender is a thread created inside KafkaProducer.
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    try {
        ...
        // construct the metadata
        this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));

        // seed the metadata with the initially configured node list
        this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());

        ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());

        NetworkClient client = new NetworkClient(
                new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", metricTags, channelBuilder),
                this.metadata,
                clientId,
                config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
                config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                ...);

        // construct a sender; Sender itself implements Runnable
        this.sender = new Sender(client,
                this.metadata,
                this.accumulator,
                config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                config.getInt(ProducerConfig.RETRIES_CONFIG),
                this.metrics,
                new SystemTime(),
                clientId,
                this.requestTimeoutMs);

        String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
        this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
        this.ioThread.start();   // start a thread that runs the sender
        ...
//Sender
public void run() {
    // main loop, runs until close is called
    while (running) {
        try {
            run(time.milliseconds());
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    ...
}
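The structure of the Sender thread is sketched below: a Runnable on a named daemon thread that loops until close() clears a flag and survives exceptions thrown by individual iterations. `LoopingSender` and its `runOnce` method are hypothetical. The deliberate failure every third iteration only exists to show that one bad iteration does not kill the I/O thread. The thread-name format is copied from the constructor above, and `setDaemon(true)` plays the role of the `true` argument passed to KafkaThread.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a Sender-style main loop.
class LoopingSender implements Runnable {
    private volatile boolean running = true;   // cleared by close() from another thread
    final AtomicInteger iterations = new AtomicInteger();
    final AtomicInteger errors = new AtomicInteger();

    @Override
    public void run() {
        while (running) {
            try {
                runOnce(System.currentTimeMillis());
            } catch (Exception e) {
                errors.incrementAndGet();      // Kafka logs the error here and keeps looping
            }
        }
    }

    // One iteration of work; fails every third time to show the loop survives.
    void runOnce(long now) throws InterruptedException {
        int n = iterations.incrementAndGet();
        Thread.sleep(1);
        if (n % 3 == 0) throw new IllegalStateException("simulated failure at iteration " + n);
    }

    void close() { running = false; }

    // Start the loop on a named daemon thread, like KafkaThread(name, runnable, true).
    static Thread start(LoopingSender sender, String clientId) {
        String name = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
        Thread t = new Thread(sender, name);
        t.setDaemon(true);
        t.start();
        return t;
    }
}
```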
public void run(long now) {
    Cluster cluster = metadata.fetch();
    ...
    // go through all batches in the accumulator and find the nodes that are ready to receive data
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // if the leader of any partition with pending data is unknown, request a metadata update
    if (result.unknownLeadersExist)
        this.metadata.requestUpdate();

    ...

    // the client's two key calls: one sends ClientRequests, the other receives ClientResponses.
    // Underneath, this relies on NIO's poll (NIO is covered in detail later).
    for (ClientRequest request : requests)
        client.send(request, now);

    this.client.poll(pollTimeout, now);
}
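The ready check that precedes `requestUpdate()` can be modeled on its own. This is a deliberately reduced, hypothetical version. It only asks whether each partition with buffered data has a known leader, and it ignores the batch-size, linger and backoff conditions that the real RecordAccumulator.ready() also considers. `ReadyCheck` and the string partition ids are illustrative assumptions.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, reduced model of the ready check in Sender.run(now).
class ReadyCheck {
    final Set<Integer> readyNodes = new HashSet<>();
    boolean unknownLeadersExist = false;

    // partitionsWithData: partitions that have buffered records;
    // leaderByPartition: current metadata view (partition -> leader broker id)
    static ReadyCheck ready(List<String> partitionsWithData, Map<String, Integer> leaderByPartition) {
        ReadyCheck result = new ReadyCheck();
        for (String tp : partitionsWithData) {
            Integer leader = leaderByPartition.get(tp);   // null = leader unknown
            if (leader == null) result.unknownLeadersExist = true; // caller should request a metadata update
            else result.readyNodes.add(leader);                    // data for this node can be sent
        }
        return result;
    }
}
```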
//NetworkClient
public List<ClientResponse> poll(long timeout, long now) {
    long metadataTimeout = metadataUpdater.maybeUpdate(now);   // key point: every poll decides whether the metadata needs updating

    try {
        this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }

    // process completed actions
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    handleCompletedSends(responses, updatedNow);
    handleCompletedReceives(responses, updatedNow);   // metadata updates are handled while processing the responses here
    handleDisconnections(responses, updatedNow);
    handleConnections();
    handleTimedOutRequests(responses, updatedNow);

    // invoke callbacks
    for (ClientResponse response : responses) {
        if (response.request().hasCallback()) {
            try {
                response.request().callback().onComplete(response);
            } catch (Exception e) {
                log.error("Uncaught error in request completion:", e);
            }
        }
    }

    return responses;
}
//DefaultMetadataUpdater
@Override
public long maybeUpdate(long now) {
    // should we update our metadata?
    long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
    long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
    long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
    // if there is no node available to connect, back off refreshing metadata
    long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt), waitForMetadataFetch);

    if (metadataTimeout == 0) {
        // highly dependent on the behavior of leastLoadedNode.
        Node node = leastLoadedNode(now);   // find the least loaded node
        maybeUpdate(now, node);             // send the metadata update request to that node
    }

    return metadataTimeout;
}
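The timeout arithmetic in maybeUpdate(now) is easy to misread, so here it is restated as a pure function. `MetadataTimeout.compute` is a hypothetical helper. Its body copies the three expressions above, but it takes `timeToNextMetadataUpdate` as a parameter instead of asking Metadata for it.

```java
// Hypothetical helper restating the arithmetic of maybeUpdate(now).
// Result 0 means "send a MetadataRequest now"; otherwise it bounds how long poll() may sleep.
class MetadataTimeout {
    static long compute(long timeToNextMetadataUpdate, long lastNoNodeAvailableMs,
                        long refreshBackoffMs, boolean metadataFetchInProgress, long now) {
        // after a "no node available" event, wait out the refresh backoff
        long timeToNextReconnectAttempt = Math.max(lastNoNodeAvailableMs + refreshBackoffMs - now, 0);
        // while a metadata request is in flight, don't start another one
        long waitForMetadataFetch = metadataFetchInProgress ? Integer.MAX_VALUE : 0;
        return Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt), waitForMetadataFetch);
    }
}
```

For example, take `refreshBackoffMs = 100` and `now = 10000`. If no node was available at time 9970, the result is 70, so the next metadata attempt waits 70 ms. While a fetch is already in flight, the result is `Integer.MAX_VALUE`, so poll() is bounded only by its other timeouts.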
private void maybeUpdate(long now, Node node) {
    if (node == null) {
        log.debug("Give up sending metadata request since no node is available");
        // mark the timestamp for no node available to connect
        this.lastNoNodeAvailableMs = now;
        return;
    }
    String nodeConnectionId = node.idString();

    if (canSendRequest(nodeConnectionId)) {
        Set<String> topics = metadata.needMetadataForAllTopics() ? new HashSet<String>() : metadata.topics();
        this.metadataFetchInProgress = true;
        ClientRequest metadataRequest = request(now, nodeConnectionId, topics);   // key point: the request that updates Metadata
        log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
        doSend(metadataRequest, now);   // this only sends asynchronously; the response is handled in handleCompletedReceives above
    } else if (connectionStates.canConnect(nodeConnectionId, now)) {
        log.debug("Initialize connection to node {} for sending metadata request", node.id());
        initiateConnect(node, now);
    } else {
        // connected, but can't send more OR connecting
        this.lastNoNodeAvailableMs = now;
    }
}
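Picking the target node is the job of `leastLoadedNode`. The sketch below is a simplified, hypothetical model of "least loaded". It returns a connected node with no in-flight requests right away. Otherwise it returns the node with the fewest in-flight requests among nodes not in reconnect backoff, or null if every node is in backoff. `NodeLoad` is an illustration-only class, and the real method's scan order and connection-state bookkeeping are not reproduced.

```java
import java.util.List;

// Hypothetical model of a node as seen by the "least loaded" selection.
class NodeLoad {
    final int id;
    final int inFlight;        // requests sent but not yet answered
    final boolean connected;
    final boolean blackedOut;  // currently in reconnect backoff

    NodeLoad(int id, int inFlight, boolean connected, boolean blackedOut) {
        this.id = id;
        this.inFlight = inFlight;
        this.connected = connected;
        this.blackedOut = blackedOut;
    }

    // Returns null when no node is usable; the caller then records
    // lastNoNodeAvailableMs and backs off, as in maybeUpdate(now, node).
    static NodeLoad leastLoaded(List<NodeLoad> nodes) {
        NodeLoad found = null;
        int best = Integer.MAX_VALUE;
        for (NodeLoad n : nodes) {
            if (n.inFlight == 0 && n.connected) return n;   // idle and connected: cannot do better
            if (!n.blackedOut && n.inFlight < best) {       // best usable candidate so far
                best = n.inFlight;
                found = n;
            }
        }
        return found;
    }
}
```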
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    for (NetworkReceive receive : this.selector.completedReceives()) {
        String source = receive.source();
        ClientRequest req = inFlightRequests.completeNext(source);
        ResponseHeader header = ResponseHeader.parse(receive.payload());
        // Always expect the response version id to be the same as the request version id
        short apiKey = req.request().header().apiKey();
        short apiVer = req.request().header().apiVersion();
        Struct body = (Struct) ProtoUtils.responseSchema(apiKey, apiVer).read(receive.payload());
        correlate(req.request().header(), header);
        if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
            responses.add(new ClientResponse(req, now, false, body));
    }
}
@Override
public boolean maybeHandleCompletedReceive(ClientRequest req, long now, Struct body) {
    short apiKey = req.request().header().apiKey();
    if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {
        handleResponse(req.request().header(), body, now);
        return true;
    }
    return false;
}
// key function
private void handleResponse(RequestHeader header, Struct body, long now) {
    this.metadataFetchInProgress = false;
    MetadataResponse response = new MetadataResponse(body);
    Cluster cluster = response.cluster();   // obtain a new Cluster object from the response
    if (response.errors().size() > 0) {
        log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), response.errors());
    }

    if (cluster.nodes().size() > 0) {
        this.metadata.update(cluster, now);   // update metadata: the new cluster replaces the old one
    } else {
        log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
        this.metadata.failedUpdate(now);      // update failed: run the failure-handling logic
    }
}
// on success: version + 1, and the other fields are updated as well
public synchronized void update(Cluster cluster, long now) {
    this.needUpdate = false;
    this.lastRefreshMs = now;
    this.lastSuccessfulRefreshMs = now;
    this.version += 1;

    for (Listener listener: listeners)
        listener.onMetadataUpdate(cluster);   // notify anyone listening for metadata updates

    this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster;   // the new cluster replaces the old one

    notifyAll();   // wake up all blocked producer threads

    log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
}

// on failure, only lastRefreshMs is updated
public synchronized void failedUpdate(long now) {
    this.lastRefreshMs = now;
}
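As the code above shows, Metadata is updated inside the Sender's while loop, on every call to client.poll(). There are two update mechanisms:
(1) Periodic updates: the metadata is refreshed at a fixed interval, implemented through Metadata's lastRefreshMs and lastSuccessfulRefreshMs fields (checked against metadataExpireMs).
The corresponding ProducerConfig setting is metadata.max.age.ms (ProducerConfig.METADATA_MAX_AGE_CONFIG, passed to the Metadata constructor in KafkaProducer).
(2) Invalidation detection and forced updates: once the metadata is found to be invalid, metadata.requestUpdate() is called to force an update. requestUpdate() itself does almost nothing: it just sets needUpdate to true and returns the current version.
On every poll, both mechanisms are checked, and an update is triggered as soon as either condition is met.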
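The two mechanisms can be folded into one number: how long until the next update is allowed. The sketch below is modeled on what `Metadata.timeToNextUpdate` appears to compute from the fields listed earlier, so treat the exact formula as an assumption. `UpdateSchedule` is a hypothetical class. The 100 ms backoff and 300000 ms expiry are assumed values that match the documented defaults of retry.backoff.ms and metadata.max.age.ms.

```java
// Hypothetical sketch combining periodic and forced refresh into one delay.
class UpdateSchedule {
    long refreshBackoffMs = 100;       // assumed: retry.backoff.ms default
    long metadataExpireMs = 300_000;   // assumed: metadata.max.age.ms default
    long lastRefreshMs = 0;
    long lastSuccessfulRefreshMs = 0;
    boolean needUpdate = false;

    // 0 means an update is due now; otherwise the number of ms to wait.
    long timeToNextUpdate(long nowMs) {
        // periodic: due once the last success is older than metadataExpireMs;
        // forced: needUpdate makes it due immediately
        long timeToExpire = needUpdate ? 0 : Math.max(lastSuccessfulRefreshMs + metadataExpireMs - nowMs, 0);
        // never attempt more often than once per backoff interval, success or failure
        long timeToAllowUpdate = lastRefreshMs + refreshBackoffMs - nowMs;
        return Math.max(timeToExpire, timeToAllowUpdate);
    }

    void requestUpdate() { needUpdate = true; }

    void update(long now) {
        needUpdate = false;
        lastRefreshMs = now;
        lastSuccessfulRefreshMs = now;
    }

    void failedUpdate(long now) { lastRefreshMs = now; } // needUpdate stays set, so it retries after the backoff
}
```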
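So how does the client decide that Metadata is invalid? This logic is scattered across the code, and many places can mark Metadata as invalid. Some of them are shown below.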
private void initiateConnect(Node node, long now) {
    String nodeConnectionId = node.idString();
    try {
        log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
        this.connectionStates.connecting(nodeConnectionId, now);
        selector.connect(nodeConnectionId,
                         new InetSocketAddress(node.host(), node.port()),
                         this.socketSendBuffer,
                         this.socketReceiveBuffer);
    } catch (IOException e) {
        connectionStates.disconnected(nodeConnectionId, now);
        metadataUpdater.requestUpdate();   // metadata considered invalid
        log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
    }
}
private void handleDisconnections(List<ClientResponse> responses, long now) {
    for (String node : this.selector.disconnected()) {
        log.debug("Node {} disconnected.", node);
        processDisconnection(responses, node, now);
    }
    if (this.selector.disconnected().size() > 0)
        metadataUpdater.requestUpdate();   // metadata considered invalid
}
private void handleTimedOutRequests(List<ClientResponse> responses, long now) {
    List<String> nodeIds = this.inFlightRequests.getNodesWithTimedOutRequests(now, this.requestTimeoutMs);
    for (String nodeId : nodeIds) {
        this.selector.close(nodeId);
        log.debug("Disconnecting from node {} due to request timeout.", nodeId);
        processDisconnection(responses, nodeId, now);
    }

    if (nodeIds.size() > 0)
        metadataUpdater.requestUpdate();   // metadata considered invalid
}
//Sender
public void run(long now) {
    Cluster cluster = metadata.fetch();
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    if (result.unknownLeadersExist)
        this.metadata.requestUpdate();   // metadata considered invalid
    ...
}
private void handleProduceResponse(ClientResponse response, Map<TopicPartition, RecordBatch> batches, long now) {
    int correlationId = response.request().request().header().correlationId();
    if (response.wasDisconnected()) {
        log.trace("Cancelled request {} due to node {} being disconnected", response, response.request()
                                                                                            .request()
                                                                                            .destination());
        for (RecordBatch batch : batches.values())
            completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlationId, now);
    ...
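In short: whenever one of the various exceptions occurs or data falls out of sync, the client assumes the metadata may be stale and asks for an update.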
Beyond what was described above, Metadata updates have the following characteristics:
1. The update request, MetadataRequest, is sent asynchronously over NIO. Metadata is actually updated only later, when poll returns and the MetadataResponse is processed.
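A key point here: on every update, the Cluster object in Metadata is replaced as a whole rather than modified in place. That is why Cluster itself needs no locking.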
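2. For an update, the client picks the least loaded of all the Nodes (brokers) stored in the metadata, that is, the one with the fewest in-flight requests. It sends that node a MetadataRequest to obtain a new Cluster object.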
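The "replace the whole Cluster, never modify it" rule from point 1 is a copy-on-write snapshot. In the hypothetical sketch below, `Snapshot` is immutable and `SnapshotHolder` swaps the reference under `synchronized`, as Metadata.update does. A reader that already holds an old snapshot keeps a consistent view without taking any lock. The partition-to-leader-id map is an assumed simplification of Cluster.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Immutable snapshot: built once, never modified, safe to share without locks.
final class Snapshot {
    final int version;
    final Map<String, Integer> leaderByPartition; // unmodifiable copy

    Snapshot(int version, Map<String, Integer> leaders) {
        this.version = version;
        this.leaderByPartition = Collections.unmodifiableMap(new HashMap<>(leaders));
    }
}

// Holder that swaps whole snapshots, mirroring how Metadata replaces its Cluster.
class SnapshotHolder {
    private Snapshot current = new Snapshot(0, Collections.emptyMap());

    synchronized Snapshot fetch() { return current; }

    // Build a brand-new snapshot and swap the reference; the old one is left untouched.
    synchronized void update(Map<String, Integer> leaders) {
        current = new Snapshot(current.version + 1, leaders);
    }
}
```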