2013年5月30日木曜日

PlayFramework2.1.1で動かすakka ActorとWebSocketのサンプル

scala、PlayFramework、akka、WebSocketと初めてづくしで、自分的に、はまりポイント満載だったので、サンプルをあげておきます。
akkaとWebSocketが両方入っていて、サンプルとしては焦点が絞れてないけれど、二つ記事を書くのが面倒なので、そのまんまです。

環境は、PlayFramework2.1.1でscalaもakkaもplayに付属のもの。

まず、初心者がscala、PlayFramework、akkaで一番はまるポイントは、これらは、現在バリバリ進化中の技術なので、ググった情報が古くなってしまっていることが多い、ということ。たった半年前の記事でさえ、古いこともあるので注意が必要です。もちろん、この記事もすぐに古くなりそう。

コードは、PlayのアプリケーションとActorを書いたやつの2ファイル。

・ Playのアプリケーション側
package controllers

import play.api._
import play.api.mvc._
import models._
import play.api.libs.iteratee.Concurrent.Channel
import play.api.libs.concurrent.Akka
import akka.util.Timeout
import akka.actor._
import akka.pattern.ask  // ? の使用に必要!!
import scala.concurrent._
import scala.concurrent.duration._  // 5 seconds の表現に必要
import ExecutionContext.Implicits.global // FutureのonSucceessを使うのに便利
import play.api.libs.iteratee._
import play.api.Play.current // Akka.system を使うのに必要


object AppHelloActor extends Controller {
 val actor = Akka.system.actorOf(Props[HelloActor], name = "helloactor")

 def index = Action {
  Ok(views.html.test("test page."))
 }
 
 def actorWebSocketsSample() = WebSocket.using[String] {
 request =>
 
  def onStart: Channel[String] => Unit = {
     channel => // このchannelを取っておくとclientにいろいろ送信できる
   println("actorWebSocketsSample unicast onStart")
   implicit val timeout = Timeout(5 seconds) // この下の?の隠れ引数のためにここで宣言?! こういう言語は初めてだぜ。
   val future = actor ? "world"
   //val future = actor ? 3.14159 // 上の行をコメントにして、こっちを生かすとonFailureでtimeoutを取れる
   //val future = actor.?("world")(Timeout(5 seconds))
   future.onSuccess {
   case res: Array[String] =>
    res.foreach(value => println("RESULT = " + value))
    channel.push(res.mkString(" ")) // clientにメッセージ送信
    channel.eofAndEnd()
   }
   future.onFailure {
   case e =>
    println("future.onFailure - " + e.getMessage())
    channel.eofAndEnd()
   }
  }

  def onError: (String, Input[String]) => Unit = {
     (message, input) =>
   println("actorWebSocketsSample unicast onError " + message)
  }
  def onComplete = println("actorWebSocketsSample unicast onComplete")
  
  val enumerator = Concurrent.unicast[String](onStart, onComplete, onError)
  
  val iteratee = Iteratee.foreach[String] {
  rcvData =>
   // 今回は、上のonStartで処理終了しちゃってるから何もしてないけど、
   // ここでWebSocketクライアントから受信したデータを処理する。
  } mapDone {_ => println("Disconnected")}
  
  (iteratee, enumerator)
 }
 
}
・ 次は、Actor側
package models

import akka.actor.Actor

class HelloActor() extends Actor {

 def receive = {
  case s: String =>
   sender ! Array(s"Hello, $s.", "Bye Bye.")
     case _ =>
      // timeoutさせたいので、何もしない
 }

}
以上でコードはおしまい。以下、解説、はまったポイント等をだらだらと・・・・

上のコード、importがだらだらとあって省略していないけど、実はここが今回の最重要ポイント。
「webのサンプルと環境が同じはずなのに、コンパイル出来ねぇぜ。大体、こんなメソッドはドキュメントにも載ってないぞ」
とかいうscalaでありがちな状況は、importが足らないのが原因ということが多い。

