2013年5月30日木曜日

PlayFramework2.1.1で動かすakka ActorとWebSocketのサンプル

scala、PlayFramework、akka、WebSocketと初めてづくしで、自分的に、はまりポイント満載だったので、サンプルをあげておきます。
akkaとWebSocketが両方入っていて、サンプルとしては焦点が絞れてないけれど、二つ記事を書くのが面倒なので、そのまんまです。

環境は、PlayFramework2.1.1でscalaもakkaもplayに付属のもの。

まず、初心者がscala、PlayFramework、akkaで一番はまるポイントは、これらは、現在バリバリ進化中の技術なので、ググった情報が古くなってしまっていることが多い、ということ。たった半年前の記事でさえ、古いこともあるので注意が必要です。もちろん、この記事もすぐに古くなりそう。

コードは、PlayのアプリケーションとActorを書いたやつの2ファイル。

・ Playのアプリケーション側
package controllers

import play.api._
import play.api.mvc._
import models._
import play.api.libs.iteratee.Concurrent.Channel
import play.api.libs.concurrent.Akka
import akka.util.Timeout
import akka.actor._
import akka.pattern.ask  // ? の使用に必要!!
import scala.concurrent._
import scala.concurrent.duration._  // 5 seconds の表現に必要
import ExecutionContext.Implicits.global // FutureのonSucceessを使うのに便利
import play.api.libs.iteratee._
import play.api.Play.current // Akka.system を使うのに必要


object AppHelloActor extends Controller {
 val actor = Akka.system.actorOf(Props[HelloActor], name = "helloactor")

 def index = Action {
  Ok(views.html.test("test page."))
 }
 
 def actorWebSocketsSample() = WebSocket.using[String] {
 request =>
 
  def onStart: Channel[String] => Unit = {
     channel => // このchannelを取っておくとclientにいろいろ送信できる
   println("actorWebSocketsSample unicast onStart")
   implicit val timeout = Timeout(5 seconds) // この下の?の隠れ引数のためにここで宣言?! こういう言語は初めてだぜ。
   val future = actor ? "world"
   //val future = actor ? 3.14159 // 上の行をコメントにして、こっちを生かすとonFailureでtimeoutを取れる
   //val future = actor.?("world")(Timeout(5 seconds))
   future.onSuccess {
   case res: Array[String] =>
    res.foreach(value => println("RESULT = " + value))
    channel.push(res.mkString(" ")) // clientにメッセージ送信
    channel.eofAndEnd()
   }
   future.onFailure {
   case e =>
    println("future.onFailure - " + e.getMessage())
    channel.eofAndEnd()
   }
  }

  def onError: (String, Input[String]) => Unit = {
     (message, input) =>
   println("actorWebSocketsSample unicast onError " + message)
  }
  def onComplete = println("actorWebSocketsSample unicast onComplete")
  
  val enumerator = Concurrent.unicast[String](onStart, onComplete, onError)
  
  val iteratee = Iteratee.foreach[String] {
  rcvData =>
   // 今回は、上のonStartで処理終了しちゃってるから何もしてないけど、
   // ここでWebSocketクライアントから受信したデータを処理する。
  } mapDone {_ => println("Disconnected")}
  
  (iteratee, enumerator)
 }
 
}
・ 次は、Actor側
package models

import akka.actor.Actor

class HelloActor() extends Actor {

 def receive = {
  case s: String =>
   sender ! Array(s"Hello, $s.", "Bye Bye.")
     case _ =>
      // timeoutさせたいので、何もしない
 }

}
以上でコードはおしまい。以下、解説、はまったポイント等をだらだらと・・・・

上のコード、importがだらだらとあって省略していないけど、実はここが今回の最重要ポイント。
「webのサンプルと環境が同じはずなのに、コンパイル出来ねぇぜ。大体、こんなメソッドはドキュメントにも載ってないぞ」
とかいうscalaでありがちな状況は、importが足らないのが原因ということが多い。

