ScalaでフォルトトレラントなRabbitMQクライアントをつくりました。
sstoneのamqp-clientというライブラリを使っています。
このライブラリでかんたんにフォルトトレラントなRabbitMQクライアントが作れちゃいます。
sstone/amqp-client
https://github.com/sstone/amqp-client
Githubにはサンプルコードもコミットされており、とてもわかりやすいです。
amqp-clientなのですが、内部的にakkaとRabbitMQが公式に提供しているjavaライブラリを使用しています。
作成時点のバージョンは下記です。
- scala: 2.11.0
- akka: 2.3.2
- amqp-client: 1.4
RabbitMQの2台構成クラスタリングが存在しているものとして話をすすめます。
- 192.168.0.101:5672
- 192.168.0.102:5672
ビルドシステム
ビルドシステムはgradleを使いました。
build.gradleは下記のようになっています。
apply plugin: 'scala'
sourceCompatibility = 1.6
targetCompatibility = 1.6
repositories {
mavenCentral()
maven {
url "https://oss.sonatype.org/content/repositories/releases/"
}
maven {
url "https://oss.sonatype.org/content/repositories/snapshots/"
}
}
ext {
scalaVersion = '2.11'
scalaVersionRevision = '0'
scalaXmlVersion = 'M4'
akkaVersion = '2.3.2'
logbackVersion = '1.1.2'
amqpVersion = '1.4'
junitVersion = '4.11'
specs2Version = '2.3.11'
mockitoVersion = '1.9.5'
}
configurations {
scalaLibrary
scalaCompiler
}
dependencies {
scalaLibrary "org.scala-lang:scala-library:$scalaVersion.$scalaVersionRevision"
scalaCompiler "org.scala-lang:scala-compiler:$scalaVersion.$scalaVersionRevision"
compile "org.scala-lang:scala-library:$scalaVersion.$scalaVersionRevision"
compile "org.scala-lang:scala-xml:$scalaVersion.$scalaVersionRevision-$scalaXmlVersion"
compile "com.typesafe.akka:akka-osgi_$scalaVersion:$akkaVersion"
compile "com.typesafe.akka:akka-slf4j_$scalaVersion:$akkaVersion"
compile "ch.qos.logback:logback-classic:$logbackVersion"
compile "com.github.sstone:amqp-client_$scalaVersion:$amqpVersion"
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-core:$mockitoVersion"
testCompile "org.specs2:specs2_$scalaVersion:$specs2Version"
testCompile "com.typesafe.akka:akka-testkit_$scalaVersion:$akkaVersion"
}
tasks.withType(ScalaCompile) {
scalaCompileOptions.useCompileDaemon = false
scalaCompileOptions.useAnt = false
configure(scalaCompileOptions.forkOptions) {
memoryMaximumSize = '1g'
jvmArgs = ['-XX:MaxPermSize=512m']
}
}
プロデューサー
まずはRabbitMQに対してメッセージを投げるプロデューサー(パブリッシャーという言い方も)を作ります。
package com.inkenkun.x1.rabbitmq
import java.util.concurrent.TimeUnit
import akka.actor.{Actor, ActorLogging}
import com.github.sstone.amqp.{Amqp, ChannelOwner, ConnectionOwner}
import com.github.sstone.amqp.Amqp.{Publish, StandardExchanges}
import com.rabbitmq.client.AMQP
class Producer extends Actor with ActorLogging {
implicit val system = ActorSystem()
implicit val config = system.settings.config
lazy val connection: ConnectionFactory =
ConnectionOwner.buildConnFactory(
host = addresses.head.getHost, // addressesに指定されたノードの先頭をデフォルトのホストとしています。
port = addresses.head.getPort,
vhost = "/",
user = "admin",
password = "admin"
)
lazy val addresses: Array[Address] = Array(
new Address( "192.168.0.101", 5672 ),
new Address( "192.168.0.102", 5672 )
)
lazy val amqpActor = system.actorOf(
ConnectionOwner.props(
connFactory = connection,
reconnectionDelay = 500 millis, // RabbitMQにポーリングする間隔です。
addresses = Some( addresses ) ) )
lazy val producer = ConnectionOwner.createChildActor( amqpActor, ChannelOwner.props() )
def receive: Receive = {
case Producer.Start =>
Amqp.waitForConnection( system, amqpActor, producer ).await( 5, TimeUnit.SECONDS )
sender ! Producer.Ok
case Producer.Standard( key, body ) =>
producer ! Publish(
exchange = StandardExchanges.amqDirect.name,
key = key,
body = body,
properties = Some( new AMQP.BasicProperties.Builder().deliveryMode(2).build() ),
mandatory = true,
immediate = false )
}
}
object Producer {
case object Start
case object Ok
case class Standard( routeKey: String, body: Array[Byte] )
}
Producer.Startメッセージを受け取ったらRabbitMQとの接続を開始し、
Producer.Standardメッセージを受け取ったらRabbitMQにメッセージを投げます。
amqpActorがRabbitMQとの接続状態を監視しています。
コンシューマー
次にコンシューマーを作ってみます。
コンシューマーの方は、まず、メッセージを受け取って動作するActorを作成します。
メッセージを受け取って動作するActor
package com.inkenkun.x1.rabbitmq
import akka.actor.{Actor, ActorLogging}
import com.github.sstone.amqp.Amqp.{Ack, Delivery}
class SampleConsumer extends Actor with ActorLogging {
def receive: Receive = {
case Delivery( consumerTag, envelope, properties, body ) => {
val source = new String( body, "UTF-8" )
log.info( s"receive message: ${source}" )
sender ! Ack( envelope.getDeliveryTag )
}
}
}
この例↑は、受け取ったメッセージをログに出力しているだけです^^;
次に、↑で作成したアクターをコンシューマーとして登録するアクターを作成します(これはアクターじゃなくても良いです)。
コンシューマーを登録するActor
amqp-clientライブラリのConsumerオブジェクトにアクターを登録すると、RabbitMQへポーリングしてメッセージを受け取るようになります。
package com.inkenkun.x1.rabbitmq
import akka.actor.{Actor, ActorLogging, Props}
import com.github.sstone.amqp.{Amqp, ConnectionOwner, Consumer}
import com.github.sstone.amqp.Amqp.{AddBinding, Binding, QueueParameters, StandardExchanges}
class ConsumersStarter extends Actor with ActorLogging {
implicit val system = ActorSystem()
implicit val config = system.settings.config
lazy val connection: ConnectionFactory =
ConnectionOwner.buildConnFactory(
host = addresses.head.getHost, // addressesに指定されたノードの先頭をデフォルトのホストとしています。
port = addresses.head.getPort,
vhost = "/",
user = "admin",
password = "admin"
)
lazy val addresses: Array[Address] = Array(
new Address( "192.168.0.102", 5672 ),
new Address( "192.168.0.101", 5672 )
)
lazy val amqpActor = system.actorOf(
ConnectionOwner.props(
connFactory = connection,
reconnectionDelay = 500 millis, // RabbitMQにポーリングする間隔です。
addresses = Some( addresses ) ) )
def receive: Receive = {
case "sample" =>
val queueParams = queue( "sample-queue" )
val child = ConnectionOwner.createChildActor(
amqpActor,
Consumer.props(
listener = Some( system.actorOf( Props[SampleConsumer], name = "SampleConsumer" ) ),
init = List( AddBinding( Binding( StandardExchanges.amqDirect, queueParams, "route-key" ) ) )
),
name = Some( "Sample" ) )
Amqp.waitForConnection( system, child ).await()
}
def queue( name: String, passive: Boolean = false, durable: Boolean = true, exclusive: Boolean = false, autodelete: Boolean = true ): QueueParameters =
QueueParameters(
name = name,
passive = passive,
durable = durable,
exclusive = exclusive,
autodelete = autodelete )
}
Consumerオブジェクトに、作成したアクター(SampleConsumer)をListenerとして登録しています。
コンシューマーの起動とプロデューサーのパブリッシュ
Mainオブジェクトに書きました。
package com.inkenkun.x1.rabbitmq
import scala.concurrent.duration._
import akka.actor.Props
import akka.pattern.ask
import akka.util.Timeout
object Main extends App {
implicit val system = ActorSystem()
implicit val config = system.settings.config
import system.dispatcher
implicit val timeout: Timeout = 5.second
// コンシューマーを開始して待ち受けます。
val starter = system.actorOf( Props[ConsumersStarter], name="ConsumersStarter" )
starter ! "sample"
// パブリッシャーでRabbitMQにメッセージを送ります。
val producer = system.actorOf( Props[Producer], name="Producer" )
producer ? Producer.Start onSuccess {
case Producer.Ok => producer ! Producer.Standard( "route-key", "test".getBytes )
}
}
コンシューマーの待ち受けを開始した後、プロデューサーからメッセージを投げています。
「while {} loopで待ち受ける」みたいな処理はsstone/amqp-clientが全てやってくれます。
方肺実験
コンシューマーを起動しメッセージの待ち受けを開始します。
秒間500くらいでパブリッシャーを実行し、メッセージングします。
プロデューサーはデフォルトでRabbitMQ1に接続します。
コンシューマーはデフォルトでRabbitMQ2に接続します。
RabbitMQ1を落とす
プロデューサーが接続しているRabbitMQ1を落とします。
プロデューサーはRabbitMQ1が落ちたことを検知してRabbitMQ2にメッセージングすることを期待します。
結果
| # | polling(ms) | 試行数 | key数 | エラー数 | エラー率 |
| 1 | 1,000 | 40,000 | 39,876 | 124 | 0.31% |
| 2 | 500 | 40,000 | 39,982 | 18 | 0.05% |
| 3 | 250 | 40,000 | 39,974 | 26 | 0.07% |
ポーリング間隔を500msくらいにするとエラー率が相当低くなります。
RabbitMQ2を落とす
コンシューマーが接続しているRabbitMQ2を落とします。
コンシューマーはRabbitMQ1が落ちたことを検知してRabbitMQ1からメッセージを受け取ることを期待します。
結果
| # | polling(ms) | 試行数 | key数 | エラー数 | エラー率 |
| 1 | 1,000 | 40,000 | 39,762 | 238 | 0.60% |
| 2 | 500 | 40,000 | 39,936 | 64 | 0.16% |
| 3 | 250 | 40,000 | 39,958 | 42 | 0.11% |
こちらも同じく、ポーリング間隔を500msくらいにするとエラー率が相当低くなります。
まとめ
障害時にこの程度のエラー率であればまずまずといったところでしょう。
しかし、エラーをゼロにするのは難しそうですね。
RabbitMQ間のsync性能(Gossip使ってます)も相当良いですね。
ソースコードは下記にアップしています。
https://github.com/x1-/scala_rabbitmq



