Parquet是一種列式存儲格式,很多種處理引擎都支持這種存儲格式,也是sparksql的默認存儲格式。Spark SQL支持靈活的讀和寫Parquet文件,並且對parquet文件的schema可以自動解析。當Spark SQL需要寫成Parquet文件時,處於兼容的原因所有的列都被自動轉化為了nullable。

1讀寫Parquet文件

// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._

val peopleDF = spark.read.json("examples/src/main/resources/people.json")
// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")

// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("people.parquet")

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+

2分區發現

分區表時很多系統支持的,比如hive,對於一個分區表,往往是採用表中的某一或多個列去作為分區的依據,分區是以文件目錄的形式體現。所有內置的文件源(Text/CSV/JSON/ORC/Parquet)都支持自動的發現和推測分區信息。例如,我們想取兩個分區列,gender和country,先按照性別分區,再按照國家分區:

path
└── to
└── table
├── gender=male
│ ├── ...
│ │
│ ├── country=US
│ │ └── data.parquet
│ ├── country=CN
│ │ └── data.parquet
│ └── ...
└── gender=female
├── ...

├── country=US
│ └── data.parquet
├── country=CN
│ └── data.parquet
└── ...SparkSession.read.parquet 或者 SparkSession.read.load讀取的目錄為path/to/table的時候,會自動從路徑下抽取分區信息。返回DataFrame的表結構為:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)細細分析一下你也會發現分區列的數據類型也是自動推斷的。當前支持的數據類型有,數字類型,date,timestamp和string類型。有時候用戶可能不希望自動推斷分區列的類型,這時候只需要將spark.sql.sources.partitionColumnTypeInference.enabled配置為false即可。如果分區列的類型推斷這個參數設置為了false,那麼分區列的類型會被認為是string。

從spark 1.6開始,分區發現默認情況只會發現給定路徑下的分區。比如,上面的分區表,假如你講路徑path/to/table/gender=male傳遞給SparkSession.read.parquet 或者 SparkSession.read.load 那麼gender不會被認為是分區列。如果想檢測到該分區,傳給spark的路徑應該是其父路徑也即是path/to/table/,這樣gender就會被認為是分區列。

3schema合併

跟protocol buffer,avro,thrift一樣,parquet也支持schema演變升級。用戶可以在剛開始的時候創建簡單的schema,然後根據需要隨時擴展新的列。

spark sql 用Parquet 數據源支持自動檢測新增列並且會合併schema。

由於合併schema是一個相當耗費性能的操作,而且很多情況下都是不必要的,所以從spark 1.5開始就默認關閉掉該功能。有兩種配置開啟方式:

1.通過數據源option設置mergeSchema為true。

2.在全局sql配置中設置spark.sql.parquet.mergeSchema 為true.

// This is used to implicitly convert an RDD to a DataFrame.
import spark.implicits._

// Create a simple DataFrame, store into a partition directory
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("data/test_table/key=2")

// Read the partitioned table
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
// |-- value: int (nullable = true)
// |-- square: int (nullable = true)
// |-- cube: int (nullable = true)
// |-- key: int (nullable = true)

4hive metastore Parquet錶轉換

當讀寫hive metastore parquet格式表的時候,Spark SQL為了較好的性能會使用自己默認的parquet格式而不是採用hive SerDe。該行為是通過參數spark.sql.hive.convertMetastoreParquet空值,默認是true。

5Hive和parquet兼容性

從表schema處理角度講hive和parquet有兩個主要的區別

  1. hive是大小寫敏感的,但是parquet不是。
  2. hive會講所有列視為nullable,但是nullability在parquet里有獨特的意義。

由於上面的原因,在將hive metastore parquet轉化為spark parquet表的時候,需要處理兼容一下hive的schema和parquet的schema。兼容處理的原則是:

  1. 有相同名字的欄位必須要有相同的數據類型,忽略nullability。兼容處理的欄位應該保持parquet側的數據類型,這樣就可以處理到nullability類型了。
  2. 兼容處理的schema應直接包含在hive元數據里的schema信息:
    1. 任何僅僅出現在parquet schema的欄位將會被刪除
    2. 任何僅僅出現在hive 元數據里的欄位將會被視為nullable。

6元數據刷新

Spark SQL為了更好的性能會緩存parquet的元數據。當spark 讀取hive表的時候,schema一旦從hive轉化為spark sql的,就會被spark sql緩存,如果此時表的schema被hive或者其他外部工具更新,必須要手動的去刷新元數據,才能保證元數據的一致性。

spark.catalog.refreshTable("my_table")

7配置

parquet的相關的參數可以通過setconf或者set key=value的形式配置。

  • spark.sql.parquet.binaryAsString 默認值是false。一些parquet生產系統,尤其是impala,hive和老版本的spark sql,不區分binary和string類型。該參數告訴spark 講binary數據當作字元串處理。
  • spark.sql.parquet.int96AsTimestamp 默認是true。有些parquet生產系統,尤其是parquet和hive,將timestamp翻譯成INT96.該參數會提示Spark SQL講INT96翻譯成timestamp。
  • spark.sql.parquet.compression.codec 默認是snappy。當寫parquet文件的時候設置壓縮格式。如果在option或者properties里配置了compression或者parquet.compression優先順序依次是:compression,parquet.compression,spark.sql.parquet.compression.codec。支持的配置類型有:none,uncompressed,snappy,gzip,lzo,brotli,lz4,zstd。在hadoop2.9.0之前,zstd需要安裝ZstandardCodec,brotli需要安裝BrotliCodec。
  • spark.sql.parquet.filterPushdown 默認是true。設置為true代表開啟parquet下推執行優化。
  • spark.sql.hive.convertMetastoreParquet 默認是true。假如設置為false,spark sql會讀取hive parquet表的時候使用Hive SerDe,替代內置的。
  • spark.sql.parquet.mergeSchema 默認是false。當設置為true的時候,parquet數據源會合併讀取所有的parquet文件的schema,否則會從summary文件或者假如沒有summary文件的話隨機的選一些數據文件來合併schema。
  • spark.sql.parquet.writeLegacyFormat 默認是false。如果設置為true 數據會以spark 1.4和更早的版本的格式寫入。比如,decimal類型的值會被以apache parquet的fixed-length byte array格式寫出,該格式是其他系統例如hive,impala等使用的。如果是false,會使用parquet的新版格式。例如,decimals會以int-based格式寫出。如果spark sql要以parquet輸出並且結果會被不支持新格式的其他系統使用的話,需要設置為true。

推薦閱讀

spark面試該準備點啥

Spark SQL從入門到精通

Spark Streaming 場景應用


推薦閱讀:
相关文章