akkaのActorを使う上で、最初参考にしたのが、
Scalaで並行処理#2 – AkkaのActorを使う
のページで、とてもわかりやすいサンプルと解説の素晴らしい記事。
で、!(投げっぱなし) !!(同期) !!!(非同期)を駆使してやるぞと意気込んでみたものの、!!と!!!のメソッドが見当たらない。
おまけにActor側のself.replyも見当たらない。どうやら仕様変更があった模様。
{まじすか、勘弁してくださいよ先生・・・}
とめげかけるが、akkaの公式ページに解答を発見。(Use With Actorsの項)
どうやら、Actorから、もしくはActorへの通信は、!(返事いらない) と?(返事欲しい)の2種類になったっぽい。たしかにこの表記の方がシンプルでわかりやすい。
前の!!のように、同期で返事を待ちたい場合は、?でもらえるFutureを使って、
val result = Await.result(future, timeout.duration).asInstanceOf[String]
みたいにすれば、結果来るまで待ってられるし、前の!!!のように非同期で結果待ちの時は上のサンプルのように、onSuccessなりonCompleteなり使えばいい。
Futureの使い方についてとても参考になったのは、
Starlight -Little Programmer’s Diary-  Scala 2.10.0 Futures and Promises
のページです。サンプル多くてわかりやすいです。

あと、Actor側のself.replyもsender !になった。
{でも、!ってActorへのメッセージじゃなかったっけ?}
と思ったが、なんと?メソッドが、Actorにメッセージを投げる時に内部で一時的にActorを作ってくれるから、Actorからの戻しも!でいいとのこと。すげー。シンプルだ。
この、scalaとakka開発陣の仕様変えるな、こら」という声に負けない攻めの姿勢は大好きです。
scalaのことだから、前の書き方でも、たぶん何かimportするだけで動くんだと思う(知らないけど)、これができるのも仕様変更に踏み切れる理由なんだろう。implicit conversion恐るべし。

ただ、上の?メソッドも、ActorRefにも、ScalaActorRefにも見当たらない、これは、コード10行目の import akka.pattern.ask すると使えるようになる。
このimportで、ActorRefがAskableActorRefに、implicit conversionで変身するので?が使用可能になる。これは、確かにわかりにくいが、scalaの拡張性やコードの簡潔さを支えているのが、良くも悪くもimplicit conversionだということがよくわかった。

あと、はまったのが、?メソッドに必要な、隠し変数timeout。scalaには、implicit parameterといって、メソッドに必要な引数を普通に引数として渡すのではなく、周りのコードの引数と同じ型のimplicit宣言された値から取ってくるという、ぶっ飛んだ機能があって、こいつを初めて見たので何のことやらさっぱりわからんかった。
確かにこれなら、せっかく?メソッドが演算子的にかっこ良く書けるのに、timeoutをどう渡すんだよという難問を解決できる。
明示的にtimeoutを渡したい時は、
val  future = actor.?("world")(Timeout(5 seconds))
みたいにもできるが、見にくいし、かっこ悪い。
implicit parameterについては、ひしだまさんのページに書かれた一連のサンプルが非常にわかりやすいです。そのページに限らずひしだまさんのページは、考え抜かれた短いサンプルが大量にあって、scalaを使うには欠かすことができません。scalaの神様です。

で、疲れてきたんだけど、WebSocketにまだ触れてないなこれ。
参考にしたページは、
Auto-Saving ... Done  Play Framework 2.1のWebSocketで一対一通信

server側からclientに送信するには、onStartが呼ばれる時にやってくるchannelを使うので、WebSocketがつながっている間は、こいつを大事に取っておきましょう。
client側から何かを受信するたびに、Iteratee.foreach の中に入ってくるので、ここで簡単に受信データを処理できます。

scalaをいじっていて感じたのは、scala開発陣の、いかに簡潔なコードを作成可能な言語にするかという執念じみたものでした。その執念に、これからも期待したいです。

2013年5月24日金曜日

PlayFramework2.1.1環境で、ScalaからJNA経由でcを呼んだりコールバックセットしたりした

PlayFrameworkを使ったシステムをscalaで書いていて、cを呼びたくなった。
呼びたくなったというよりは、裏でc++でものすっごく激しい計算させるので呼ばざるを得ない。
google先生に聞いても、俺の検索力ではサンプルっぽいものは見つけられかった。
{くぅ。scalaも始めたばっかりで分けわからんし、どうしてくれようか・・・}
とコードをごねごねしていたらなんか動いたので、忘れる前に投稿してみた。

できてしまえば、かなりシンプルになった。もちろん一見シンプルに見えるのはscalaパワーのおかげ。
やりたかったことは、あるクラス(HelloData)の各インスタンスのもつクロージャ(?)をc側にコールバックとして登録して、それぞれのインスタンスからcのライブラリを呼んだ時に、ちゃんとコールバックが呼び出したインスタンスのクロージャに返ってくるようにするということ。

