ScalaでフォルトトレラントなRabbitMQクライアントと方肺実験

2014-06-07

ScalaでフォルトトレラントなRabbitMQクライアントをつくりました。
sstoneのamqp-clientというライブラリを使っています。

このライブラリでかんたんにフォルトトレラントなRabbitMQクライアントが作れちゃいます。

sstone/amqp-client

https://github.com/sstone/amqp-client

Githubにはサンプルコードもコミットされており、とてもわかりやすいです。

amqp-clientなのですが、内部的にakkaとRabbitMQが公式に提供しているjavaライブラリを使用しています。

作成時点のバージョンは下記です。

  • scala: 2.11.0
  • akka: 2.3.2
  • amqp-client: 1.4

RabbitMQの2台構成クラスタリングが存在しているものとして話をすすめます。

  • 192.168.0.101:5672
  • 192.168.0.102:5672

ビルドシステム

ビルドシステムはgradleを使いました。
build.gradleは下記のようになっています。

apply plugin: 'scala'

sourceCompatibility = 1.6
targetCompatibility = 1.6

repositories {
  mavenCentral()
  maven {
    url "https://oss.sonatype.org/content/repositories/releases/"
  }
  maven {
    url "https://oss.sonatype.org/content/repositories/snapshots/"
  }
}

ext {
  scalaVersion = '2.11'
  scalaVersionRevision = '0'
  scalaXmlVersion = 'M4'
  akkaVersion = '2.3.2'
  logbackVersion = '1.1.2'
  amqpVersion = '1.4'
  junitVersion = '4.11'
  specs2Version = '2.3.11'
  mockitoVersion = '1.9.5'
}

configurations {
  scalaLibrary
  scalaCompiler
}

dependencies {
  scalaLibrary "org.scala-lang:scala-library:$scalaVersion.$scalaVersionRevision"
  scalaCompiler "org.scala-lang:scala-compiler:$scalaVersion.$scalaVersionRevision"

  compile "org.scala-lang:scala-library:$scalaVersion.$scalaVersionRevision"
  compile "org.scala-lang:scala-xml:$scalaVersion.$scalaVersionRevision-$scalaXmlVersion"
  compile "com.typesafe.akka:akka-osgi_$scalaVersion:$akkaVersion"
  compile "com.typesafe.akka:akka-slf4j_$scalaVersion:$akkaVersion"
  compile "ch.qos.logback:logback-classic:$logbackVersion"
  compile "com.github.sstone:amqp-client_$scalaVersion:$amqpVersion"

  testCompile "junit:junit:$junitVersion"
  testCompile "org.mockito:mockito-core:$mockitoVersion"
  testCompile "org.specs2:specs2_$scalaVersion:$specs2Version"
  testCompile "com.typesafe.akka:akka-testkit_$scalaVersion:$akkaVersion"
}

tasks.withType(ScalaCompile) {
  scalaCompileOptions.useCompileDaemon = false
  scalaCompileOptions.useAnt = false

  configure(scalaCompileOptions.forkOptions) {
    memoryMaximumSize = '1g'
    jvmArgs = ['-XX:MaxPermSize=512m']
  }
}

プロデューサー

まずはRabbitMQに対してメッセージを投げるプロデューサー(パブリッシャーという言い方も)を作ります。

package com.inkenkun.x1.rabbitmq

import java.util.concurrent.TimeUnit

import akka.actor.{Actor, ActorLogging}
import com.github.sstone.amqp.{Amqp, ChannelOwner, ConnectionOwner}
import com.github.sstone.amqp.Amqp.{Publish, StandardExchanges}
import com.rabbitmq.client.AMQP

class Producer extends Actor with ActorLogging {

  implicit val system = ActorSystem()
  implicit val config = system.settings.config

  lazy val connection: ConnectionFactory =
    ConnectionOwner.buildConnFactory(
      host = addresses.head.getHost,  // addressesに指定されたノードの先頭をデフォルトのホストとしています。
      port = addresses.head.getPort,
      vhost = "/",
      user = "admin",
      password = "admin"
    )

  lazy val addresses: Array[Address] = Array(
    new Address( "192.168.0.101", 5672 ),
    new Address( "192.168.0.102", 5672 )
  )

  lazy val amqpActor = system.actorOf(
    ConnectionOwner.props(
      connFactory = connection,
      reconnectionDelay = 500 millis, // RabbitMQにポーリングする間隔です。
      addresses = Some( addresses ) ) )

  lazy val producer = ConnectionOwner.createChildActor( amqpActor, ChannelOwner.props() )

  def receive: Receive = {
    case Producer.Start =>
      Amqp.waitForConnection( system, amqpActor, producer ).await( 5, TimeUnit.SECONDS )
      sender ! Producer.Ok

    case Producer.Standard( key, body ) =>
      producer ! Publish(
        exchange = StandardExchanges.amqDirect.name,
        key = key,
        body = body,
        properties = Some( new AMQP.BasicProperties.Builder().deliveryMode(2).build() ),
        mandatory = true,
        immediate = false )
  }
}

object Producer {
  case object Start
  case object Ok
  case class Standard( routeKey: String, body: Array[Byte] )
}

Producer.Startメッセージを受け取ったらRabbitMQとの接続を開始し、
Producer.Standardメッセージを受け取ったらRabbitMQにメッセージを投げます。
amqpActorがRabbitMQとの接続状態を監視しています。

コンシューマー

次にコンシューマーを作ってみます。
コンシューマーの方は、まず、メッセージを受け取って動作するActorを作成します。

メッセージを受け取って動作するActor

package com.inkenkun.x1.rabbitmq