akkaのActorを使う上で、最初参考にしたのが、
Scalaで並行処理#2 – AkkaのActorを使う
のページで、とてもわかりやすいサンプルと解説の素晴らしい記事。
で、!(投げっぱなし) !!(同期) !!!(非同期)を駆使してやるぞと意気込んでみたものの、!!と!!!のメソッドが見当たらない。
おまけにActor側のself.replyも見当たらない。どうやら仕様変更があった模様。
{まじすか、勘弁してくださいよ先生・・・}
とめげかけるが、akkaの公式ページに解答を発見。(Use With Actorsの項)
どうやら、Actorから、もしくはActorへの通信は、!(返事いらない) と?(返事欲しい)の2種類になったっぽい。たしかにこの表記の方がシンプルでわかりやすい。
前の!!のように、同期で返事を待ちたい場合は、?でもらえるFutureを使って、
val result = Await.result(future, timeout.duration).asInstanceOf[String]
みたいにすれば、結果来るまで待ってられるし、前の!!!のように非同期で結果待ちの時は上のサンプルのように、onSuccessなりonCompleteなり使えばいい。
Futureの使い方についてとても参考になったのは、
Starlight -Little Programmer’s Diary-  Scala 2.10.0 Futures and Promises
のページです。サンプル多くてわかりやすいです。

あと、Actor側のself.replyもsender !になった。
{でも、!ってActorへのメッセージじゃなかったっけ?}
と思ったが、なんと?メソッドが、Actorにメッセージを投げる時に内部で一時的にActorを作ってくれるから、Actorからの戻しも!でいいとのこと。すげー。シンプルだ。
この、scalaとakka開発陣の仕様変えるな、こら」という声に負けない攻めの姿勢は大好きです。
scalaのことだから、前の書き方でも、たぶん何かimportするだけで動くんだと思う(知らないけど)、これができるのも仕様変更に踏み切れる理由なんだろう。implicit conversion恐るべし。

ただ、上の?メソッドも、ActorRefにも、ScalaActorRefにも見当たらない、これは、コード10行目の import akka.pattern.ask すると使えるようになる。
このimportで、ActorRefがAskableActorRefに、implicit conversionで変身するので?が使用可能になる。これは、確かにわかりにくいが、scalaの拡張性やコードの簡潔さを支えているのが、良くも悪くもimplicit conversionだということがよくわかった。

あと、はまったのが、?メソッドに必要な、隠し変数timeout。scalaには、implicit parameterといって、メソッドに必要な引数を普通に引数として渡すのではなく、周りのコードの引数と同じ型のimplicit宣言された値から取ってくるという、ぶっ飛んだ機能があって、こいつを初めて見たので何のことやらさっぱりわからんかった。
確かにこれなら、せっかく?メソッドが演算子的にかっこ良く書けるのに、timeoutをどう渡すんだよという難問を解決できる。
明示的にtimeoutを渡したい時は、
val  future = actor.?("world")(Timeout(5 seconds))
みたいにもできるが、見にくいし、かっこ悪い。
implicit parameterについては、ひしだまさんのページに書かれた一連のサンプルが非常にわかりやすいです。そのページに限らずひしだまさんのページは、考え抜かれた短いサンプルが大量にあって、scalaを使うには欠かすことができません。scalaの神様です。

で、疲れてきたんだけど、WebSocketにまだ触れてないなこれ。
参考にしたページは、
Auto-Saving ... Done  Play Framework 2.1のWebSocketで一対一通信

server側からclientに送信するには、onStartが呼ばれる時にやってくるchannelを使うので、WebSocketがつながっている間は、こいつを大事に取っておきましょう。
client側から何かを受信するたびに、Iteratee.foreach の中に入ってくるので、ここで簡単に受信データを処理できます。

scalaをいじっていて感じたのは、scala開発陣の、いかに簡潔なコードを作成可能な言語にするかという執念じみたものでした。その執念に、これからも期待したいです。