ソースのファイルは5つ
  • Application.scala Play側のmain的な入り口
  • HelloData.scala テキトーなクラス。こいつのインスタンスからcを呼び出したり、ここのコールバックに戻したり、いろんなデータを持ってみたりしてみたい。
  • HelloJna.scala JNAとの接合部分
  • index.scala.html web出力のview部分
  • JnaInterface.cpp c側のソース
以降の部分、ほとんどはコードですが、流れとしては、
Playからindexが呼ばれる→HelloDataのインスタンス作成→HelloDataのコンストラクタ内でコールバック登録→HelloDataのメソッドからcの関数呼び出し→c側からコールバック呼び出し→コールバック内でprintln→c側関数が文字列を戻して終了→webにメッセージ出しておしまい

じゃ、Application.scalaから
HelloDataのインスタンスを3つ作って、ちゃんと区別されて返ってくるか実験する。
package controllers

import play.api._
import play.api.mvc._
import models._

object Application extends Controller {
 def index = Action {
  val data = List(new HelloData(), new HelloData(), new HelloData())
  val msg = data.map(_.callJnaHello)
  
  Ok(views.html.index(msg))
 }
}

HelloData.scala
scala側とc側でデータの連携をとるために、ユニークなidxを持たせてある。
2013/6/4修正 callbackをvalに入れるようにしました。詳しくはこちらの投稿
package models

class HelloData() {
 private val lib = HelloJna.lib
 private val idx = HelloData.cnt;
 private val callback = new JnaCallback {
  def onMessage(msg: String) = println("onMessage " + idx + ": " + msg)
 }
 HelloData.cnt += 1
 
 lib.setFunction(idx, callback)
 
 def callJnaHello: String = lib.getHello(idx)
}

object HelloData {
 var cnt = 0;
}

HelloJna.scala
ライブラリファイルを変更してコンパイルし直しても、一度JVMを再起動(playを再起動すればいい)しないとライブラリをloadし直さないので注意(ここはまった)。JVMには、一度loadしたNativeのLibraryをunloadする機能はないらしい。←あったら誰か教えて
なお、途中のlibHelloJNA.soは、もっと下のJnaInterface.cppからつくったSharedObjectです。
package models

import com.sun.jna.Library
import com.sun.jna.Native
import com.sun.jna.Callback

object HelloJna {
 val lib = Native.loadLibrary("/xxx/yyy/xxx/Debug/libHelloJNA.so", classOf[HelloJnaTrait]).asInstanceOf[HelloJnaTrait]
}

trait JnaCallback extends Callback {
 def onMessage(msg: String): Unit
}

trait HelloJnaTrait extends Library {
 def getHello(idx: Int): String
 def setFunction(idx: Int, callback: JnaCallback)
}

index.scala.html
cの関数から戻ってきた文字列を出すだけ
@(messages: List[String])

<!DOCTYPE html>
<html>
  <body>
 @for(msg <- data-blogger-escaped-messages="" data-blogger-escaped-p="">
 @msg 
} </body> </html>
JnaInterface.cpp
実験なのでidxが255超えたらどうすんのって突っ込みはなし
他の部分はc++で書いていてもここのところはcの関数的に宣言する必要あり(たぶん)。
あと、一番下にiostream閉じてる変なタグが見えたら、SystaxHilighterのバグなので気にしないこと。
#include 
using namespace std;

static int g_cnt = 0;
static char g_msg[256];
static void (*g_callback[256])(const char *msg);

extern "C" const char *getHello(int idx) {
 char callbackMsg[256];

 sprintf(callbackMsg, "msg to callback %d", idx);
 g_callback[idx](callbackMsg);

 sprintf(g_msg, "Hello JNA idx = %d, cnt = %d.", idx, ++g_cnt);
 return g_msg;
}

extern "C" void setFunction(int idx, void (*func)(const char *msg)) {
 g_callback[idx] = func;
}

下は、ブラウザに表示された結果
Hello JNA idx = 0, cnt = 1.

Hello JNA idx = 1, cnt = 2.

Hello JNA idx = 2, cnt = 3.

次は、printlnの出力
コロン前の数字の値は、各HelloDataインスタンス内にあるので、各インスタンス内のクロージャにちゃんとコールバックが戻っているのが確認できる。
onMessage 0: msg to callback 0
onMessage 1: msg to callback 1
onMessage 2: msg to callback 2