Elasticsearchのインデックス登録をRabbitMQ経由でやる

2014-01-07
 

バツイチちゃんがSolrのこと書いてたので、インケンはElasticsearchのこと書きます。

xmlよりjson派なインケンとしてはsolrなんかより、elasticsearch押しです。solr使ったことないけどね!

elasticsearchはRESTリクエスト飛ばせば、検索登録更新削除すべて行うことができるのですが、RabbitMQのプラグインを使うと、登録更新削除をキューにメッセージ投げるだけでいいので、大変便利です。

MQにメッセージ投げる際には、elasticsearchのbulk api形式で送る必要があります。

その辺がわかりづらかったので、サンプル交えてご紹介します。

elasticsearch、rabbitmqプラグインのインストールは割愛します。

elasticsearchにrabbitmq用のインデックスを登録する

まずはelasticsearchにrabbitmqのサーバー情報を設定します。

curl -XPUT 'http://elasticserver:9200/_river/rabbitmq/_meta' -d '{
"type" : "rabbitmq",
"rabbitmq" : {
                "host" : "192.168.0.1",
                "port" : 5672,
                "user" : "guest",
                "pass" : "guest",
                "vhost" : "/",
                "queue" : "elasticsearch",
                "exchange" : "elasticsearch",
                "routing_key" : "elasticsearch",
                "exchange_declare" : true,
                "exchange_type" : "direct",
                "exchange_durable" : true,
                "queue_declare" : true,
                "queue_bind" : true,
                "queue_durable" : true,
                "queue_auto_delete" : false,
                "args" : { "x-ha-policy" : "all" }
  },
"index" : {
                "bulk_size" : 100,
                "bulk_timeout" : "10ms",
                "ordered" : false
}
}'

RabbitMQを冗長化してる場合はこんな感じ

curl -XPUT 'http://elasticserver:9200/_river/rabbitmq/_meta' -d '{
"type" : "rabbitmq",
"rabbitmq" : {
"addresses" : [
            {
                "host" : "rabbitmq-host1",
                "port" : 5672
            },
            {
                "host" : "rabbitmq-host2",
                "port" : 5672
            }
        ],
"user" : "guest",
"pass" : "guest",
......
.....
}'

これでmqにelasticsearchっていうキューができます。

RabbitMQ経由でデータを登録変更削除する

サンプルとして以下のインデックスを想定します。

curl -XPUT 'http://elasticserver:9200/google/photo/_mapping' -d '
 {
 "photo" : {
          "properties" : {
                 "caption" : {"type" : "string"},
                 "upload_datetime" : {"type" : "date", "format" : "yyyy-MM-dd HH:mm:ss"}
                 }
           }
 }'

これにデータを登録する場合、MQにこんな感じでメッセージを投げます。

PHPサンプル

//登録、変更
$message = '{ "index" : { "_index" : "google", "_type" : "photo", "_id" : "'.$photo_id.'" } }' . '\n';
$message .= '{ "photo" : { "caption" : "写真の説明だよ" , "upload_datetime" :"'. date("Y-m-d H:i:s").'" }}' . '\n';
$message .= '{ "create" : { "_index" : "google", "_type" : "photo", "_id" : "'.$photo_id.'" } }';

publish_mq( 'elasticsearch', $message); //MQに投げる処理

登録と変更は同じです。すでにIDが存在すると更新になります。

//削除
$message = '{ "delete" : { "_index" : "google", "_type" : "photo", "_id" : "'.$photo_id.'" } }' . '\n';
$message .= '{ "create" : { "_index" : "google", "_type" : "photo", "_id" : "'.$photo_id.'" } }';

ポイントは、改行です。

JSONでOKなんだと思って、以下のように一行で投げるとエラります。

//これNG
$message = '{ "delete" : { "_index" : "google", "_type" : "photo", "_id" : "'.$photo_id.'" }  ,  "create" : { "_index" : "google", "_type" : "photo", "_id" : "'.$photo_id.'" } }';

そして上記のサンプルの場合、captionってデータに改行コードが入ってるとエラるので、それも除去してあげる必要があります。

いかがでしょうか、もちろんRESTでelasticsearchに直接データをPOSTすることもできますが、すべて一旦MQに投げとくと処理待ちも回避できて良いと思います。