imply を使ってリアルタイム集計

2017-04-05

以前Druidとpivotを使って、twitterデータを可視化というのをやりましたが、Druidやpivotが一緒にパッケージ化されインストールしやすくなったimplyを使って、もっと簡単にリアルタイム集計をしてみたいと思います。

implyは以下のツールがセットになったイベント集計プラットフォームです。

DruidをベースにBIツールのPivotや、DruidのdatasourceをSQLライクに記述できるPlyQL、ラージデータセットを扱う時に便利なJavaScript libraryのPlywoodが含まれています。

implyにはQuickstart用の設定が入ってるので、簡単に試すことができます。

今回はimplyを入れたサーバーとは別のkafkaからデータを取ってきて、pivotで表示するところをこちらのチュートリアルをベースにやっていきたいと思います。

kafkaの準備

まずは、外部のkafkaサーバーを用意し、そこにFluentdとかでJSONで以下のようなデータを入れます。

cd /opt/kafka_2.11/
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic sample --from-beginning

{"date":"2017-04-05 06:23:41","ua_name":"Chrome","ua_category":"smartphone","os":"Android","os_version":"5.0","browser_type":"browser","browser_version":"57.0.2987.110"}
{"date":"2017-04-05 06:31:48","ua_name":"Chrome","ua_category":"smartphone","os":"Android","os_version":"5.0","browser_type":"browser","browser_version":"57.0.2987.110"}
{"date":"2017-04-05 07:42:18","ua_name":"Safari","ua_category":"smartphone","os":"iPhone","os_version":"9.1","browser_type":"browser","browser_version":"9.0"}

日時はUTCで入れます。ローカルタイムで入れるとdruidから弾かれます。

implyの設定

kafkaが用意できたらimplyの設定をしていきます。

まずはダウンロードしてきて解凍

tar -xzf imply-2.0.0.tar.gz
cd imply-2.0.0

次にdruidのquickstartのkafka設定ファイルを修正します。

vi imply-2.0.0/conf-quickstart/tranquility/kafka.json 

一番下にある以下の行をコメントを外します 
!p95 tranquility-kafka bin/tranquility kafka -configFile conf-quickstart/tranquility/kafka.json

上のコメント外したファイルにdatasourceの設定をします。

中味はこんな感じにしました。

vi conf-quickstart/tranquility/kafka.json

{
  "dataSources" : [
    {
      "spec" : {
        "dataSchema" : {
          "dataSource" : "sample",
          "parser" : {
            "type" : "string",
            "parseSpec" : {
              "timestampSpec" : {
                "column" : "date",
                "format" : "yyyy-MM-dd HH:mm:ss"
              },
              "dimensionsSpec" : {
                "dimensions" : [
                                "date",
                                "ua_name",
                                "ua_category" ,
                                "os",
                                "os_version",
                                "browser_type",
                                "browser_version"
                               ]
              },
              "format" : "json"
            }
          },
          "granularitySpec" : {
            "type" : "uniform",
            "segmentGranularity" : "hour",
            "queryGranularity" : "none"
          },
          "metricsSpec" : [
            {
              "type" : "count",
              "name" : "count"
            }
          ]
        },
        "ioConfig" : {
          "type" : "realtime"
        },
        "tuningConfig" : {
          "type" : "realtime",
          "maxRowsInMemory" : "100000",
          "intermediatePersistPeriod" : "PT10M",
          "windowPeriod" : "PT10M"
        }
      },
      "properties" : {
        "task.partitions" : "1",
        "task.replicants" : "1",
        "topicPattern" : "sample"
      }
    }
  ],
"properties" : {
    "zookeeper.connect" : "localhost:2181",
    "druid.discovery.curator.path" : "/druid/discovery",
    "druid.selectors.indexing.serviceName" : "druid/overlord",
    "commit.periodMillis" : "15000",
    "consumer.numThreads" : "2",
    "kafka.zookeeper.connect" : "kafka-server:2181",
    "kafka.group.id" : "kafka",
    "serialization.format" : "smile",
    "druidBeam.taskLocator": "overlord"
  }
}

timestampSpecにjsonのdateの形式を入れて、dimensionsにkafkaに入れたjsonの項目名を入れてます。
topicPatternにはkafkaのtopic名、kafka.zookeeper.connectに上で設定したkafkaサーバーのアドレスを入れます。

