SparkSQLのリファレンスを作成しました。
第一部は構文編として構文の解説を、第二部は演算子編として演算子の解説を、第三部は関数編として関数の解説を掲載します。
できるかぎり、SQL文を使用する場合とDataFrameオペレータを使用する場合の両方のケースを記載していきたいと思います。
執筆時点のSpark最新版は1.5.0だったので1.5.0で動作を確認しています。
構文
演算子
関数
基本的なデータ取得構文は下記のようになります。
SQL >>>
SELECT expr1 [[AS] alias1] [, expr2 [[AS] alias2], ...] [FROM [table_name1|(subselect1)] [, table_name2|(subselect2), ...)] [[INNER|[FULL|RIGHT|LEFT] OUTER|LEFT SEMI] JOIN table_2|(subselect2) [[AS] tablealias2] ON join_condition_1 [... AND join_condition_N ...]]+ [WHERE condition] [GROUP BY alias1 [, alias2, ...]] [WITH ROLLUP|CUBE] [HAVING condition] [ORDER BY field1|alias1 [DESC|ASC] [, field2|alias2 [DESC|ASC], ...]] [LIMIT n] ;
DataFrame >>>
DataFrame .select( expr1 [[as] alias1] [, expr2 [[as] alias2], ...] ) [.join( join_condition_1 [... && join_condition_N ...]]+ )] [.where( where_condition )] [.(groupBy|rollup|cube)( alias1 [, alias2, ...] ) .agg( aggregation_1 ( alias1 ) [as alias2] )] [.orderBy( alias1.[(asc|desc)] [, alias2.[(asc|desc)], ...] )] [.limit( num )]
わかりにくいですね:(;゙゚’ω゚’):
次セクションから詳しく解説していきます。。。
・SELECT .. FROM …
SELECTで取得するカラムを選択し、FROMで取得先のデータソースを指定します。
SQL >>>
SELECT col1 AS alias1 ,col2 ,col3 FROM dataframe
DataFrame >>>
DataFrameではFROMはDaraFrameそのものになります。
import sqlContext.implicits._ DataFrame .select( 'col1 as 'alias1 ,$"col2" ,col( "col3" ) ,column( "col4" ) )
DataFrameでのカラム名の指定はいくつか書き方があります。
‘col1 | シンボルで表現 (シングルクォーテーション+文字列はscalaのシンボルオブジェクトになります) |
$”col2″ | $メソッドで表現 |
col(“col3”) | colメソッドで表現 |
column(“col4”) | columnメソッドで表現 |
シンボル表現と$メソッドはimport sqlContext.implicits._を行い、implicit classをインポートすると使えるようになります。
・WHERE
WHEREはデータのフィルタリング条件を指定します。
※ここらへんはSQL99等の仕様と同じです。
SQL >>>
SELECT * FROM dataframe WHERE col2 > 0
DataFrame >>>
DataFrameではfilterを使うこともできます。
DataFrame .where( $"col2" > 0 ) DataFrame .filter( $"col2".isNull )
・JOIN
JOINは複数のデータ(テーブル)を条件に従って結合します。
使えるJOINタイプは下記のとおりです。
単にJOINとだけ指定するとINNER JOINとなります。
LEFT SEMI JOINはWHERE句内でサブクエリを持つINメソッドを使いたいときに使用します(Hiveと同じ)。
- (INNER) JOIN
- {LEFT|RIGHT|FULL} OUTER JOIN
- LEFT SEMI JOIN
- CROSS JOIN
SQL >>>
SELECT a.col1 AS acol ,b.col1 AS bcol FROM dataframe1 AS a LEFT JOIN dataframe2 AS b ON b.col1 = a.col1
DataFrame >>>
DataFrameでももちろんJOIN、エイリアスが使えます。
DataFrame1 as $"a" join( DataFrame2 as $"b", $"a.col1" === $"b.col2", "left" )
・ORDER BY
ORDER BYはデータ集合の並び替えを行います。
並び替え順序は下記のキーワードで指定します。
- 昇順: asc
- 降順: desc
並び替え順序が省略された場合は昇順(asc)が適用されます。
SQL >>>
SELECT col1 FROM dataframe ORDER BY col1 DESC
DataFrame >>>
DataFrame .orderBy( 'col1.desc )
・LIMIT
LIMITはデータの上位n件を取得するのに使います。
SQL >>>
SELECT col1 FROM dataframe ORDER BY col1 DESC LIMIT 10
DataFrame >>>
DataFrame limit 10
DataFrameにはよく似たメソッドのhead(n: Int)があります。
head(n: Int)はArray型を返すのに対し、limit(n: Int)は新しいDataFrameを返すという違いがあります。
・GROUP BY
GROUP BYはデータ集合を条件に従ってグルーピングします。
よってしばしば集計関数とセットで使われます。
SQL >>>
SELECT COUNT(*) AS n FROM dataframe GROUP BY col1
DataFrame >>>
DataFrame .groupBy( 'col1 ) .agg( count( "*" ) )
集計関数については第二部で詳しく解説します。
・HAVING
HAVINGを使うと集計結果をフィルタリングの条件に使うことができます。
※WHERE句には集計関数を含めることはできません。
SQL >>>
SELECT SUM(col1) AS total ,col2 FROM dataframe GROUP BY col2 HAVING SUM(col1) > 100
DataFrame >>>
DataFrameにはhavingがありません。集計結果に対してフィルタリングを行いたい場合もwhereもしくはfilterを使います。
DataFrame .groupBy( 'col2 ) .agg( sum( 'col1 ) as 'total ) .where( 'total > 100 )
すべてのメソッドチェーンはDataFrameを返すのでフィルタリングして集計してフィルタリング、みたいなこともできちゃいます。
DataFrame .where( 'col2.isin( "apple", "banana" ) ) .groupBy( 'col2 ) .agg( sum( 'col1 ) as 'total ) .where( 'total > 100 )
・ROLLUP
なんとSparkではROLLUP集計もできてしまいます。
(HiveContextの利用が必要ですが・・・)
ROLLUPを使うとグルーピング結果の総計を出すことができます。
わかりにくいので例を出しますね。
色 | total | 備考* ----+-------+---------- | 8 | 総合計 赤 | 5 | 赤の合計 白 | 6 | 白の合計
※備考は出力されません。
SQL >>>
SELECT COUNT(*) AS total ,col1 FROM dataframe GROUP BY col1 WITH ROLLUP
DataFrame >>>
DataFrameでもrollupが使えます。
DataFrame .rollup( "col1" ) .agg( count( "*" ) as 'total )
・CUBE
CUBEもHiveContextの利用が必要になります。
CUBEを使うとクロス集計とその小計を出すことができます。
CUBEはROLLUP以上にわかりにくいので、こちらも例を出しますね。
色 | 種類 | total | 備考* ----+----------------+-------+---------- | | 8 | 総合計 赤 | | 5 | 赤の合計 赤 | コザクラインコ | 4 | 赤 | コンゴウインコ | 1 | 白 | | 3 | 白の合計 白 | アヒル | 2 | 白 | 文鳥 | 1 |
※備考は出力されません。
SQL >>>
SELECT COUNT(*) AS total ,col1 ,col2 FROM dataframe GROUP BY col1 ,col2 WITH CUBE
DataFrame >>>
DataFrameでもcubeが使えます。
DataFrame .cube( "col1", "col2" ) .agg( count( "*" ) as 'total )
Apache Spark: SparkSQLリファレンス〜演算子編〜へ
宜しければこちらの動画もどうぞー
Sparkのインストールからspark-shellでのword-countまで行っています。
https://www.youtube.com/watch?v=Dci8Vig3PKw