2018年3月16日(金)に行われたScalaMatsuri2018トレーニングDAYでビッグデータについてお話させて頂きました。
ビッグデータで弱小卓球部を強くしよう!というふざけた内容です。
行ったデモの内容
ストリーミング・データ
Apache Zeppelinでデータの取得〜可視化まで行います
依存のロード
%spark.dep z.reset() z.load("org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0") z.load("mysql:mysql-connector-java:6.0.6")
バージョン確認
%spark scala.util.Properties.versionString spark.version
JSONをパースする構造体をcase classで作成
%spark import org.apache.spark.sql.{Encoder, Encoders} case class Track ( id: Long, player: String, speed: Double, x: Double, y: Double, z: Double, time: java.sql.Timestamp )
作成したcase classからEncoderを作成する
%spark // スキーマを取得することができたり、json.read時に文字列をパースすることができます。 val encoder = Encoders.product[Track] print(encoder)
kafkaからリアルタイムデータを取得して、JSONをDataSetに展開します
%spark val ds = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "10.146.0.3:9092,10.146.0.5:9092,10.146.0.6:9092") .option("startingOffsets", "latest") .option("subscribe", "takkyu") .load() .select($"value" cast "string" as "json") .select(from_json($"json", encoder.schema) as "takkyu") .select("takkyu.*")
Kafkaから取得したデータをメモリに書き出し続けます
%spark val query = ds .writeStream .queryName("takkyu_table") .format("memory") .start()
メモリに展開したデータを抽出します
%sql select * from takkyu_table limit 1000
MySQLから部員情報(マスタ)を取得します
%spark spark.read .format("jdbc") .option("url", "jdbc:mysql://10.146.0.11:3306/takkyu?useUnicode=true&characterEncoding=utf8") .option("dbtable", "buin") .option("user", "******") .option("password", "******") .load() .select("*") .createOrReplaceTempView("buin")
MySQLから取得したマスタとKafkaから取得したストリーム・データをJOINします
%sql select t.player, b.kikiude, t.speed, t.x, t.y, t.z from takkyu_table as t left join buin as b on b.name = t.player order by t.id desc
例えば、利き腕別のスピード平均をグラフ化したりできます。