2013年9月12日木曜日

scalaで調子に乗ってfuture{でスレッド量産してたらはまった

注・この記事は、scala2.10を使用しています。

scalaで、
import ExecutionContext.Implicits.global
とfuture使ってて、
「あれ? futureで作った別スレッドの処理がなんか始まらないことがあるんだけど・・・」
ということがある人のための記事です。

scala使ってて便利なのにスレッドを超簡単に作れるってのがあるんだけど、
import ExecutionContext.Implicits.global // どこか上の方でimport

future {
・ なんかいろいろ
・ なんかいろいろ
}
こんな感じね。

ただ単に、future{やると、エラーが出るんで、いろんなサイトのサンプル(こことかこことかここ。みんな良サイト) を参考に、先にimportしてscalaが持ってるデフォルトのExecutionContextであるExecutionContext.Implicits.global を利用するようにしてた。

ところが、調子に乗ってメインのスレッドでやりたくないことをfuture{でがんがん切り離して処理したんだけど、そのプログラムをコア数1のマシンで動かそうとしたら動かない。コア数2のマシンで動かしたらなんか動くけど、プログラムがよく固まる。

固まった状態ではほとんどCPUは食っていない。
ちなみに自分の開発マシンは、4core&ハイパースレッドで8スレッドだったのでその現象は出なかった。
{うげー、マルチスレッド周りの問題は追いかけにくくて嫌だなー・・・}
と思ったのだが、printlnとか入れてみるとどうやら処理がfutureの中身に来ていないようだ。

仕方ないのでサンプルを書いて自機(8スレマシン)で実験してみた。
import scala.concurrent._
import ExecutionContext.Implicits.global

object ThreadSample {
  def main(args: Array[String]) {
    val startTime = System.currentTimeMillis()

    for (idx <- 1 to 24) {
     future {
      println(f"idx = $idx%2d, time = ${System.currentTimeMillis() - startTime}%4d ms")
      Thread.sleep(1000)
     }
    }
    Thread.sleep(3000)
    println(s"end of main.     ${System.currentTimeMillis() - startTime} ms")
  }
}
実行結果
idx =  1, time =   73 ms
idx =  3, time =   73 ms
idx =  8, time =   73 ms
idx =  2, time =   73 ms
idx =  7, time =   73 ms
idx =  4, time =   73 ms
idx =  5, time =   73 ms
idx =  6, time =   73 ms
idx = 13, time = 1099 ms
idx = 16, time = 1099 ms
idx = 15, time = 1099 ms
idx = 10, time = 1099 ms
idx =  9, time = 1099 ms
idx = 14, time = 1099 ms
idx = 11, time = 1099 ms
idx = 12, time = 1099 ms
idx = 21, time = 2100 ms
idx = 23, time = 2100 ms
idx = 20, time = 2100 ms
idx = 19, time = 2100 ms
idx = 18, time = 2100 ms
idx = 17, time = 2100 ms
idx = 22, time = 2100 ms
idx = 24, time = 2100 ms
end of main.     3074 ms
{なんじゃこりゃ、8個ずつしか動いてないじゃん。}
sleepしたからって、次のthreadを処理してくれる訳じゃなく御丁寧にsleepが終わるまで次のスレッドを処理しないで待ってるではないか!
で、どうやらこの同時に動けるfutureの数がマシンのコア数によって変るので、コア数の少ないマシンで自分のプログラムを動かしてた時にfutureで切り離したthreadが実行されなかった模様。

何か他のやり方ないかな、と探していたら、spawnというものがあるようなのでこんなふうに変えてみたら
import scala.concurrent._
import scala.concurrent.ops.spawn

object ThreadSample {
  def main(args: Array[String]) {
    val startTime = System.currentTimeMillis()

 for (idx <- 1 to 24) {
     spawn {
      println(f"idx = $idx%2d, time = ${System.currentTimeMillis() - startTime}%4d ms")
      Thread.sleep(1000)
     }
    }
    Thread.sleep(3000)
    println(s"end of main.     ${System.currentTimeMillis() - startTime} ms")
  }
}
実行結果
idx = 16, time =   56 ms
idx = 23, time =   57 ms
idx =  7, time =   55 ms
idx =  8, time =   55 ms
idx = 21, time =   56 ms
idx = 12, time =   56 ms
idx = 14, time =   56 ms
idx =  5, time =   55 ms
idx = 15, time =   56 ms
idx = 20, time =   56 ms
idx = 19, time =   56 ms
idx = 11, time =   56 ms
idx =  3, time =   55 ms
idx = 13, time =   56 ms
idx =  2, time =   55 ms
idx = 10, time =   55 ms
idx =  1, time =   55 ms
idx = 17, time =   56 ms
idx =  4, time =   55 ms
idx = 18, time =   56 ms
idx = 24, time =   57 ms
idx =  9, time =   55 ms
idx = 22, time =   57 ms
idx =  6, time =   55 ms
end of main.     3058 ms
欲しかった結果になった。しかしコンパイル時に、こんな警告が出た。
[warn] /Users/xxx.scala:97: object ops in package concurrent is deprecated: Use `Future` instead.
[warn]   spawn {
[warn]   ^
{なにー、future使えだとー!?}
仕方ないのでもう少し調べてみると、ちゃんと理解せずに使っていた、
import ExecutionContext.Implicits.global
が元のスレッドの動作を制御していたようで、こいつを変更する必要があった。 しかし、ここで活躍するのが何とjavaのスレッド管理機能であるExecutorsってやつだった。(わかりやすい解説)
で、変更したコードがこれ
import scala.concurrent._
import java.util.concurrent.Executors

object ThreadSample {
  def main(args: Array[String]) {
    val startTime = System.currentTimeMillis()

    val pool = Executors.newCachedThreadPool()
    implicit val execctx = ExecutionContext.fromExecutorService(pool)
 for (idx <- 1 to 24) {
     future {
      println(f"idx = $idx%2d, time = ${System.currentTimeMillis() - startTime}%4d ms")
      Thread.sleep(1000)
     }
    }
    Thread.sleep(3000)
    println(s"end of main.     ${System.currentTimeMillis() - startTime} ms")
  }
}
実行結果
idx = 18, time =   60 ms
idx =  6, time =   59 ms
idx =  5, time =   59 ms
idx =  2, time =   59 ms
idx = 10, time =   59 ms
idx = 13, time =   60 ms
idx = 11, time =   60 ms
idx =  7, time =   59 ms
idx =  8, time =   59 ms
idx =  9, time =   59 ms
idx =  4, time =   59 ms
idx =  3, time =   59 ms
idx = 17, time =   60 ms
idx = 19, time =   60 ms
idx = 15, time =   60 ms
idx = 21, time =   60 ms
idx = 22, time =   61 ms
idx = 12, time =   60 ms
idx = 16, time =   60 ms
idx = 24, time =   61 ms
idx = 23, time =   61 ms
idx = 20, time =   60 ms
idx =  1, time =   59 ms
idx = 14, time =   60 ms
end of main.     3062 ms
で、うまくいった。
Executor辺りのことは、上で紹介したサイトにも書いてあったのだが、そこを読んだときは、ExecutionContext.Implicits.globalで動くんだからいいじゃん。とちゃんと理解してなかった。

ちなみにPlayFrameworkとかいじっててakkaが使える状態のときは、javaのExecutorsを呼ばずに
 val system = ActorSystem("xxxxx")
 import system.dispatcher

future {
・ なんかいろいろ
・ なんかいろいろ
}
で、ActorSystemのdispatcherをインポートしてあげれば、ExecutionContext.Implicits.globalを使わないでいい感じになった。