以前Druidとpivotを使って、twitterデータを可視化というのをやりましたが、Druidやpivotが一緒にパッケージ化されインストールしやすくなったimplyを使って、もっと簡単にリアルタイム集計をしてみたいと思います。
implyは以下のツールがセットになったイベント集計プラットフォームです。
DruidをベースにBIツールのPivotや、DruidのdatasourceをSQLライクに記述できるPlyQL、ラージデータセットを扱う時に便利なJavaScript libraryのPlywoodが含まれています。
implyにはQuickstart用の設定が入ってるので、簡単に試すことができます。
今回はimplyを入れたサーバーとは別のkafkaからデータを取ってきて、pivotで表示するところをこちらのチュートリアルをベースにやっていきたいと思います。
kafkaの準備
まずは、外部のkafkaサーバーを用意し、そこにFluentdとかでJSONで以下のようなデータを入れます。
cd /opt/kafka_2.11/
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic sample --from-beginning
{"date":"2017-04-05 06:23:41","ua_name":"Chrome","ua_category":"smartphone","os":"Android","os_version":"5.0","browser_type":"browser","browser_version":"57.0.2987.110"}
{"date":"2017-04-05 06:31:48","ua_name":"Chrome","ua_category":"smartphone","os":"Android","os_version":"5.0","browser_type":"browser","browser_version":"57.0.2987.110"}
{"date":"2017-04-05 07:42:18","ua_name":"Safari","ua_category":"smartphone","os":"iPhone","os_version":"9.1","browser_type":"browser","browser_version":"9.0"}
日時はUTCで入れます。ローカルタイムで入れるとdruidから弾かれます。
implyの設定
kafkaが用意できたらimplyの設定をしていきます。
まずはダウンロードしてきて解凍
tar -xzf imply-2.0.0.tar.gz
cd imply-2.0.0
次にdruidのquickstartのkafka設定ファイルを修正します。
vi imply-2.0.0/conf-quickstart/tranquility/kafka.json
一番下にある以下の行をコメントを外します
!p95 tranquility-kafka bin/tranquility kafka -configFile conf-quickstart/tranquility/kafka.json
上のコメント外したファイルにdatasourceの設定をします。
中味はこんな感じにしました。
vi conf-quickstart/tranquility/kafka.json
{
"dataSources" : [
{
"spec" : {
"dataSchema" : {
"dataSource" : "sample",
"parser" : {
"type" : "string",
"parseSpec" : {
"timestampSpec" : {
"column" : "date",
"format" : "yyyy-MM-dd HH:mm:ss"
},
"dimensionsSpec" : {
"dimensions" : [
"date",
"ua_name",
"ua_category" ,
"os",
"os_version",
"browser_type",
"browser_version"
]
},
"format" : "json"
}
},
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "hour",
"queryGranularity" : "none"
},
"metricsSpec" : [
{
"type" : "count",
"name" : "count"
}
]
},
"ioConfig" : {
"type" : "realtime"
},
"tuningConfig" : {
"type" : "realtime",
"maxRowsInMemory" : "100000",
"intermediatePersistPeriod" : "PT10M",
"windowPeriod" : "PT10M"
}
},
"properties" : {
"task.partitions" : "1",
"task.replicants" : "1",
"topicPattern" : "sample"
}
}
],
"properties" : {
"zookeeper.connect" : "localhost:2181",
"druid.discovery.curator.path" : "/druid/discovery",
"druid.selectors.indexing.serviceName" : "druid/overlord",
"commit.periodMillis" : "15000",
"consumer.numThreads" : "2",
"kafka.zookeeper.connect" : "kafka-server:2181",
"kafka.group.id" : "kafka",
"serialization.format" : "smile",
"druidBeam.taskLocator": "overlord"
}
}
timestampSpecにjsonのdateの形式を入れて、dimensionsにkafkaに入れたjsonの項目名を入れてます。
topicPatternにはkafkaのtopic名、kafka.zookeeper.connectに上で設定したkafkaサーバーのアドレスを入れます。
とりあえずcountだけ計測するようにmetricsSpecを設定します。
implyの起動
以下のコマンドでdruidやpivotなどが一括で起動します。
bin/supervise -c conf/supervise/quickstart.conf
起動したらkafkaにデータを流してみましょう。
tail -f var/sv/tranquility-kafka.log
こんな感じのログが出ると思います。
2017-04-05 08:38:40,721 [KafkaConsumer-CommitThread] INFO c.m.tranquility.kafka.KafkaConsumer – Flushed {dogakun={receivedCount=2, sentCount=2, droppedCount=0, unparseableCount=0}} pending messages in 0ms and committed offsets in 2ms.
上記みたいにsentCountに値がセットされていたら正常にdruidに取り込まれています。
droppedCountが0じゃない場合、何らかの問題があってdruidに取り込まれていません。
jsonのtimestamp(今回の場合だとdate)がUTCじゃない場合やテストデータなどでdruidのサーバーと時間がかけ離れてる場合にdropされたりします。
PlyQLでデータソースを確認
ちゃんとsentCountされた場合、PlyQLでデータが確認できるようになります。
bin/plyql -h localhost:8082 -q 'show tables'
┌────────────────────────────┐
│ Tables_in_database │
├────────────────────────────┤
│ COLUMNS │
│ SCHEMATA │
│ TABLES │
│ sample │
└────────────────────────────┘
bin/plyql -h localhost:8082 -q 'select * from sample'
┌─────────────────────────────────────────┬──────────────┬─────────────────┬───────┬───────────────┬─────────────┬─────────┬────────────┬─────────┬─────────────┬─────────┬────────┐
│ __time │ browser_type │ browser_version │ count │ date │ ip │ os │ os_version │ referer │ ua_category │ ua_name │ vendor │
├─────────────────────────────────────────┼──────────────┼─────────────────┼───────┼───────────────┼─────────────┼─────────┼────────────┼─────────┼─────────────┼─────────┼────────┤
│ Wed Apr 05 2017 17:52:45 GMT+0900 (JST) │ browser │ 9.0 │ 1 │ 1491382365000 │ 10.48.12.29 │ iPhone │ 9.1 │ NULL │ smartphone │ Safari │ Apple │
│ Wed Apr 05 2017 17:52:47 GMT+0900 (JST) │ browser │ 9.0 │ 1 │ 1491382367000 │ 10.48.12.29 │ iPhone │ 9.1 │ NULL │ smartphone │ Safari │ Apple │
│ Wed Apr 05 2017 17:52:51 GMT+0900 (JST) │ browser │ 57.0.2987.110 │ 1 │ 1491382371000 │ 10.48.12.29 │ Android │ 5.0 │ NULL │ smartphone │ Chrome │ Google │
│ Wed Apr 05 2017 17:52:54 GMT+0900 (JST) │ browser │ 57.0.2987.110 │ 1 │ 1491382374000 │ 10.48.12.29 │ Android │ 5.0 │ NULL │ smartphone │ Chrome │ Google │
│ Wed Apr 05 2017 17:52:57 GMT+0900 (JST) │ browser │ 9.0 │ 1 │ 1491382377000 │ 10.48.12.29 │ iPad │ 9.1 │ NULL │ smartphone │ Safari │ Apple │
└─────────────────────────────────────────┴──────────────┴─────────────────┴───────┴───────────────┴─────────────┴─────────┴────────────┴─────────┴─────────────┴─────────┴────────┘
こんな感じでSQLライクにデータを参照することができます。
Pivotの設定
imply入れたサーバーの9095ポートにアクセスします。
真ん中のcubes settingsまたは右上の設定アイコンからData Cubesを選んで、新しいdata cubeを作ります。
sourceにデータソース名を選択し、Timezoneは日本がないので Asia/Seoulを選択します。
Create cubeすると、data cubeが作られます。
今作ったdata cubeを選択するとこんな感じでCountが表示されます。
あとはディメンションを設定したりしてこんなグラフが作れます。
こんな感じで、implyとしてまとまったパッケージになったおかげで、別々に入れていたときよりも簡単に試せるようになりました。