• Aggregation is really nothing more than appending agg() after a groupBy.
  • A few things to note:
  • 1. Aggregate functions such as count and sum must be imported from sql.functions.
  • 2. For naming the aggregated column, the Scala version can use either alias or as, while PySpark can only use alias (as is a reserved keyword in Python).
  • Window functions are used here to take the top N per group (see the raw-SQL sketch right after this list). Note:
  • 1. You need to import Window, plus row_number or dense_rank from sql.functions.
  • 2. For descending order in the orderBy inside over(), Scala uses the column's .desc, while Python uses the desc() function.
  • 3. The Python version can append the new column directly inside select(); the Scala version here uses withColumn.
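
Before the two full programs, here is a minimal raw-SQL sketch of the same grouped top-N query (the tiny dataset, the access_log view name, and the "top 2" cutoff are purely illustrative); the Window/over calls in the examples below are just the DataFrame-API form of this SQL.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("window_sql_sketch").master("local[*]").getOrCreate()

# A tiny illustrative dataset with the same three columns used below.
df = spark.createDataFrame(
    [("2019-05-29", "aa", 3), ("2019-05-29", "bb", 12),
     ("2019-05-30", "aa", 56), ("2019-05-30", "ee", 45)],
    ["date", "userid", "click"])
df.createOrReplaceTempView("access_log")

# row_number() OVER (PARTITION BY ... ORDER BY ... DESC) is the SQL behind
# Window.partitionBy(...).orderBy(desc(...)) in the DataFrame API.
spark.sql("""
    SELECT date, userid, click, rank
    FROM (SELECT date, userid, click,
                 row_number() OVER (PARTITION BY date ORDER BY click DESC) AS rank
          FROM access_log) t
    WHERE rank <= 2
""").show()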

python-pyspark

from pyspark.sql.session import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import Row
from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank
from pyspark.sql.functions import row_number

spark = SparkSession.builder \
    .appName("agg&&window_test") \
    .master("local[*]") \
    .getOrCreate()

UserAcessLog = ["2019-05-29,aa,3","2019-05-29,bb,12",
"2019-05-29,aa,65","2019-05-29,cc,",
"2019-05-30,aa,56","2019-05-30,ee,45",
"2019-05-30,aa,45","2019-05-30,aa,30"
]
# drop the record whose click field is empty ("2019-05-29,cc,")
LogStrRDD = spark.sparkContext.parallelize(UserAcessLog, 1) \
    .filter(lambda line: len(line.split(",")[2]) > 0)

LogStrRDD = LogStrRDD.map(lambda line :
Row(line.split(",")[0],
line.split(",")[1],
int(line.split(",")[2])))
# schema for the three columns in each log line
schema = StructType([StructField("date",StringType()),
StructField("userid",StringType()),
StructField("click",IntegerType())])

LogDF = spark.createDataFrame(LogStrRDD,schema)
LogDF.show()

# In the Python version only alias() works inside agg; `as` is a reserved keyword in Python and raises an error.
LogDF.groupBy("date").agg(
countDistinct("userid").alias("ss"),
sum("click").alias("click_sum")).show()

LogDF.select("date", "userid", "click",
             dense_rank().over(
                 Window.partitionBy("date").orderBy(desc("click"))).alias("rank")
).show()
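
The select above only appends the rank column; to actually take the top N per date you still filter on it. A minimal continuation using the LogDF built above (the cutoff of 2 is an arbitrary illustration) could look like this:

# Keep only the top 2 clicks per date; desc("click") is the Python spelling of
# the descending order that Scala writes as LogDF("click").desc.
rank_window = Window.partitionBy("date").orderBy(desc("click"))
LogDF.select("date", "userid", "click",
             row_number().over(rank_window).alias("rank")) \
     .where(col("rank") <= 2) \
     .show()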

The full run output is in the GitHub repo: chen-eleven/spark_scala_python (github.com).

scala-spark-shell

package chen.spark.test
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
// aggregate functions all come from org.apache.spark.sql.functions
import org.apache.spark.sql.functions.row_number
import org.apache.spark.sql.functions.dense_rank
import org.apache.spark.sql.expressions.Window
// PySpark equivalent: pyspark.sql.window.Window

object spark_6_DailyPv_scala {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder
      .appName("agg&&window_test")
      .master("local[*]")
      .getOrCreate
    println(1)
    println(spark.sparkContext.master)

    val UserAcessLog = Array("2019-05-29,aa,45", "2019-05-29,bb,45",
      "2019-05-29,aa,45", "2019-05-29,cc,45",
      "2019-05-30,aa,35", "2019-05-30,ee,35",
      "2019-05-30,aa,35", "2019-05-30,aa,35"
    )
    val LogStrRDD = spark.sparkContext.parallelize(UserAcessLog, 1)
      .filter(line => line.split(",")(2).length > 0)
      .map(line =>
        Row(line.split(",")(0),
          line.split(",")(1),
          line.split(",")(2).toInt))

    val schema = StructType(Array(StructField("date", StringType),
      StructField("userid", StringType),
      StructField("click", IntegerType))
    )

    val LogDF = spark.sqlContext.createDataFrame(LogStrRDD, schema)

    LogDF.groupBy("date").agg(
      countDistinct("userid").as("uv"),
      sum("click").alias("click_sum")).show()

    val rankSpec = Window.partitionBy("date").orderBy(LogDF("click").desc)
    LogDF.withColumn("rank", dense_rank().over(rankSpec)).show()
  }
}


