reactive-web: Lift をシンプルに

EventStream

はじめに

EventStream は、ほかのすべてのものがこれを土台に構築されているという意味で、reactive-core の中心的なトレイトです。このトレイトは、よく知られたリスナーパターンに対する関数抽象を提供します。ただし、命令型のテクニックを使う代わりに、EventStream を使えば、より効率的な抽象化を使ってイベントを送受信できます。

reactive はスレッディングを暗黙的に扱わない点に注意することが重要です。イベントは、それが発火されたスレッド上で通知され、そのスレッドの実行は、すべてのリスナーがイベントを処理したときに続行されます。

EventStream は、値の集合に似たものと考えるとよいかもしれません。ただし、すべての値が同時に存在するのではなく、各値は時間軸上の異なる点に存在しています。このアナロジーを頭の片隅にとどめておいてください。というのも、reactive-core のメソッドは、それに対応する scala コレクションフレームワークのメソッドのような名前が付けられているからです。

EventStream の作成

一般には、ライブラリから取得された EventStream を扱うことになるでしょう。しかし、自分で作成する必要がある場合も少なくありません。それには、EventSource をインスタンス化します (EventSourceEventStream を継承しています)。その後、fire を呼び出せば、イベントを送信することができます。

もちろん、いずれかの変換メソッド (以下を参照) を呼び出すときも、そのたびに新しい EventStream を作成します。たとえば、Timer があります。

Timer

EventStream の便利なサブクラスの 1 つに Timer があります。Timer は、指定された間隔でチックを発火します。チックの値は、チックの発火がスケジュールされた時点ではなく、チックが発火されたときの時間がベースになっています。

リスナーの追加: foreach

舞台裏では、EventStream はリスナーのコレクションを管理していますが、コンセプト的にはそのように考えるべきではありません。コンセプトの観点から言えば、リスナーを追加することでプログラマが望んでいることは、発火された各イベントに対して関数を実行することです。言い換えれば、EventStream のすべての値に対して、ということです。したがって、Scala コレクションのすべての値に対してある関数を実行したい場合に foreach を呼び出すのとまったく同様に、 EventStream の場合も foreach を呼び出します。ただし、コレクションの場合と異なり、foreach はただちに戻り、指定した関数は保存されて、イベントが発火するたびに実行されます。(もちろん、foreach を明示的に呼び出す代わりに、for 内包表記構文を使うこともできます。)

ここで次のような疑問が生じます。すなわち、関数およびその関数が持っている参照は、いったいどのように (EventStream の前に) ガベージコレクトされるのか、という疑問です。答えは、EventStream は関数を WeakReference 内部に保持していて、これによって関数がガベージコレコトされることを可能にしています。もしそうなら、関数が時期尚早にガベージコレクトされることを防いでいるのは何かという問題があります。実はこれが理由で、foreach (および forward) を呼び出すときは、暗黙的なスコープ内に Observing のインスタンスを持たなければなりません。EventStream は、関数への強い参照を Observing オブジェクト内部に置きます。メモリライフタイムに関する責任は、Observing が担当します。これを実装するための最も簡単な (そして一般に最も適切な) 方法は、外側を囲むクラスに Observing を継承させることです (自身に対する暗黙の参照が含まれるからです)。

変換

EventStream を非常に多目的にしているのは、その一連の変換メソッドで、これらのメソッドは、元の EventStream をベースにしながら何らかの修正が加えられたイベントを持つ EventStream を返します。さまざまな変換をつなぎ合わせることで、元の EventStream のイベントの値から非常に異なるイベントの値を持つ EventStream を取得することができます。これは、コレクションを次のように変換できるのと似ています: List(1,2,3).map(_ * 10).filter(_ < 25)。結果として得られる EventStream のコンシューマは、その EventStream が元の EventStream とどのような関係があるかを気にする必要はありません。元の EventStream がイベントを発火するたびに、変換後の EventStream は、指定された変換に従い、元のイベントに基づいて独自のイベントを発火することができます。背後ではどうなっているかですが、「親」 EventStream にリスナーが追加され、「子」 EventStream からの適切なイベントを発火するようになっています。

ライフタイムの細かなコントロール: takeWhile

関数のライフタイムを Observing で可能な以上に細かくコントロールする必要がある場合はどうすればよいでしょうか。

最初に示したコレクションのアナロジーを思い出してください。コレクションの一部だけを対象に処理を行いたい場合はどうするでしょうか。たくさんの方法があります。しかし、最初の部分を処理し、条件が真になったら処理をやめるとしたらどうしますか。もちろん、そういう場合には takeWhile を使うでしょう。 EventStream の場合も同じです。コレクションに対して takeWhile を実行すると新しいサブセットコレクションが返されるのとまったく同様に、EventStream に対して takeWhile を実行すると、新しい EventStream が返されます。この新しい EventStream は、条件が真である限りイベントを返します。

takeWhile を使用するうえで、スコープ内の Observing は必要ありません。条件が偽に評価されるようなイベントが元の EventStream で発火すると、ただちにイベントの受け取りは中止され、述語関数への参照と新しい EventStream は削除されます。

