Scalaで並列処理(1)

2014-03-13

Scalaの並列処理のやり方はバージョンによって大きく異なっています。
この辺りScalaのコミッタの間で紆余曲折があったのではないかと想像してしまいます。

2013/3にリリースされたScala2.10からFutureとPromiseというモナディックなオブジェクトを使って並列処理を行うのがデファクトとなりました。
※わたしはFutureを見たとき、Scalaの今までの並列処理実装の中で最も美しい形だと思いました。

このFutureの使い方は下記の公式ドキュメントに詳しく載っているのですが、実際に使ってみるといろいろな発見があったので記事として残すことにしました。

1. Futureの基本的な使い方

1〜50までの数字の中から任意の5個を選ぶ。
5個の数値を足した合計が奇数か偶数か判定する。
偶数がいくつあるかを戻り値とする。
・・という処理を非同期で行います。
  def standard1: Unit = {

    val stime = System.currentTimeMillis()

    // Futureを返すfutureメソッドを呼び出します。
    val fs = future {
      ( 1 to n ).combinations( r ).count( _.sum % 2 == 0 )
    }

    // Futureの完了。Futureは値もしくは例外を持つとき、完了したことになります。
    fs onComplete {
      case Failure(e) => println( s"error: ${e.getMessage}" )
      case Success(r) => println( s"even number: ${r}" )

      println( s"onComplete:${System.currentTimeMillis()-stime} ms" )
    }

    // 下記のように書き直すこともできます。
//    fs onSuccess {
//      case r => println( s"even number: ${r}" )
//    }
//    fs onFailure {
//      case e => println( s"error: ${e.getMessage}" )
//    }

    println( s"before await:${System.currentTimeMillis()-stime} ms" )

    val res = Await.result( fs, Duration.Inf )
    println( s"Await.result:${res}" )
  }

これを実行するとconsoleには下記のように出力されます。

before await:271 ms
c:1,2,3,4,5, isEven:false
c:1,2,3,4,6, isEven:true
   :
c:95,97,98,99,100, isEven:false
c:96,97,98,99,100, isEven:true
Await.result:1059380
even number: 1059380
onComplete:3128 ms

ん・・・まあ、非同期にはしましたが、特に平行処理は行っていないですからねえ・・・普通に時間かかります。

2. 見通しの悪いコールバックチェイン。

1〜50までの数字の中から任意の5個を選ぶ。
5個の数値を足した合計が奇数か偶数か判定し、偶数の数を数える。(非同期で行う1番目のブロック)
最終的に算出された偶数の値を割り切れなくなるまで2で割り、割った回数を返却する。(非同期で行う2番目のブロック)
・・という処理を実装します。
  def callback1: Unit = {

    val stime = System.currentTimeMillis()

    // 2-1の処理を行うFuture。
    val f21 = future {
      println( s"f21 start:${System.currentTimeMillis()-stime} ms" )
      ( 1 to n ).combinations( r ).count( _.sum % 2 == 0 )
    }

    // 2-1が失敗したら2-2の処理は行いません。
    f21 onFailure {
      case e => println( s"error: ${e.getMessage}" )
    }

    // 2-1が成功したら2-2の処理を行います。
    f21 onSuccess {
      case r => {

        println( s"onSuccess:${System.currentTimeMillis()-stime} ms" )
        println( s"result of f21:${r}" )

        // 2-2の処理を行うFuture。
        val f22 = future {
          def divide2( ev: Int, s: Int=0 ): Int = {
            ev match {
              case x if ( x < 2 ) => s
              case x => divide2( x/2, s+1 )
            }
          }
          divide2( r )
        }
        f22 onSuccess {
          case r => println( s"f22 divided number: ${r}" )
        }
        f22 onFailure {
          case e => println( s"f22 error: ${e.getMessage}" )
        }
      }
    }

    println( s"before await:${System.currentTimeMillis()-stime} ms" )

    val res = Await.result( f21, Duration.Inf )
    println( s"Await.result:${res}" )
  }

これを実行すると、f22のonSuccessやonFailure内のprint文が出力されません。

before await:244 ms
f21 start:236 ms
Await.result:1059380
onSuccess:2999 ms
result of f21:1059380

下記のブログに詳しい説明がありました。

