SparkSQLリファレンス第四部、関数編・Window関数です。
Window関数
SparkSQLではデータの切り出し集計を行うWindow関数を使うことができます。
Window計算自体はWindow関数以外の集計関数で行うこともできます。
| 関数 | 内容 | ver. |
|---|---|---|
| avg |
avg(e: Column) avg(columnName: String) グループ内での平均値を計算します。 sql: select avg( e ) from table group by a DataFrame: df.groupBy( $"a" ).agg( avg( $"e" ) ) 例) グループ内アイテムが ( 1, 3, 3, 9 )の場合、4が返ります。 |
1.3.0 |
| cumeDist |
cumeDist() 与えられたパーティション内の累積分布を計算します。 sql: select
cume_dist() over(
partition by key order by e
)
from
table
DataFrame: df.select(
cumeDist().over(
Window.partitionBy( $"key" ).orderBy( $"e" )
)
)
| 1.4.0 |
| denseRank |
denseRank() rank関数やrow_number関数と同じくパーティション内の順位付けを行う関数です。 このページの下にdenseRank, rank, row_numberの違いをまとめました。 sql: select
dense_rank() over(
partition by key order by e
)
from
table
DataFrame: df.select(
denseRank().over(
Window.partitionBy( $"key" ).orderBy( $"e" )
)
)
| 1.4.0 |
| lag |
lag(e: Column, offset: Int) lag(columnName: String, offset: Int) lag(e: Column, offset: Int, defaultValue: Any) lag(columnName: String, offset: Int, defaultValue: Any) ウィンドウ内で、現在行からoffsetだけ前の行の値を返却します。 前にoffset分の行がない場合はnullを返します。 defaultValueが指定されている場合は、前にoffset分の行がない場合にdefaultValueを返します。 先月の売上高と今月の売上高を比較したいような場合に使います。 sql: select
lag( e, 1 ) over(
partition by category order by date
)
from
table
DataFrame: df.select(
lag( $"e", 1 ).over(
Window.partitionBy( $"category" ).orderBy( $"date" )
)
)
| 1.4.0 |
| lead |
lead(columnName: String, offset: Int) lead(e: Column, offset: Int) lead(columnName: String, offset: Int, defaultValue: Any) lead(e: Column, offset: Int, defaultValue: Any) lag関数の逆です。 ウィンドウ内で、現在行からoffsetだけ後ろの行の値を返却します。 後ろにoffset分の行がない場合はnullを返します。 defaultValueが指定されている場合は、後ろにoffset分の行がない場合にdefaultValueを返します。 来月の売上高予測と今月の売上高予測を比較したいような場合に使います。 sql: select
lead( e, 1, 0 ) over(
partition by category order by date
)
from
table
DataFrame: df.select(
lead( $"e", 1, 0 ).over(
Window.partitionBy( $"category" ).orderBy( $"date" )
)
)
| 1.4.0 |
| ntile |
ntile(n: Int) ウィンドウ内でデータを指定されたグループ数に分割します。 sql: select
ntile( 4 ) over(
partition by key order by e
)
from
table
DataFrame: df.select(
ntile( 4 ).over(
Window.partitionBy( $"key" ).orderBy( $"e" )
)
)
例) 上記の場合、1,2,3,4のいずれかのグループ番号が振られます。 | 1.4.0 |
| percentRank |
percentRank() ウィンドウのパーセンタイルを計算します。 (ランク – 1) / (行数 – 1) select
percent_rank() over(
partition by key order by e
)
from
table
DataFrame: df.select(
percentRank().over(
Window.partitionBy( $"key" ).orderBy( $"e" )
)
)
| 1.4.0 |
| rank |
rank() ウィンドウ内における行のランクを返却します。 このページの下にdenseRank, rank, row_numberの違いをまとめました。 sql: select
rank() over(
partition by key order by e
)
from
table
DataFrame: df.select(
rank().over(
Window.partitionBy( $"key" ).orderBy( $"e" )
)
)
| 1.4.0 |
| rowNumber |
rowNumber() 1から始まる連続数を返します。 partitionを指定しなければ、全行に対して1から始まるインデックスが振られます。 sql: select
row_number() over(
partition by key order by e
)
from
table
DataFrame: df.select(
rowNumber().over(
Window.partitionBy( $"key" ).orderBy( $"e" )
)
)
| 1.4.0 |
denseRank, rank, rowNumberの違い
| 関数 | 意味 |
|---|---|
| denseRank | 同率ランクがある場合に順位を飛ばします。 |
| rank | 同率ランクがある場合に順位飛ばしを行いません。 |
| rowNumber | 順位の重複がありません。 |
例
年齢(age)に対してそれぞれのランクを出すと下記のような結果になります。
select
row_number() over(
partition by member order by age
)
from
table
| member | age | denseRank | rank | rowNumber |
|---|---|---|---|---|
| john | 15 | 1 | 1 | 1 |
| michel | 20 | 2 | 2 | 2 |
| juliet | 20 | 2 | 2 | 3 |
| anna | 25 | 4 | 3 | 4 |
| emily | 25 | 4 | 3 | 5 |
| mason | 30 | 6 | 4 | 6 |
構文
演算子
関数
宜しければこちらの動画もどうぞー
Apache Spark part5 Zeppelinを使ってデータ分析の初歩
https://www.youtube.com/watch?v=TUQESh3NUlc