定型コードは述語の中で副作用を引き起こすべきではありません。ただし、そうしたい場合にそれが禁じられているわけではありません。

要点は何か?

さて、ここまで読んできて、これまでの説明は何のためなのか、といぶかる読者もいるかもしれません。通常のリスナーパターンをどうして使わないのか、Swing がそうしているように、と。

簡潔に答えると、ここで説明しているのは新しい思考法なのですが、それに慣れるといろいろなことがラクになる、ということです。たとえば、命令型のアプローチを採用し、ある状況に対応して、あるメソッドの中でリスナーを追加し、このリスナーを別の状況に対応して、別のメソッドの中で削除するということをすると、コード内のさまざまな場所に散らばったリスナーのライフサイクルを管理するコードが必要ということになります。言うまでもなく、そうしたコードを追跡するのは簡単ではありません。リスナーのライフサイクルを管理するというコンセプトを抽象化することによって、プログラミングの重点は、コンピュータが次になすべきことを記述することから、プログラマが行いたい処理を記述することへと移ることになり、結果として保守しやすいコードが生まれます。

しかし、おそらくそれよりも大きなメリットとして、変換も合成も可能な EventStream という汎用の抽象化が利用可能になる点があります。この点は、EventStream に関する説明、および EventStream と相互作用を持ち、その上に作られたあらゆるものに関する説明を読み進んでいけば、実感することができるでしょう。

フォーカスを絞った EventStream: filter

多くのイベントを発火する EventStream があって、そのうちの一部にしか関心がない場合はどうすればよいでしょうか。このような場合に使用できるのが、filter です。

すでに見たように、ある EventStream が与えられたとき、(述語に基づいて) 時間軸上のある時点までだけ「生きている」新しい EventStream を派生させることができます。こうした変換された EventStream を返す操作はほかにもたくさんあります。そのほとんどは、コレクションに対する各種メソッドにそのまま対応します。

たとえば、コレクションを filter すれば、述語にマッチする要素だけを含む新しいコレクションを取得することができるように、EventStreamfilter すれば、述語にマッチするイベントだけを発火する新しい EventStream を取得することができます。

すべてが変換された EventStream: map

もうひとつ、基本的なコレクションメソッドに map があります。このメソッドは、任意の関数を各要素に適用することでコレクションを変換します。返されたコレクションの各要素は、元のコレクションの対応する要素に関数を適用して計算された値を持ちます。

コレクションの場合と同様に、EventStreammap することができます。map に関数を渡すと、親 EventStream が発火したすべてのイベントに対して、新しい EventStream はイベントを発火し、そのイベントの値は親のイベントに関数を適用した結果となります。

filtermap の組み合わせ: collect

Scala のコレクションの場合と同様、EventStream にも便宜上、collect メソッドが用意されています。想像どおり、このメソッドは、どのイベントに応答するか、および応答するときにどのイベントを発火するかを指定した PartialFunction を取ります。

EventStream の切り替え: flatMap

もう少しレベルを上げて、今度は flatMapEventStream に適用してみましょう。

Scala コレクションフレームワークでは、flatMap はどのような処理をするのでしょうか。技術的に言えば、このメソッドは、コレクションのすべての要素に collection-valued 関数を適用し、関数によって返されたすべてのコレクションが連結されたものからなる新しいコレクションを返します。実際の処理で言えば、多くのシーケンスを縒り合わせたり、多数のシーケンスからなるシーケンスを平坦化したりといったことができます。次の例を見てください。

  val original = List(1, 2, 3)
  val flatMapped = original.flatMap(x => List(x*10,x*10+1,x*10_2))
  flatMapped == List(10,11,12,  20,21,22,  30,31,32)

同様にして、これを時間軸上のコレクションのアナロジーに適用すると、flatMap を使うことで、ほかのさまざまな EventStream によって発火されたイベントを発火する EventStream を作成することができます。親 EventStream によって発火された最初のイベントが受け取られた にならなければ、イベントは発火されないことに注意してください。子 EventStream からのイベントは、次の親イベントを受け取るまで発火され、その後は、次の子 EventStream からイベントを受け取るごとにイベントが発火され、以下同様に続きます。最も簡単な使い方は、ほかのいくつかの EventStreams を使い回すことです。たとえば、ある図形を拡大させ、次にフェードアウトさせるとします。

// Shape は scale と opacity (不透明度) の値を持つ case クラスで、
// millisTimer は 0 から始めて 1 ミリ秒ごとにイベントを
// 発火するとします.
// scale は最初の 1 秒で 0 から 1 へとアニメーションし、
// opacity は次の 1 秒で 1 から 0 へとアニメーションします.
def compositeAnimation(millisTimer: EventStream[Long], shape: Shape): EventStream[Shape] = {
  val scale: EventStream[Double] =
    millisTimer.map(m => m/1000.0)
  val opacity: EventStream[Double] =
    millisTimer.map(m => 1 - (m-1000)/1000.0)
  val seconds = millisTimer.filter(_ % 1000 == 0).map(_ / 1000).
    takeWhile(_ < 2000)
  
  seconds.flatMap {
    case 0 => scale
    case 1 => opacity
  }
}