Scala 2.10.0 Futures and Promises – 1Add Star
スタンスとしては、Futureを使う時は結果を同期的に待つのではなくて、Futureトレイトのインスタンスにコールバック関数を登録して(先のサンプルでは、Future#onComplete)、非同期に結果を受け取って処理しなさい、というスタンスなようです。
*その方が、性能的に好ましいということだそうで
とはいっても、Futureのバックグラウンドで使われているスレッドはForkJoinPoolのWorkerスレッド(Daemonスレッド)なので、普通にプログラムを実行してそれ以降の処理がないと、JavaVMが終了してしまいます…。

なるほど・・・
つまり、onComplete/onSuccess/onFailure内で生成したFutureは強制終了させられてしまうんですね^^;
まあonComplete/onSuccess/onFailure内でFutureを生成するのはネストが深くなって見た目にも美しくないので良いですが。

3. for内包表記によるコールバックチェイン。

1〜50までの数字の中から任意の5個を選ぶ。
5個の数値を足した合計が奇数か偶数か判定し、偶数の数を数える。(非同期で行う1番目のブロック)
最終的に算出された偶数の値を割り切れなくなるまで2で割り、割った回数を返却する。(非同期で行う2番目のブロック)
・・という処理を実装します。
  def callback2: Unit = {

    val stime = System.currentTimeMillis()

    // 3-1の処理を行うFuture。
    val f31 = future {
      println( s"f31 start:${System.currentTimeMillis()-stime} ms" )
      ( 1 to n ).combinations( r ).count( _.sum % 2 == 0 )
    }

    val f32 = for {
      f <- f31
    } yield {
      println( s"f32 yield:${System.currentTimeMillis()-stime} ms" )
      // 3-2の処理を行います。
      def divide2( ev: Int, s: Int=0 ): Int = {
        ev match {
          case x if ( x < 2 ) => s
          case x => divide2( x/2, s+1 )
        }
      }
      val d = divide2( f )
      println( s"f32 divided number: ${d}" )
      d
    }

    // 3-1が成功したら3-2の処理を行います。
    f32 onComplete  {
      case Success(r) =>
        println( s"f32 onComplete get: ${r}" )
      case Failure(e) => println( s"f32 error: ${e.getMessage}" )
    }

    println( s"before await:${System.currentTimeMillis()-stime} ms" )

    val res = Await.result( f32, Duration.Inf )
    println( s"Await.result:${res}" )
  }

for内包表記によるコールバックチェインは、一見するとf31の完了を待ち受けるロジックを書いていないように見えます。
しかし、f31が成功完了するとforコンビネータが新しいFutureを生成するのです。
つまり、f31が成功するとyieldブロックが新しいFutureとなります。
よって、このコードではf32は実行されるのです。

f31 start:242 ms
before await:247 ms
f32 yield:2974 ms
f32 divided number: 20
Await.result:20
f32 onComplete get: 20

4. 実用的な並列処理。

1〜50までの数字の中から任意の5個を選ぶ。
50までの数字の中から5個を選ぶ組み合せは2,118,760通りである。
この2,118,760通りの組み合せに対して、5個の数値を足した合計が奇数か偶数か判定し、偶数の数を数える。
2,118,760通りをプロセッサ数分のスレッドで偶数判定を行う。
最終的に算出された偶数の値を割り切れなくなるまで2で割り、割った回数を返却する。
・・という処理を実装します。
  def pragmatic: Unit = {

    val stime = System.currentTimeMillis()

    // 4-1. 1〜50までの数字の中から任意の5個を選びます。
    val size = combins( n, r )
    println( s"combination number:${size}, ${System.currentTimeMillis()-stime} ms" )

    // 1スレッドが処理する組数です。
    val block: Int = size / availableProcessors

    // 組み合せを生成します。
    val cs = ( 1 to n ).combinations( r ).toIndexedSeq
    println( s"combination generated:${System.currentTimeMillis()-stime} ms" )

    val newcs = for ( i <- 1 to availableProcessors ) yield {
      //cs.drop( (i-1) * block ).take( block )
      cs.slice( (i-1) * block, i * block )
    }
    // 4-2. スレッドを生成します。
    val futures = for ( i <- 1 to availableProcessors )
      yield future {
      newcs(i-1).count( _.sum % 2 == 0 )
    }

    println( s"before await:${System.currentTimeMillis()-stime} ms" )

    // 4-2.の結果を待つ受けます。
    val f42 = Await.result( Future.sequence( futures ), Duration.Inf )
    println( s"f42 complete:${System.currentTimeMillis()-stime} ms" )

    // 4-2が成功したら4-3の処理を行います。
    def divide2( ev: Int, s: Int=0 ): Int = {
      ev match {
        case x if ( x < 2 ) => s
        case x => divide2( x/2, s+1 )
      }
    }
    val d = divide2( f42.sum )
    println( s"f43 divided number: ${d}" )
    println( s"f43 complete:${System.currentTimeMillis()-stime} ms" )
  }

  def combins( n: Int, r: Int ): Int = {
    r match {
      case 0 => 1
      case 1 => n
      case x if ( x == n ) => 1
      case _ => combins( n-1, r-1 ) + combins( n-1, r )
    }
  }

200万個のデータですからねー
プロセッサ毎に非同期処理させたら速いだろうと思ったんですが、それ以上にStreamをIndexedSeqへ変換するコストがかかりすぎて・・・
IndexedSeqへの変換後は速いんですけどね。

combination number:2118760, 166 ms
combination generated:5483 ms
before await:5521 ms
f42 complete:5786 ms
f43 divided number: 20
f43 complete:5786 ms

※ここに掲載したソースコードは下記にアップしてます。
https://github.com/x1-/scala_async_2.10.3

次回は、非同期処理時のobject, lazy valの挙動について書きます。