import akka.actor.{Actor, ActorLogging}
import com.github.sstone.amqp.Amqp.{Ack, Delivery}

class SampleConsumer extends Actor with ActorLogging {

  def receive: Receive = {
    case Delivery( consumerTag, envelope, properties, body ) => {
      val source = new String( body, "UTF-8" )
      log.info( s"receive message: ${source}" )
      sender ! Ack( envelope.getDeliveryTag )
    }
  }
}

この例↑は、受け取ったメッセージをログに出力しているだけです^^;
次に、↑で作成したアクターをコンシューマーとして登録するアクターを作成します(これはアクターじゃなくても良いです)。

コンシューマーを登録するActor

amqp-clientライブラリのConsumerオブジェクトにアクターを登録すると、RabbitMQへポーリングしてメッセージを受け取るようになります。

package com.inkenkun.x1.rabbitmq

import akka.actor.{Actor, ActorLogging, Props}
import com.github.sstone.amqp.{Amqp, ConnectionOwner, Consumer}
import com.github.sstone.amqp.Amqp.{AddBinding, Binding, QueueParameters, StandardExchanges}

class ConsumersStarter extends Actor with ActorLogging {

  implicit val system = ActorSystem()
  implicit val config = system.settings.config

  lazy val connection: ConnectionFactory =
    ConnectionOwner.buildConnFactory(
      host = addresses.head.getHost,  // addressesに指定されたノードの先頭をデフォルトのホストとしています。
      port = addresses.head.getPort,
      vhost = "/",
      user = "admin",
      password = "admin"
    )

  lazy val addresses: Array[Address] = Array(
    new Address( "192.168.0.102", 5672 ),
    new Address( "192.168.0.101", 5672 )
  )

  lazy val amqpActor = system.actorOf(
    ConnectionOwner.props(
      connFactory = connection,
      reconnectionDelay = 500 millis, // RabbitMQにポーリングする間隔です。
      addresses = Some( addresses ) ) )

  def receive: Receive = {

    case "sample" =>
      val queueParams = queue( "sample-queue" )
      val child = ConnectionOwner.createChildActor(
        amqpActor,
        Consumer.props(
          listener = Some( system.actorOf( Props[SampleConsumer], name = "SampleConsumer" ) ),
          init     = List( AddBinding( Binding( StandardExchanges.amqDirect, queueParams, "route-key" ) ) )
        ),
        name = Some( "Sample" ) )

      Amqp.waitForConnection( system, child ).await()
  }

  def queue( name: String, passive: Boolean = false, durable: Boolean = true, exclusive: Boolean = false, autodelete: Boolean = true ): QueueParameters =
    QueueParameters(
      name = name,
      passive = passive,
      durable = durable,
      exclusive = exclusive,
      autodelete = autodelete )
}

Consumerオブジェクトに、作成したアクター(SampleConsumer)をListenerとして登録しています。

コンシューマーの起動とプロデューサーのパブリッシュ

Mainオブジェクトに書きました。

package com.inkenkun.x1.rabbitmq

import scala.concurrent.duration._

import akka.actor.Props
import akka.pattern.ask
import akka.util.Timeout

object Main extends App {

  implicit val system = ActorSystem()
  implicit val config = system.settings.config

  import system.dispatcher
  implicit val timeout: Timeout = 5.second

  // コンシューマーを開始して待ち受けます。
  val starter = system.actorOf( Props[ConsumersStarter], name="ConsumersStarter" )
  starter ! "sample"

  // パブリッシャーでRabbitMQにメッセージを送ります。
  val producer = system.actorOf( Props[Producer], name="Producer" )
  producer ? Producer.Start onSuccess {
    case Producer.Ok => producer ! Producer.Standard( "route-key", "test".getBytes )
  }
}

コンシューマーの待ち受けを開始した後、プロデューサーからメッセージを投げています。
「while {} loopで待ち受ける」みたいな処理はsstone/amqp-clientが全てやってくれます。

方肺実験

コンシューマーを起動しメッセージの待ち受けを開始します。
秒間500くらいでパブリッシャーを実行し、メッセージングします。

プロデューサーはデフォルトでRabbitMQ1に接続します。
コンシューマーはデフォルトでRabbitMQ2に接続します。

RabbitMQレプリ方肺実験_1

RabbitMQ1を落とす

RabbitMQレプリ方肺実験_2

プロデューサーが接続しているRabbitMQ1を落とします。
プロデューサーはRabbitMQ1が落ちたことを検知してRabbitMQ2にメッセージングすることを期待します。

結果

# polling(ms) 試行数 key数 エラー数 エラー率
1 1,000 40,000 39,876 124 0.31%
2 500 40,000 39,982 18 0.05%
3 250 40,000 39,974 26 0.07%

ポーリング間隔を500msくらいにするとエラー率が相当低くなります。

RabbitMQ2を落とす

RabbitMQレプリ方肺実験_3

コンシューマーが接続しているRabbitMQ2を落とします。
コンシューマーはRabbitMQ1が落ちたことを検知してRabbitMQ1からメッセージを受け取ることを期待します。

結果

# polling(ms) 試行数 key数 エラー数 エラー率
1 1,000 40,000 39,762 238 0.60%
2 500 40,000 39,936 64 0.16%
3 250 40,000 39,958 42 0.11%

こちらも同じく、ポーリング間隔を500msくらいにするとエラー率が相当低くなります。

まとめ

障害時にこの程度のエラー率であればまずまずといったところでしょう。
しかし、エラーをゼロにするのは難しそうですね。
RabbitMQ間のsync性能(Gossip使ってます)も相当良いですね。

ソースコードは下記にアップしています。
https://github.com/x1-/scala_rabbitmq