もちろん、親 EventStream によって発火されたイベントに基づいて新しい EventStream を返すこともできます。

// seconds は 1 から 100 までの整数を
// インクリメントしながら毎秒発火するとします.
// ここで、次のように 10 ごとに値を逆にします: (10,9..2,1, 20..11, 30..21 ...)
seconds.filter(_ % 10 == 1).flatMap{ t =>
  seconds.map(s => t + 10 - s)
}

SignalEventStreamflatMap することもできます。こちらを参照してください。

状態を渡す: foldLeft

flatMap よりも複雑な方法で、イベントをさまざまな要因に基づいて処理する必要がある場合はどうすればよいでしょうか。

命令型言語の場合、一般的なタスクとして、配列などに対する繰り返し処理があります。この場合、配列に対する処理を行う途中で遭遇する状態に応じて、さまざまに変化するものを追跡するためにたくさんの変数を使います。関数型プログラミングでは、こうしたタスクをしばしば foldLeft を使って実現します。foldLeft に渡すのは、初期の「状態」と、最後の「状態」およびコレクション内の次の要素を取る関数です。この関数は、要素ごとに、次の呼び出しのときにあるべき状態を返します。(関数型メソッドでは並行実装が可能であり、コードの変更は必要ないことに注意してください)。たとえば、数のリストの合計を計算するよくある例を取り上げましょう。

list.foldLeft(0){(totalSoFar, nextElement) => totalSoFar + nextElement}
// より一般的には list.foldLeft(0)(_ + _) のように記述される.

これと同様に、EventStream に対して foldLeft を呼び出し、初期値のほか、「状態」値およびイベントを取って新しい値を返す関数を渡すことができます。これで、返された値を発火する EventStream を取得することになります (返された値は、次の関数呼び出しに渡されるのと同じ値)。サンプルでは、数が発火されるたびに平均値を発火する EventStream を作成しています。

EventStream の結合: |

2 つの EventStream の和を取ることもできます。新しい EventStream は、元の EventStream のいずれかが発火したすべてのイベントを発火します。

val allClicks = leftClicks | middleClicks | rightClicks

EventStreamSignal にする: hold

EventStream は、hold メソッドを使って Signal にすることができます。このメソッドには、次のイベントが発火するまで保持するシグナルの初期値を渡す必要があります。

無限ループの防止: nonrecursive

EventStream が発火したときに、再度その EventStream が発火する結果になるときは、無限の再帰 (言い換えれば StackOverflowError) に陥る可能性があります。そのような場合には、nonrecursive を呼び出します。nonrecursive が返す派生した EventStreamDynamicVariable (Scala の ThreadLocal) を使って、再帰的に発火が起こるのを防止します。

distinct

EventStream.distinct は、前のイベントと等しくないイベントに対してだけリスナーを呼び出す派生したイベントストリームを返します。

異なるスレッドでのイベントの処理: nonblocking

ほとんどの EventStream は、そのすべてのリスナーをイベントを発火したスレッドで呼び出すので、スレッドをブロックします。これは、イベントの処理に時間がかかる場合に問題になることがあります。

同様に、あるスレッドが EventStream でイベントを発火し、その EventStream がすでに別のスレッドからのイベントを処理中である場合、その EventStream は、すでに行われている別のイベント処理と同時に、新しいイベントをそのスレッドで処理することになります。イベントの処理が多くの作業から構成されている場合、これは作業の一部が冗長となる可能性があることを意味します。

EventStream に対して nonblocking を呼び出すと、イベントの処理を内部のアクターに任せる、派生した EventStream が返されます。これで、一度に処理されるイベントは 1 つだけになり、イベントを発火したスレッドはブロックしないようになります。例については、あとで示すサンプルを参照してください。

イベントが置き換えられていることの検出: zipWithStaleness

実行に長時間かかる処理量の多いイベントハンドラがあるケースなどでは、ハンドラが実行を開始した後に新しいイベントが発火されていないかどうかをチェックしたい場合があります。もし実際にそのようなイベントがあれば、作業を途中でやめて残りの部分をスキップできると便利なことがあります (新しいイベントによって作業が不要になる場合など)。これは、nonblocking を使っている場合には特に重要です。というのも、一度に処理できるイベントは 1 つだけなので、状況への対応が大きく遅れてしまう場合があるからです。たとえば、マウスのクリックを処理するのに 0.5 秒かかるとすると、ユーザーが 10 回クリックした場合、コンピュータがユーザーに「追い付く」には 5 秒を要します。この問題に対する 1 つの解決方法は、zipWithStaleness を使うことです。zipWithStaleness は、EventStream[(T, ()=>Boolean)] を返します。別の言い方をすると、各イベントは、実際のイベント値と関数を含む Tuple2 となります。この関数は、イベントが「置き換えられている」かどうかを返します。

for((click, isStale) <- mouseClicks.zipWithStaleness.nonblocking) {
  doSomeWork()
  if(!isStale()) doSomeMoreWork()
}