akkaとWebSocketが両方入っていて、サンプルとしては焦点が絞れてないけれど、二つ記事を書くのが面倒なので、そのまんまです。
環境は、PlayFramework2.1.1でscalaもakkaもplayに付属のもの。
まず、初心者がscala、PlayFramework、akkaで一番はまるポイントは、これらは、現在バリバリ進化中の技術なので、ググった情報が古くなってしまっていることが多い、ということ。たった半年前の記事でさえ、古いこともあるので注意が必要です。もちろん、この記事もすぐに古くなりそう。
コードは、PlayのアプリケーションとActorを書いたやつの2ファイル。
・ Playのアプリケーション側
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 | package controllers import play.api. _ import play.api.mvc. _ import models. _ import play.api.libs.iteratee.Concurrent.Channel import play.api.libs.concurrent.Akka import akka.util.Timeout import akka.actor. _ import akka.pattern.ask // ? の使用に必要!! import scala.concurrent. _ import scala.concurrent.duration. _ // 5 seconds の表現に必要 import ExecutionContext.Implicits.global // FutureのonSucceessを使うのに便利 import play.api.libs.iteratee. _ import play.api.Play.current // Akka.system を使うのに必要 object AppHelloActor extends Controller { val actor = Akka.system.actorOf(Props[HelloActor], name = "helloactor" ) def index = Action { Ok(views.html.test( "test page." )) } def actorWebSocketsSample() = WebSocket.using[String] { request = > def onStart : Channel[String] = > Unit = { channel = > // このchannelを取っておくとclientにいろいろ送信できる println( "actorWebSocketsSample unicast onStart" ) implicit val timeout = Timeout( 5 seconds) // この下の?の隠れ引数のためにここで宣言?! こういう言語は初めてだぜ。 val future = actor ? "world" //val future = actor ? 3.14159 // 上の行をコメントにして、こっちを生かすとonFailureでtimeoutを取れる //val future = actor.?("world")(Timeout(5 seconds)) future.onSuccess { case res : Array[String] = > res.foreach(value = > println( "RESULT = " + value)) channel.push(res.mkString( " " )) // clientにメッセージ送信 channel.eofAndEnd() } future.onFailure { case e = > println( "future.onFailure - " + e.getMessage()) channel.eofAndEnd() } } def onError : (String, Input[String]) = > Unit = { (message, input) = > println( "actorWebSocketsSample unicast onError " + message) } def onComplete = println( "actorWebSocketsSample unicast onComplete" ) val enumerator = Concurrent.unicast[String](onStart, onComplete, onError) val iteratee = Iteratee.foreach[String] { rcvData = > // 今回は、上のonStartで処理終了しちゃってるから何もしてないけど、 // ここでWebSocketクライアントから受信したデータを処理する。 } mapDone { _ = > println( "Disconnected" )} (iteratee, enumerator) } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | package models import akka.actor.Actor class HelloActor() extends Actor { def receive = { case s : String = > sender ! Array(s "Hello, $s." , "Bye Bye." ) case _ = > // timeoutさせたいので、何もしない } } |
上のコード、importがだらだらとあって省略していないけど、実はここが今回の最重要ポイント。
「webのサンプルと環境が同じはずなのに、コンパイル出来ねぇぜ。大体、こんなメソッドはドキュメントにも載ってないぞ」
とかいうscalaでありがちな状況は、importが足らないのが原因ということが多い。
akkaのActorを使う上で、最初参考にしたのが、
Scalaで並行処理#2 – AkkaのActorを使う
のページで、とてもわかりやすいサンプルと解説の素晴らしい記事。
で、!(投げっぱなし) !!(同期) !!!(非同期)を駆使してやるぞと意気込んでみたものの、!!と!!!のメソッドが見当たらない。
おまけにActor側のself.replyも見当たらない。どうやら仕様変更があった模様。
{まじすか、勘弁してくださいよ先生・・・}
とめげかけるが、akkaの公式ページに解答を発見。(Use With Actorsの項)
どうやら、Actorから、もしくはActorへの通信は、!(返事いらない) と?(返事欲しい)の2種類になったっぽい。たしかにこの表記の方がシンプルでわかりやすい。
前の!!のように、同期で返事を待ちたい場合は、?でもらえるFutureを使って、
1 | val result = Await.result(future, timeout.duration).asInstanceOf[String] |
Futureの使い方についてとても参考になったのは、
Starlight -Little Programmer’s Diary- Scala 2.10.0 Futures and Promises
のページです。サンプル多くてわかりやすいです。
あと、Actor側のself.replyもsender !になった。
{でも、!ってActorへのメッセージじゃなかったっけ?}
と思ったが、なんと?メソッドが、Actorにメッセージを投げる時に内部で一時的にActorを作ってくれるから、Actorからの戻しも!でいいとのこと。すげー。シンプルだ。
この、scalaとakka開発陣の「仕様変えるな、こら」という声に負けない攻めの姿勢は大好きです。
scalaのことだから、前の書き方でも、たぶん何かimportするだけで動くんだと思う(知らないけど)、これができるのも仕様変更に踏み切れる理由なんだろう。implicit conversion恐るべし。
ただ、上の?メソッドも、ActorRefにも、ScalaActorRefにも見当たらない、これは、コード10行目の import akka.pattern.ask すると使えるようになる。
このimportで、ActorRefがAskableActorRefに、implicit conversionで変身するので?が使用可能になる。これは、確かにわかりにくいが、scalaの拡張性やコードの簡潔さを支えているのが、良くも悪くもimplicit conversionだということがよくわかった。
あと、はまったのが、?メソッドに必要な、隠し変数timeout。scalaには、implicit parameterといって、メソッドに必要な引数を普通に引数として渡すのではなく、周りのコードの引数と同じ型のimplicit宣言された値から取ってくるという、ぶっ飛んだ機能があって、こいつを初めて見たので何のことやらさっぱりわからんかった。
確かにこれなら、せっかく?メソッドが演算子的にかっこ良く書けるのに、timeoutをどう渡すんだよという難問を解決できる。
明示的にtimeoutを渡したい時は、
1 | val future = actor.?( "world" )(Timeout( 5 seconds)) |
implicit parameterについては、ひしだまさんのページに書かれた一連のサンプルが非常にわかりやすいです。そのページに限らずひしだまさんのページは、考え抜かれた短いサンプルが大量にあって、scalaを使うには欠かすことができません。scalaの神様です。
で、疲れてきたんだけど、WebSocketにまだ触れてないなこれ。
参考にしたページは、
Auto-Saving ... Done Play Framework 2.1のWebSocketで一対一通信
server側からclientに送信するには、onStartが呼ばれる時にやってくるchannelを使うので、WebSocketがつながっている間は、こいつを大事に取っておきましょう。
client側から何かを受信するたびに、Iteratee.foreach の中に入ってくるので、ここで簡単に受信データを処理できます。
scalaをいじっていて感じたのは、scala開発陣の、いかに簡潔なコードを作成可能な言語にするかという執念じみたものでした。その執念に、これからも期待したいです。