とりあえずcountだけ計測するようにmetricsSpecを設定します。

implyの起動

以下のコマンドでdruidやpivotなどが一括で起動します。

bin/supervise -c conf/supervise/quickstart.conf

起動したらkafkaにデータを流してみましょう。

tail -f var/sv/tranquility-kafka.log

こんな感じのログが出ると思います。

2017-04-05 08:38:40,721 [KafkaConsumer-CommitThread] INFO  c.m.tranquility.kafka.KafkaConsumer – Flushed {dogakun={receivedCount=2, sentCount=2, droppedCount=0, unparseableCount=0}} pending messages in 0ms and committed offsets in 2ms.

上記みたいにsentCountに値がセットされていたら正常にdruidに取り込まれています。

droppedCountが0じゃない場合、何らかの問題があってdruidに取り込まれていません。

jsonのtimestamp(今回の場合だとdate)がUTCじゃない場合やテストデータなどでdruidのサーバーと時間がかけ離れてる場合にdropされたりします。

PlyQLでデータソースを確認

ちゃんとsentCountされた場合、PlyQLでデータが確認できるようになります。

bin/plyql -h localhost:8082 -q 'show tables'

┌────────────────────────────┐
│ Tables_in_database         │
├────────────────────────────┤
│ COLUMNS                    │
│ SCHEMATA                   │
│ TABLES                     │
│ sample                     │
└────────────────────────────┘

bin/plyql -h localhost:8082 -q 'select * from sample'

┌─────────────────────────────────────────┬──────────────┬─────────────────┬───────┬───────────────┬─────────────┬─────────┬────────────┬─────────┬─────────────┬─────────┬────────┐
│ __time                                  │ browser_type │ browser_version │ count │ date          │ ip          │ os      │ os_version │ referer │ ua_category │ ua_name │ vendor │
├─────────────────────────────────────────┼──────────────┼─────────────────┼───────┼───────────────┼─────────────┼─────────┼────────────┼─────────┼─────────────┼─────────┼────────┤
│ Wed Apr 05 2017 17:52:45 GMT+0900 (JST) │ browser      │ 9.0             │ 1     │ 1491382365000 │ 10.48.12.29 │ iPhone  │ 9.1        │ NULL    │ smartphone  │ Safari  │ Apple  │
│ Wed Apr 05 2017 17:52:47 GMT+0900 (JST) │ browser      │ 9.0             │ 1     │ 1491382367000 │ 10.48.12.29 │ iPhone  │ 9.1        │ NULL    │ smartphone  │ Safari  │ Apple  │
│ Wed Apr 05 2017 17:52:51 GMT+0900 (JST) │ browser      │ 57.0.2987.110   │ 1     │ 1491382371000 │ 10.48.12.29 │ Android │ 5.0        │ NULL    │ smartphone  │ Chrome  │ Google │
│ Wed Apr 05 2017 17:52:54 GMT+0900 (JST) │ browser      │ 57.0.2987.110   │ 1     │ 1491382374000 │ 10.48.12.29 │ Android │ 5.0        │ NULL    │ smartphone  │ Chrome  │ Google │
│ Wed Apr 05 2017 17:52:57 GMT+0900 (JST) │ browser      │ 9.0             │ 1     │ 1491382377000 │ 10.48.12.29 │ iPad    │ 9.1        │ NULL    │ smartphone  │ Safari  │ Apple  │
└─────────────────────────────────────────┴──────────────┴─────────────────┴───────┴───────────────┴─────────────┴─────────┴────────────┴─────────┴─────────────┴─────────┴────────┘

こんな感じでSQLライクにデータを参照することができます。

Pivotの設定

imply入れたサーバーの9095ポートにアクセスします。

真ん中のcubes settingsまたは右上の設定アイコンからData Cubesを選んで、新しいdata cubeを作ります。



sourceにデータソース名を選択し、Timezoneは日本がないので Asia/Seoulを選択します。

Create cubeすると、data cubeが作られます。

今作ったdata cubeを選択するとこんな感じでCountが表示されます。

あとはディメンションを設定したりしてこんなグラフが作れます。

 

こんな感じで、implyとしてまとまったパッケージになったおかげで、別々に入れていたときよりも簡単に試せるようになりました。