scalaで、
import ExecutionContext.Implicits.global
とfuture使ってて、
「あれ? futureで作った別スレッドの処理がなんか始まらないことがあるんだけど・・・」
ということがある人のための記事です。
scala使ってて便利なのにスレッドを超簡単に作れるってのがあるんだけど、
1 2 3 4 5 6 | import ExecutionContext.Implicits.global // どこか上の方でimport future { ・ なんかいろいろ ・ なんかいろいろ } |
ただ単に、future{やると、エラーが出るんで、いろんなサイトのサンプル(こことかこことかここ。みんな良サイト) を参考に、先にimportしてscalaが持ってるデフォルトのExecutionContextであるExecutionContext.Implicits.global を利用するようにしてた。
ところが、調子に乗ってメインのスレッドでやりたくないことをfuture{でがんがん切り離して処理したんだけど、そのプログラムをコア数1のマシンで動かそうとしたら動かない。コア数2のマシンで動かしたらなんか動くけど、プログラムがよく固まる。
固まった状態ではほとんどCPUは食っていない。
ちなみに自分の開発マシンは、4core&ハイパースレッドで8スレッドだったのでその現象は出なかった。
{うげー、マルチスレッド周りの問題は追いかけにくくて嫌だなー・・・}
と思ったのだが、printlnとか入れてみるとどうやら処理がfutureの中身に来ていないようだ。
仕方ないのでサンプルを書いて自機(8スレマシン)で実験してみた。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | import scala.concurrent. _ import ExecutionContext.Implicits.global object ThreadSample { def main(args : Array[String]) { val startTime = System.currentTimeMillis() for (idx <- 1 to 24 ) { future { println(f "idx = $idx%2d, time = ${System.currentTimeMillis() - startTime}%4d ms" ) Thread.sleep( 1000 ) } } Thread.sleep( 3000 ) println(s "end of main. ${System.currentTimeMillis() - startTime} ms" ) } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | idx = 1 , time = 73 ms idx = 3 , time = 73 ms idx = 8 , time = 73 ms idx = 2 , time = 73 ms idx = 7 , time = 73 ms idx = 4 , time = 73 ms idx = 5 , time = 73 ms idx = 6 , time = 73 ms idx = 13 , time = 1099 ms idx = 16 , time = 1099 ms idx = 15 , time = 1099 ms idx = 10 , time = 1099 ms idx = 9 , time = 1099 ms idx = 14 , time = 1099 ms idx = 11 , time = 1099 ms idx = 12 , time = 1099 ms idx = 21 , time = 2100 ms idx = 23 , time = 2100 ms idx = 20 , time = 2100 ms idx = 19 , time = 2100 ms idx = 18 , time = 2100 ms idx = 17 , time = 2100 ms idx = 22 , time = 2100 ms idx = 24 , time = 2100 ms end of main. 3074 ms |
sleepしたからって、次のthreadを処理してくれる訳じゃなく御丁寧にsleepが終わるまで次のスレッドを処理しないで待ってるではないか!
で、どうやらこの同時に動けるfutureの数がマシンのコア数によって変るので、コア数の少ないマシンで自分のプログラムを動かしてた時にfutureで切り離したthreadが実行されなかった模様。
何か他のやり方ないかな、と探していたら、spawnというものがあるようなのでこんなふうに変えてみたら
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | import scala.concurrent. _ import scala.concurrent.ops.spawn object ThreadSample { def main(args : Array[String]) { val startTime = System.currentTimeMillis() for (idx <- 1 to 24 ) { spawn { println(f "idx = $idx%2d, time = ${System.currentTimeMillis() - startTime}%4d ms" ) Thread.sleep( 1000 ) } } Thread.sleep( 3000 ) println(s "end of main. ${System.currentTimeMillis() - startTime} ms" ) } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | idx = 16 , time = 56 ms idx = 23 , time = 57 ms idx = 7 , time = 55 ms idx = 8 , time = 55 ms idx = 21 , time = 56 ms idx = 12 , time = 56 ms idx = 14 , time = 56 ms idx = 5 , time = 55 ms idx = 15 , time = 56 ms idx = 20 , time = 56 ms idx = 19 , time = 56 ms idx = 11 , time = 56 ms idx = 3 , time = 55 ms idx = 13 , time = 56 ms idx = 2 , time = 55 ms idx = 10 , time = 55 ms idx = 1 , time = 55 ms idx = 17 , time = 56 ms idx = 4 , time = 55 ms idx = 18 , time = 56 ms idx = 24 , time = 57 ms idx = 9 , time = 55 ms idx = 22 , time = 57 ms idx = 6 , time = 55 ms end of main. 3058 ms |
1 2 3 | [warn] /Users/xxx.scala : 97 : object ops in package concurrent is deprecated : Use `Future` instead. [warn] spawn { [warn] ^ |
仕方ないのでもう少し調べてみると、ちゃんと理解せずに使っていた、
import ExecutionContext.Implicits.global
が元のスレッドの動作を制御していたようで、こいつを変更する必要があった。 しかし、ここで活躍するのが何とjavaのスレッド管理機能であるExecutorsってやつだった。(わかりやすい解説)
で、変更したコードがこれ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | import scala.concurrent. _ import java.util.concurrent.Executors object ThreadSample { def main(args : Array[String]) { val startTime = System.currentTimeMillis() val pool = Executors.newCachedThreadPool() implicit val execctx = ExecutionContext.fromExecutorService(pool) for (idx <- 1 to 24 ) { future { println(f "idx = $idx%2d, time = ${System.currentTimeMillis() - startTime}%4d ms" ) Thread.sleep( 1000 ) } } Thread.sleep( 3000 ) println(s "end of main. ${System.currentTimeMillis() - startTime} ms" ) } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | idx = 18 , time = 60 ms idx = 6 , time = 59 ms idx = 5 , time = 59 ms idx = 2 , time = 59 ms idx = 10 , time = 59 ms idx = 13 , time = 60 ms idx = 11 , time = 60 ms idx = 7 , time = 59 ms idx = 8 , time = 59 ms idx = 9 , time = 59 ms idx = 4 , time = 59 ms idx = 3 , time = 59 ms idx = 17 , time = 60 ms idx = 19 , time = 60 ms idx = 15 , time = 60 ms idx = 21 , time = 60 ms idx = 22 , time = 61 ms idx = 12 , time = 60 ms idx = 16 , time = 60 ms idx = 24 , time = 61 ms idx = 23 , time = 61 ms idx = 20 , time = 60 ms idx = 1 , time = 59 ms idx = 14 , time = 60 ms end of main. 3062 ms |
Executor辺りのことは、上で紹介したサイトにも書いてあったのだが、そこを読んだときは、ExecutionContext.Implicits.globalで動くんだからいいじゃん。とちゃんと理解してなかった。
ちなみにPlayFrameworkとかいじっててakkaが使える状態のときは、javaのExecutorsを呼ばずに
1 2 3 4 5 6 7 | val system = ActorSystem( "xxxxx" ) import system.dispatcher future { ・ なんかいろいろ ・ なんかいろいろ } |