ScalaでフォルトトレラントなRabbitMQクライアントをつくりました。
sstoneのamqp-clientというライブラリを使っています。
このライブラリでかんたんにフォルトトレラントなRabbitMQクライアントが作れちゃいます。
sstone/amqp-client
https://github.com/sstone/amqp-client
Githubにはサンプルコードもコミットされており、とてもわかりやすいです。
amqp-clientなのですが、内部的にakkaとRabbitMQが公式に提供しているjavaライブラリを使用しています。
作成時点のバージョンは下記です。
- scala: 2.11.0
- akka: 2.3.2
- amqp-client: 1.4
RabbitMQの2台構成クラスタリングが存在しているものとして話をすすめます。
- 192.168.0.101:5672
- 192.168.0.102:5672
ビルドシステム
ビルドシステムはgradleを使いました。
build.gradleは下記のようになっています。
apply plugin: 'scala' sourceCompatibility = 1.6 targetCompatibility = 1.6 repositories { mavenCentral() maven { url "https://oss.sonatype.org/content/repositories/releases/" } maven { url "https://oss.sonatype.org/content/repositories/snapshots/" } } ext { scalaVersion = '2.11' scalaVersionRevision = '0' scalaXmlVersion = 'M4' akkaVersion = '2.3.2' logbackVersion = '1.1.2' amqpVersion = '1.4' junitVersion = '4.11' specs2Version = '2.3.11' mockitoVersion = '1.9.5' } configurations { scalaLibrary scalaCompiler } dependencies { scalaLibrary "org.scala-lang:scala-library:$scalaVersion.$scalaVersionRevision" scalaCompiler "org.scala-lang:scala-compiler:$scalaVersion.$scalaVersionRevision" compile "org.scala-lang:scala-library:$scalaVersion.$scalaVersionRevision" compile "org.scala-lang:scala-xml:$scalaVersion.$scalaVersionRevision-$scalaXmlVersion" compile "com.typesafe.akka:akka-osgi_$scalaVersion:$akkaVersion" compile "com.typesafe.akka:akka-slf4j_$scalaVersion:$akkaVersion" compile "ch.qos.logback:logback-classic:$logbackVersion" compile "com.github.sstone:amqp-client_$scalaVersion:$amqpVersion" testCompile "junit:junit:$junitVersion" testCompile "org.mockito:mockito-core:$mockitoVersion" testCompile "org.specs2:specs2_$scalaVersion:$specs2Version" testCompile "com.typesafe.akka:akka-testkit_$scalaVersion:$akkaVersion" } tasks.withType(ScalaCompile) { scalaCompileOptions.useCompileDaemon = false scalaCompileOptions.useAnt = false configure(scalaCompileOptions.forkOptions) { memoryMaximumSize = '1g' jvmArgs = ['-XX:MaxPermSize=512m'] } }
プロデューサー
まずはRabbitMQに対してメッセージを投げるプロデューサー(パブリッシャーという言い方も)を作ります。
package com.inkenkun.x1.rabbitmq import java.util.concurrent.TimeUnit import akka.actor.{Actor, ActorLogging} import com.github.sstone.amqp.{Amqp, ChannelOwner, ConnectionOwner} import com.github.sstone.amqp.Amqp.{Publish, StandardExchanges} import com.rabbitmq.client.AMQP class Producer extends Actor with ActorLogging { implicit val system = ActorSystem() implicit val config = system.settings.config lazy val connection: ConnectionFactory = ConnectionOwner.buildConnFactory( host = addresses.head.getHost, // addressesに指定されたノードの先頭をデフォルトのホストとしています。 port = addresses.head.getPort, vhost = "/", user = "admin", password = "admin" ) lazy val addresses: Array[Address] = Array( new Address( "192.168.0.101", 5672 ), new Address( "192.168.0.102", 5672 ) ) lazy val amqpActor = system.actorOf( ConnectionOwner.props( connFactory = connection, reconnectionDelay = 500 millis, // RabbitMQにポーリングする間隔です。 addresses = Some( addresses ) ) ) lazy val producer = ConnectionOwner.createChildActor( amqpActor, ChannelOwner.props() ) def receive: Receive = { case Producer.Start => Amqp.waitForConnection( system, amqpActor, producer ).await( 5, TimeUnit.SECONDS ) sender ! Producer.Ok case Producer.Standard( key, body ) => producer ! Publish( exchange = StandardExchanges.amqDirect.name, key = key, body = body, properties = Some( new AMQP.BasicProperties.Builder().deliveryMode(2).build() ), mandatory = true, immediate = false ) } } object Producer { case object Start case object Ok case class Standard( routeKey: String, body: Array[Byte] ) }
Producer.Startメッセージを受け取ったらRabbitMQとの接続を開始し、
Producer.Standardメッセージを受け取ったらRabbitMQにメッセージを投げます。
amqpActorがRabbitMQとの接続状態を監視しています。
コンシューマー
次にコンシューマーを作ってみます。
コンシューマーの方は、まず、メッセージを受け取って動作するActorを作成します。
メッセージを受け取って動作するActor
package com.inkenkun.x1.rabbitmq import akka.actor.{Actor, ActorLogging} import com.github.sstone.amqp.Amqp.{Ack, Delivery} class SampleConsumer extends Actor with ActorLogging { def receive: Receive = { case Delivery( consumerTag, envelope, properties, body ) => { val source = new String( body, "UTF-8" ) log.info( s"receive message: ${source}" ) sender ! Ack( envelope.getDeliveryTag ) } } }
この例↑は、受け取ったメッセージをログに出力しているだけです^^;
次に、↑で作成したアクターをコンシューマーとして登録するアクターを作成します(これはアクターじゃなくても良いです)。
コンシューマーを登録するActor
amqp-clientライブラリのConsumerオブジェクトにアクターを登録すると、RabbitMQへポーリングしてメッセージを受け取るようになります。
package com.inkenkun.x1.rabbitmq import akka.actor.{Actor, ActorLogging, Props} import com.github.sstone.amqp.{Amqp, ConnectionOwner, Consumer} import com.github.sstone.amqp.Amqp.{AddBinding, Binding, QueueParameters, StandardExchanges} class ConsumersStarter extends Actor with ActorLogging { implicit val system = ActorSystem() implicit val config = system.settings.config lazy val connection: ConnectionFactory = ConnectionOwner.buildConnFactory( host = addresses.head.getHost, // addressesに指定されたノードの先頭をデフォルトのホストとしています。 port = addresses.head.getPort, vhost = "/", user = "admin", password = "admin" ) lazy val addresses: Array[Address] = Array( new Address( "192.168.0.102", 5672 ), new Address( "192.168.0.101", 5672 ) ) lazy val amqpActor = system.actorOf( ConnectionOwner.props( connFactory = connection, reconnectionDelay = 500 millis, // RabbitMQにポーリングする間隔です。 addresses = Some( addresses ) ) ) def receive: Receive = { case "sample" => val queueParams = queue( "sample-queue" ) val child = ConnectionOwner.createChildActor( amqpActor, Consumer.props( listener = Some( system.actorOf( Props[SampleConsumer], name = "SampleConsumer" ) ), init = List( AddBinding( Binding( StandardExchanges.amqDirect, queueParams, "route-key" ) ) ) ), name = Some( "Sample" ) ) Amqp.waitForConnection( system, child ).await() } def queue( name: String, passive: Boolean = false, durable: Boolean = true, exclusive: Boolean = false, autodelete: Boolean = true ): QueueParameters = QueueParameters( name = name, passive = passive, durable = durable, exclusive = exclusive, autodelete = autodelete ) }
Consumerオブジェクトに、作成したアクター(SampleConsumer)をListenerとして登録しています。
コンシューマーの起動とプロデューサーのパブリッシュ
Mainオブジェクトに書きました。
package com.inkenkun.x1.rabbitmq import scala.concurrent.duration._ import akka.actor.Props import akka.pattern.ask import akka.util.Timeout object Main extends App { implicit val system = ActorSystem() implicit val config = system.settings.config import system.dispatcher implicit val timeout: Timeout = 5.second // コンシューマーを開始して待ち受けます。 val starter = system.actorOf( Props[ConsumersStarter], name="ConsumersStarter" ) starter ! "sample" // パブリッシャーでRabbitMQにメッセージを送ります。 val producer = system.actorOf( Props[Producer], name="Producer" ) producer ? Producer.Start onSuccess { case Producer.Ok => producer ! Producer.Standard( "route-key", "test".getBytes ) } }
コンシューマーの待ち受けを開始した後、プロデューサーからメッセージを投げています。
「while {} loopで待ち受ける」みたいな処理はsstone/amqp-clientが全てやってくれます。
方肺実験
コンシューマーを起動しメッセージの待ち受けを開始します。
秒間500くらいでパブリッシャーを実行し、メッセージングします。
プロデューサーはデフォルトでRabbitMQ1に接続します。
コンシューマーはデフォルトでRabbitMQ2に接続します。
RabbitMQ1を落とす
プロデューサーが接続しているRabbitMQ1を落とします。
プロデューサーはRabbitMQ1が落ちたことを検知してRabbitMQ2にメッセージングすることを期待します。
結果
# | polling(ms) | 試行数 | key数 | エラー数 | エラー率 |
1 | 1,000 | 40,000 | 39,876 | 124 | 0.31% |
2 | 500 | 40,000 | 39,982 | 18 | 0.05% |
3 | 250 | 40,000 | 39,974 | 26 | 0.07% |
ポーリング間隔を500msくらいにするとエラー率が相当低くなります。
RabbitMQ2を落とす
コンシューマーが接続しているRabbitMQ2を落とします。
コンシューマーはRabbitMQ1が落ちたことを検知してRabbitMQ1からメッセージを受け取ることを期待します。
結果
# | polling(ms) | 試行数 | key数 | エラー数 | エラー率 |
1 | 1,000 | 40,000 | 39,762 | 238 | 0.60% |
2 | 500 | 40,000 | 39,936 | 64 | 0.16% |
3 | 250 | 40,000 | 39,958 | 42 | 0.11% |
こちらも同じく、ポーリング間隔を500msくらいにするとエラー率が相当低くなります。
まとめ
障害時にこの程度のエラー率であればまずまずといったところでしょう。
しかし、エラーをゼロにするのは難しそうですね。
RabbitMQ間のsync性能(Gossip使ってます)も相当良いですね。
ソースコードは下記にアップしています。
https://github.com/x1-/scala_rabbitmq