Google Cloud StorageをHadoopのファイルシステムとして使うことのできるようになるGoogle Cloud Storage Connector for HadoopというライブラリがGoogleから出ています。
これを使うと、SparkはHDFSではなくGoogle Cloudに対してジョブを実行できるようになります。
Google Cloud Storage Connector for HadoopによってHDFSが使えなくなるわけではなく、HDFSと併用してGoogle Cloudを使うことができるようになるだけなので、キャッシュやテンポラリ・ファイルはHDFSの方に載せた方がよいでしょう。
早速使ってみます。
1. 秘密鍵の発行
まずはGoogle Cloud Service接続用のクライアントIDと秘密鍵の発行を行います。
これはGoogle Developers Consoleで行います。
Google Developers Consoleにログインし、左サイドバーから「APIと認証」→「認証情報」を開きます。
「新しいクライアントIDを作成」をタップします。
「サービスアカウント」を選択してボタンをタップします。
すると下記のような画面が開きPKCS12形式で秘密鍵のダウンロードが始まります。
一応キーもメモっておきます。一応ね。
ここでダウンロードした秘密鍵ファイルとサービスアカウントに払い出されたメールアドレスをあとで使います。
ここまででDeveloper Console作業はおしまいです。
2. Google Cloud Storage Connector for Hadoopの設定
Googleの公式ページにはセットアップ・スクリプトを使うやり方も載っているのですが、このスクリプトを使うとHadoopクラスタのセットアップから行ってしまいます。
私の場合は既にあるHadoopクラスタとSparkクラスタから接続したかったので手動でのセットアップを行いました。
Sparkサーバ全台にHadoop 2.x compatible connectorをダウンロードして設置します。
wget https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-latest-hadoop2.jar --no-check-certificate mv gcs-connector-latest-hadoop2.jar /usr/lib/hadoop/lib
core-site.xmlにGoogle Cloud Strageの情報を追加します。
<property> <name>fs.gs.project.id</name> <value>xxxxxxxxxxxxxxxx(数字のやつ)</value> <description> Google Cloud Project ID with access to configured GCS buckets. </description> </property> <property> <name>fs.gs.system.bucket</name> <value>バケット名(gs://は不要)</value> <description> GCS bucket to use as a default bucket if fs.default.name is not a gs: uri. </description> </property> <property> <name>fs.gs.working.dir</name> <value>/</value> <description> The directory relative gs: uris resolve in inside of the default bucket. </description> </property> <property> <name>fs.gs.impl</name> <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value> <description>The FileSystem for gs: (GCS) uris.</description> </property> <property> <name>fs.AbstractFileSystem.gs.impl</name> <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value> <description>The AbstractFileSystem for gs: (GCS) uris.</description> </property> <property> <name>fs.gs.metadata.cache.enable</name> <value>true</value> <description> If true, a DirectoryListCache will be used to supplement "list" requests to GCS to fill in any missing items caused by eventual list consistency, intercepting create/delete/copy calls to create cache entries. The concrete type is determined with fs.gs.metadata.cache.type. </description> </property> <property> <name>fs.gs.metadata.cache.type</name> <value>IN_MEMORY</value> <description> Specifies which implementation of DirectoryListCache to use for supplementing GCS API "list" requests. Supported implementations: IN_MEMORY: Enforces immediate consistency within same Java process. FILESYSTEM_BACKED: Enforces consistency across all cooperating processes pointed at the same local mirror directory, which may be an NFS directory for massively-distributed coordination. </description> </property> <property> <name>google.cloud.auth.service.account.enabl</name> <value>true</value> <description> Whether to use a service account for GCS authorizaiton. If an email and keyfile are provided (see google.cloud.auth.service.account.email and google.cloud.auth.service.account.keyfile), then that service account willl be used. Otherwise the connector will look to see if it running on a GCE VM with some level of GCS access in it's service account scope, and use that service account. </description> </property> <property> <name>google.cloud.auth.service.account.email</name> <value>1.のクライアントID作成で払いだされたメールアドレス</value> <description> The email address is associated with the service account used for GCS access when google.cloud.auth.service.account.enable is true. </description> </property> <property> <name>google.cloud.auth.service.account.keyfile</name> <value>/etc/hadoop/conf/key.p12(1.のクライアントID作成で発行された秘密鍵)</value> <description> The PKCS12 (p12) certificate file of the service account used for GCS access when google.cloud.auth.service.account.enable is true. </description> </property>
1. で発行された秘密鍵をSparkクラスタ全台に設置します。
↑の例では/etc/hadoop/confにkey.p12として置きました。
cp ~/XXXXXXXX.p12 /etc/hadoop/conf/key.p12 chmod 644 /etc/hadoop/conf/key.p12
ここまでできたらhdfsコマンドでGoogle Cloudのファイル一覧を表示してみます。
hdfs dfs -ls gs://test 15/01/09 15:50:00 INFO gcs.GoogleHadoopFileSystemBase: GHFS version: 1.3.1-hadoop2 Found 2 items -rwx------ 3 root root 833752492 2015-01-09 10:00 gs://test/access.log drwx------ - root root 0 2015-01-09 10:05 gs://test/log
設定に問題がなければバケットの中身が表示されるはずです。
3. Sparkの設定
最後にSparkの環境設定ファイルにクラスパス変数を追加します。
vi /opt/spark/conf/spark-env.sh ~ SPARK_CLASSPATH=/usr/lib/hadoop/lib/gcs-connector-latest-hadoop2.jar ~
Sparkのexecutorプロセスが生きていたらMesosを再起動します。
※Mesosについて、詳しくはこちらの記事をご覧ください(*´∀`*)
SparkをMesosで冗長化させてIPython notebookから弄る
ここまで設定すると、pysparkやspark-shellから下記のようにGoogle Cloud上のファイルを弄れるようになります。
val textFile = sc.textFile("gs://test/access.log") textFile.count() :
4. SparkSQLでもGoogle Cloud使いたい!
上記までの設定では、pysparkやspark-shellでの利用には問題ありませんが、SparkSQLからGoogle Cloudを使うことができません。
SparkSQLがHiveのメタストアを利用しているからです。
そこでHiveの方にも同じ設定を行います。
Hadoop&HiveホストでHadoop 2.x compatible connectorをダウンロードして設置し、
core-site.xmlにGoogle Cloud Strageの情報を追加します。
core-site.xmlに追加する内容は
2. Google Cloud Storage Connector for Hadoopの設定
のものと全く同じです。
1. で発行された秘密鍵も設置します。
設定したらHadoopクラスタを再起動します。
HiveテーブルのLOCATIONにGoogle Cloudを指定する
Hiveで、LOCATIONにGoogle Cloudを指定したテーブルを作成します。
CREATE EXTERNAL TABLE bq_log( id STRING ,time STRING ,ua STRING ,ip STRING ,refer STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' LOCATION 'gs://test/log' ;
これでSparkSQLからbq_logテーブルに対して発行するクエリはGoogle Cloud上のデータが利用されます。
select count(1) num from bq_log ;
特にはまることもなくあっさりつながってしまいびっくりしました。
性能面では、Google Cloud Strage上のファイルを使うとローカルのHDFSに比べて1.3〜1.5倍くらい遅くなりました。
しかし40GB程度のデータで検証してみたところ、Google Cloud Strageのネットワーク速度は非常に優秀で瞬間的に1Gbps出てしまいました( ̄□ ̄;)!!
新しい選択肢の一つとなりそうです。