寫這篇文章的原因是前兩天星球球友去面試,面試管問了一下,Spark 分析ES的數據,生成的RDD分區數跟什麼有關係呢?

稍微猜測一下就能想到跟分片數有關,但是具體是什麼關係呢?

可想的具體關係可能是以下兩種:

1).就像KafkaRDD的分區與kafka topic分區數的關係一樣,一對一。

2).ES支持游標查詢,那麼是不是也可以對比較大ES 索引的分片進行拆分成多個RDD分區呢?

那麼下面浪尖帶著大家翻一下源碼看看具體情況。

1.Spark Core讀取ES

ES官網直接提供的有elasticsearch-hadoop 插件,對於ES 7.x,hadoop和Spark版本支持如下:

hadoop2Version = 2.7.1
hadoop22Version = 2.2.0
spark13Version = 1.6.2
spark20Version = 2.3.0

浪尖這了採用的ES版本是7.1.1,測試用的Spark版本是2.3.1,沒有問題。整合es和spark,導入相關依賴有兩種方式:

a,導入整個elasticsearch-hadoop包

<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop</artifactId>
<version>7.1.1</version>
</dependency>

b,只導入spark模塊的包

<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-20_2.11</artifactId>
<version>7.1.1</version>
</dependency>

浪尖這裡為了測試方便,只是在本機起了一個單節點的ES實例,簡單的測試代碼如下:

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.hadoop.cfg.ConfigurationOptions

object es2sparkrdd {

def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName)

conf.set(ConfigurationOptions.ES_NODES, "127.0.0.1")
conf.set(ConfigurationOptions.ES_PORT, "9200")
conf.set(ConfigurationOptions.ES_NODES_WAN_ONLY, "true")
conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "true")
conf.set(ConfigurationOptions.ES_NODES_DISCOVERY, "false")
// conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_USER, esUser)
// conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_PASS, esPwd)
conf.set("es.write.rest.error.handlers", "ignoreConflict")
conf.set("es.write.rest.error.handler.ignoreConflict", "com.jointsky.bigdata.handler.IgnoreConflictsHandler")

val sc = new SparkContext(conf)
import org.elasticsearch.spark._

sc.esRDD("posts").foreach(each=>{
each._2.keys.foreach(println)
})
sc.esJsonRDD("posts").foreach(each=>{
println(each._2)
})

sc.stop()
}
}

可以看到Spark Core讀取RDD主要有兩種形式的API:

a,esRDD。這種返回的是一個tuple2的類型的RDD,第一個元素是id,第二個是一個map,包含ES的document元素。

RDD[(String, Map[String, AnyRef])]

b,esJsonRDD。這種返回的也是一個tuple2類型的RDD,第一個元素依然是id,第二個是json字元串。

RDD[(String, String)]

雖然是兩種類型的RDD,但是RDD都是ScalaEsRDD類型。

要分析Spark Core讀取ES的並行度,只需要分析ScalaEsRDD的getPartitions函數即可。

2.源碼分析

首先導入源碼github.com/elastic/elas這個是gradle工程,可以直接導入idea,然後切換到7.x版本即可。

廢話少說直接找到ScalaEsRDD,發現gePartitions是在其父類實現的,方法內容如下:

override def getPartitions: Array[Partition] = {
esPartitions.zipWithIndex.map { case(esPartition, idx) =>
new EsPartition(id, idx, esPartition)
}.toArray
}

esPartitions是一個lazy型的變數:

@transient private[spark] lazy val esPartitions = {
RestService.findPartitions(esCfg, logger)
}

這種聲明原因是什麼呢?

lazy+transient的原因大家可以考慮一下。

RestService.findPartitions方法也是僅是創建客戶端獲取分片等信息,然後調用,分兩種情況調用兩個方法。

final List<PartitionDefinition> partitions;
// 5.x及以後版本 同時沒有配置es.input.max.docs.per.partition
if (clusterInfo.getMajorVersion().onOrAfter(EsMajorVersion.V_5_X) && settings.getMaxDocsPerPartition() != null) {
partitions = findSlicePartitions(client.getRestClient(), settings, mapping, nodesMap, shards, log);
} else {
partitions = findShardPartitions(settings, mapping, nodesMap, shards, log);
}

a).findSlicePartitions

這個方法其實就是在5.x及以後的ES版本,同時配置了

es.input.max.docs.per.partition

以後,才會執行,實際上就是將ES的分片按照指定大小進行拆分,必然要先進行分片大小統計,然後計算出拆分的分區數,最後生成分區信息。具體代碼如下:

long numDocs;
if (readResource.isTyped()) {
numDocs = client.count(index, readResource.type(), Integer.toString(shardId), query);
} else {
numDocs = client.countIndexShard(index, Integer.toString(shardId), query);
}
int numPartitions = (int) Math.max(1, numDocs / maxDocsPerPartition);
for (int i = 0; i < numPartitions; i++) {
PartitionDefinition.Slice slice = new PartitionDefinition.Slice(i, numPartitions);
partitions.add(new PartitionDefinition(settings, resolvedMapping, index, shardId, slice, locations));
}

實際上分片就是用游標的方式,對_doc進行排序,然後按照分片計算得到的分區偏移進行數據的讀取,組裝過程是SearchRequestBuilder.assemble方法來實現的。

這個其實個人覺得會浪費一定的性能,假如真的要ES結合Spark的話,建議合理設置分片數。

b).findShardPartitions方法

這個方法沒啥疑問了就是一個RDD分區對應於ES index的一個分片。

PartitionDefinition partition = new PartitionDefinition(settings, resolvedMapping, index, shardId,
locationList.toArray(new String[0]));
partitions.add(partition);

3.總結

以上就是Spark Core讀取ES數據的時候分片和RDD分區的對應關係分析,默認情況下是一個es 索引分片對應Spark RDD的一個分區。假如分片數過大,且ES版本在5.x及以上,可以配置參數

es.input.max.docs.per.partition

進行拆分。


推薦閱讀:
相关文章