バツイチとインケンのエンジニアブログ
プログラムやプログラムじゃないこと

ことり隊入荷情報をSparkで分析してTableauで可視化する

2015-12-22
書いた人 : バツイチ
カテゴリ : Spark | タグ : Apache Spark, Apache Zeppelin, Tableau

この記事はApache Spark Advent Calendar 2015 22日目の記事です。

完成したTableauダッシュボード。

ことり隊マップ

ことり隊というのはこれ↓のことです。

IMG_5696

ゲームセンターにあるUFOキャッチャーのプライズです。
ゲームセンターにもいろいろ特徴がありまして、ことり隊が置いてあるゲームセンターとそうでないゲームセンターがあります。
ゲームセンターをしらみつぶしに回ってことり隊を探すのも大変なので、Twitterを利用してことり隊の入荷情報をキャッチすることにしました。

1. ゲームセンターのtwitterアカウント

まずは東京都内のゲームセンターをリストアップしてtwitterアカウントを探します。
ゲームセンターのリストは 全国ゲーセン地図Wikiから取得しました。
ゲームセンターの名前からtwitterアカウントを探しました(※ここは手動(;´∀`)です)。

2. ゲームセンターのツイート取得

各ゲームセンターの過去のツイートを全て取得します。
せっかくSparkアドベントカレンダー用の記事なのでここからは全てApache Zeppelin上で操作を行います。

スクリーンショット 2015-12-23 1.59.52

Apache Zeppelin

ツイートの取得にはTwitter4Jを使います。
これはspark-streaming-twitterでも使っているライブラリです。
※Sparkが古いTwitter4Jに依存を持っていて若干大変でした。

%dep
z.addRepo("ATILIKA dependencies").url("http://www.atilika.org/nexus/content/repositories/atilika")
z.load("org.atilika.kuromoji:kuromoji:0.7.7")
z.load("com.databricks:spark-csv_2.10:1.2.0")
z.load("org.twitter4j:twitter4j-core:4.0.4")

import java.util.Date
import java.sql.{Date => SqlDate}
import scala.collection.JavaConverters._

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SaveMode}

import twitter4j.{Paging, Query, Status, TwitterFactory, Twitter}
import twitter4j.conf.ConfigurationBuilder

lazy val factory = new TwitterFactory( config.build() )
lazy val twitter = factory.getInstance()

// 途中省略

val perPage = 200
var total = 0

val tweets = tokyoTweets.map { g =>
    val user  = twitter.showUser( g.screenName)
    val twNum = user.getStatusesCount
    val maxPage = scala.math.ceil( twNum.toDouble / perPage ).toInt

    val tw = retreiveRecursively( twitter, g.twitter, total, maxPage, perPage )
    total += maxPage
    tw
}
val sum = tweets.foldLeft( Seq.empty[Tweet] ){ ( acc, ss ) =>
    acc ++ ss.map { s =>
        val user = s.getUser
        Tweet(
            s.getId,
            s.getText,
            s.getUser.getId,
            s.getUser.getScreenName,
            new SqlDate( s.getCreatedAt.getTime )
        )
    } toSeq
}
sc.parallelize( sum ).toDF.write.format( "json" ).mode( SaveMode.Append ).save( path )

こんな感じで、「ツイートを取得してファイルに保存」を繰り返します。

3. ツイートの選別

取得したツイートには様々なものが含まれていますが私が欲しいのはことり隊に関するものだけなのでSparkの機械学習を使ってツイートの選別を行います。

日本語分かち書き

ツイートはほとんどが日本語なので日本語の分かち書きを行う必要があります。
便利なkuromojiを使ってml.Pipelineにフィットする日本語トークナイザーを作成します。

import java.util.UUID
import scala.collection.JavaConverters._

import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.sql.types.{StringType, ArrayType, DataType}
import org.atilika.kuromoji.Tokenizer

object JapaneseTokenizer {
  lazy val tokenizer = Tokenizer.builder().build()
}
class JapaneseTokenizer( override val uid: String ) extends UnaryTransformer[String, Seq[String], JapaneseTokenizer] {

  lazy val tokenizer = JapaneseTokenizer.tokenizer

  def this() = this( s"stok_${UUID.randomUUID().toString.takeRight(12)}" )

  override protected def createTransformFunc: String => Seq[String] = {
    tokenizer.tokenize( _ ).asScala.map( t => t.getSurfaceForm )
  }

  override protected def validateInputType( inputType: DataType ): Unit = {
    require(inputType == StringType, s"Input type must be string type but got $inputType.")
  }

  override protected def outputDataType: DataType = new ArrayType(StringType, false)
}

このJapaneseTokenizerクラスもApache Zeppelinで作成します。

訓練データの読み込みとロジスティック回帰によることり隊ツイート判定

Apache Zeppelinで事前に準備した訓練データをDataFrameに読み込みます。

val training = sqlContext.read.format( "com.databricks.spark.csv" ).option( "header", "true" ).load( path ).select(
    $"label" cast DoubleType as "label",
    $"text"
)

次に、先ほど作った日本語トークナイザー、ハッシング・トリック、ロジスティック回帰を使ったパイプラインを組み立て、訓練データの学習を行います。

val tokenizer = new JapaneseTokenizer().setInputCol( "text" ).setOutputCol( "words" )
val hashingTF = new HashingTF().setNumFeatures( 1000 ).setInputCol( tokenizer.getOutputCol ).setOutputCol( "features" )
val lr = new LogisticRegression().setMaxIter( 10 ).setRegParam( 0.001 )

val pipeline = new Pipeline().setStages( Array( tokenizer, hashingTF ) )
val model = pipeline.fit( training )

今度は判定したいデータをDataFrameに読み込み、作成したモデルで推定を行います。

val allTweets = sqlContext.read.format( "com.databricks.spark.csv" ).option( "header", "true" ).load( path )
val predict = model.transform( test )
predict.where( $"prediction" === 1.0 ).write.format( "com.databricks.spark.csv" ).option( "header", "true" ).mode( SaveMode.Overwrite ).save( path )

“prediction = 1.0”と判定されたデータのみをことり隊入荷情報として抽出します。
抽出したデータは後ほどTableauでロードするのでCSV形式で保存します。

4. ゲームセンターの位置情報を取得

Tableauにゲームセンターの位置をプロットしたいのですが、Tableauの位置データは15万人以上の市区町村のみです。
see: http://kb.tableau.com/articles/knowledgebase/mapping-basics

よって全ての市区町村の位置を取得するためにGoogle Maps Geocoding APIを使いました。

Google Maps Geocoding API
google-maps-services-java

これもApache Zeppelinから使うことができます。

%dep
z.load("com.databricks:spark-csv_2.10:1.2.0")
z.load("com.google.maps:google-maps-services:0.1.9")

Twitterと同様、Google developer consoleでAPIキーを取得する必要があります。

import scala.collection.JavaConverters._
import com.google.maps.{GeoApiContext, GeocodingApi}
import com.google.maps.model.GeocodingResult

val context = new GeoApiContext().setApiKey( key )

val tokyo = sqlContext.read.format( "com.databricks.spark.csv" ).option( "header", "true" ).load( path )
val tokyoList = tokyo.collectAsList.asScala
import org.apache.spark.sql.Row

val tokyoGeo = tokyoList.map { case Row( id: String, shop: String, twitter: String, prefecture: String, city: String, address1: String, address2: String ) =>
    val results =  GeocodingApi.geocode(
        context,
        s"${prefecture} ${city}"
    ).await()
    val loc = results(0).geometry.location
    GameCentre(
        id.toInt,
        shop,
        twitter,
        prefecture,
        city,
        address1,
        address2,
        loc.lat,
        loc.lng
    )
}

tokyoGeo.toDF.write.format( "com.databricks.spark.csv" ).option( "header", "false" ).mode( org.apache.spark.sql.SaveMode.Overwrite ).save( path )

この位置情報付きのゲームセンター情報もCSV形式で保存します。

5. Tableauでゲームセンター一覧と入荷情報を表示する

ファイルは全てCSVで作成したのであとはTableauに読み込むだけです。
Tableau上で位置情報付きゲームセンターCSVとことり隊ツイートCSVをロードし、screenNameをキーとして外部結合します。

ことり隊マップ

Apache Zeppelin + Sparkは、javaのライブラリがいろいろ使えるので便利です。
可視化以外はApache Zeppelinのノートブック上で完結できましたb
Sparkで分析した結果はいろいろなフォーマットで出力できるので連携が楽です。

このエントリーをはてなブックマークに追加
Tweet

← Druid part1 インストールとチュートリアル
Druid part2 pivotでtwitterデータを可視化してみる →

 

最近書いた記事

  • Ryzen7 3800XT でmini ITXオープンフレームPCを作る
  • Pythonで機械学習入門 競馬予測
  • HP ENVY 15 クリエイターモデルレビューとRAID0解除
  • JRA-VAN データラボを使って、競馬データを収集する
  • Surface Pro 3 にubuntu18.04を入れる

カテゴリー

  • Android
  • Apache Flink
  • API
  • AWS
  • bazel
  • BigQuery
  • Cassandra
  • Docker
  • Druid
  • Elasticsearch
  • Git
  • Golang
  • gradle
  • HDFS
  • JavaScript
  • jvm
  • Linux
  • MongoDB
  • MySQL
  • Nginx
  • Nodejs
  • PaaS
  • PHP
  • Python
  • RabbitMQ
  • Raspberry Pi
  • React Native
  • Redis
  • Riak
  • rust
  • scala
  • Scheme
  • SEO
  • solr
  • Spark
  • spray
  • Sublime Text
  • Swift
  • Tableau
  • Unity
  • WebIDE
  • Wordpress
  • Youtube
  • ひとこと
  • カンファレンス
  • スケジューラ
  • マイクロマウス
  • 広告
  • 技術じゃないやつ
  • 株
  • 機械学習
  • 競馬
  • 自作キーボード
  • 自然言語処理

アーカイブ

  • 2021年4月
  • 2021年2月
  • 2021年1月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年10月
  • 2019年9月
  • 2019年8月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年11月
  • 2018年9月
  • 2018年5月
  • 2018年3月
  • 2018年2月
  • 2017年9月
  • 2017年8月
  • 2017年6月
  • 2017年4月
  • 2017年3月
  • 2017年1月
  • 2016年10月
  • 2016年9月
  • 2016年8月
  • 2016年6月
  • 2016年5月
  • 2016年4月
  • 2016年3月
  • 2016年2月
  • 2016年1月
  • 2015年12月
  • 2015年11月
  • 2015年10月
  • 2015年9月
  • 2015年8月
  • 2015年6月
  • 2015年5月
  • 2015年2月
  • 2015年1月
  • 2014年12月
  • 2014年11月
  • 2014年9月
  • 2014年6月
  • 2014年5月
  • 2014年3月
  • 2014年2月
  • 2014年1月
  • 2013年12月
  • 2013年11月
  • 2013年10月
  • 2013年9月
  • 2013年8月

書いた人

  • バツイチちゃん
  • インケンくん

このブログについて

エンジニアとしての考え方が間逆な2人がしょーもないこと書いてます。

バツイチ

アイコン

IT業界で働くエンジニアです。名前の通りバツイチです。
理論や抽象的概念が好きだけど人に説明するのが下手。

インケン

アイコン

バツイチちゃんと同じ業界で働いています。
理論とか開発手法とかは正直どうでもよくて、
生活する上で役に立つことに使いたい

Copyright 2025 バツイチとインケンのエンジニアブログ