2014年4月11日金曜日

[scala][akka] microkernelを使って独立したプロセスに立てたRemote Actorとの通信サンプル

自分は、akkaの「Let it crash」というという発想が大好きなんです。
初めて「Let it crash」というスローガンを見たとき
{だよね。それでいいんだよね。}
と感激しました。

ただ、crashさせるためのSupervisor(docサンプル)の仕組みは、Actorはcrashさせるけど、jvmまでは殺してくれない。自分の場合、ヘビーな計算をscalaからloadLibraryしたc++のモジュールをJNA経由で使ってやっているので、jvmごとcrashさせたいんです。(外部libraryのunloadはできないようなので・・・)

そこで、登場するのがakkaのmicrokernelで、こいつを使うと簡単に独立したプロセスのjvmでActorを立てることが出来る。
必要に時は、こいつをjvmごとkillして復活させればいい ← もっとスマートな仕組みがきっとある気がする

で、HelloWorldのサンプルを書いてみた。
listenポートを可変にしたかったので、confファイルを使わない形で作ってみた。
application.confとかに設定が分かれてないほうがサンプルとしてもわかりやすいし。
まず、リモートで接続される側のmicrokernel
パラメータは、起動時に引数として渡せないので、起動する時は、
env AKKA_PORT=12345 akka hello.world.Sample.HelloLauncher
て感じで、環境変数経由にする。
あと、実行時にクラスが見つかんないみたいに怒られた時は、とりあえずjarに固めて、akkaのdeployフォルダに置けば見つかるようになります。
package hello.world.Sample

import akka.actor.{ Actor, ActorSystem, Props }
import akka.kernel.Bootable

class HelloLauncher extends Bootable {
    val port = System.getenv("AKKA_PORT")

    val conf = ConfigFactory.parseString(s"""
      akka.actor.provider = akka.remote.RemoteActorRefProvider
      akka.remote.netty.tcp.hostname = 127.0.0.1
      akka.remote.netty.tcp.port = ${port}
    """.stripMargin).withFallback(ConfigFactory.load());    
    val system = ActorSystem("HelloSystem", conf)

    def startup = {
        system.actorOf(Props[Hello], "hello")
    }

    def shutdown = {
        system.shutdown()
    }  
}

class Hello extends Actor {
  def receive = {
    case msg: String =>
      println("Hello: msg = " + msg)
  }
}
つぎに、Helloにメッセージを投げる側のサンプル
env AKKA_PORT=12345 akka hello.world.Sample.HelloLauncher
env AKKA_PORT=23456 akka hello.world.Sample.HelloLauncher
のように、別コンソールで2つのmicrokernelを起動した前提で動作させる。
object HelloCaller {

import akka.actor.{ Actor, ActorSystem, Props }
    
  def main(args: Array[String]) {
    System.setProperty("akka.actor.provider", "akka.remote.RemoteActorRefProvider")
    // このポートはリモート側でsenderにメッセージを戻す時に使用する
    System.setProperty("akka.remote.netty.tcp.port", "10000")
    val system = ActorSystem("CallerSystem")
    val actor1 = system.actorSelection("akka.tcp://HelloSystem@127.0.0.1:12345/user/hello")
    val actor2 = system.actorSelection("akka.tcp://HelloSystem@127.0.0.1:23456/user/hello")
    actor1 ! "Hello,"
    actor2 ! "Hello,"
    Thread.sleep(1000)
    actor1 ! "World."
    actor2 ! "Another actor."
    Thread.sleep(1000)
    actor1 ! "Yeaaahhh!!!"
    actor2 ! "Hooooooo!!!"
    Thread.sleep(1000)
    system.shutdown
  }

}
なお、mainから呼ぶ側のlibraryDependenciesには、akka-kernel, akka-actor, akka-remote を追加しています。