Wednesday, February 22, 2012

Akka Spaces






Expect frequent changes.





AkkaSpace


The idea behind the Akka Space library is (as its name suggests) to implement Tuple Spaces using Akka.






Types


import akka.actor.ActorRef

object Types {
  type F[A] = PartialFunction[A, Unit]
  type R[A] = (F[A], ActorRef)
}



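The type F[A] models a partial function on arguments of type A. The type R[A] models a registration: a partial function paired with the reference of the space actor (see below) that wants to apply it to a matching argument of type A once such an argument has been put into a space (see below).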
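As a minimal sketch of how a partial function can act as a matching template, the snippet below defines a handler and asks whether it is defined at a given argument. The names RegistrationSketch, onEven and matches are hypothetical and only serve the illustration; the ActorRef half of a registration is left out so that the snippet runs without Akka.

```scala
object RegistrationSketch {
  // Same alias as in Types: a handler that need not be defined for every A.
  type F[A] = PartialFunction[A, Unit]

  // Hypothetical handler that only accepts even numbers.
  val onEven: F[Int] = {
    case n if n % 2 == 0 => println("got even number " + n)
  }

  // A space can use isDefinedAt to decide whether an argument matches a handler.
  def matches[A](f: F[A], a: A): Boolean = f.isDefinedAt(a)
}
```

Calling isDefinedAt only evaluates the pattern and its guard; the body of the handler runs later, when the function is actually applied.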






Messages


import Types.F
import Types.R

case class Put[A](a: A)

case class Reg[A](f: F[A])

case class App[A](f: F[A], a: A)



The case classes Put[A] and Reg[A] model messages that a space actor can send to a space.


The case class App[A] models messages that a space can send back to a space actor.






Implicits


object Implicits {

  import akka.actor.ActorSystem
  import akka.util.Timeout

  implicit val spaceSystem = ActorSystem("space")

  implicit val timeout = new Timeout(1000)
}



The implicit values spaceSystem and timeout, when imported, are picked up by the agents used in the code that follows, so they do not have to be passed explicitly.






Space


import akka.actor.Actor

class Space[A] extends Actor {

  import Implicits.spaceSystem
  import Implicits.timeout

  import collection.immutable.Queue

  import akka.agent.Agent

  private val aqa = Agent(Queue[A]())
  private val rqa = Agent(Queue[R[A]]())

  import concurrent.stm.atomic
  import concurrent.stm.InTxn

  private def put(a: A) =
    atomic {
      _ =>
        rqa.await find {
          case (f, _) => f.isDefinedAt(a)
        } match {
          case None =>
            aqa send (_ :+ a)
          case Some(r) =>
            rqa send (_ diff Queue(r))
            val f = r._1
            val ar = r._2
            ar ! App(f, a)
        }
    }

  private def reg(f: F[A], ar: ActorRef) =
    atomic {
      _ =>
        aqa.await find {
          case a => f.isDefinedAt(a)
        } match {
          case None =>
            rqa send (_ :+(f, ar))
          case Some(a) =>
            aqa send (_ diff Queue(a))
            ar ! App(f, a)
        }
    }

  def receive = {
    case Put(a: A) => put(a)
    case Reg(f) => reg(f, sender)
    case other => sys.error("space: this should never happen")
  }

}



A space processes Put[A] and Reg[A] messages.
The space keeps two queues, each held by an agent


  • aqa: an agent holding the queue of arguments waiting to be bound to partial functions
  • rqa: an agent holding the queue of registrations of partial functions waiting to be applied to arguments

When receiving an argument

  • when there is no matching registration (of a partial function), the argument is added to the queue of arguments
  • when there is a matching registration (of a partial function), the registration is removed from the queue of registrations and the partial function is sent back to the space actor that registered it, together with the matching argument

When receiving a registration (of a partial function)

  • when there is no matching argument, the registration is added to the queue of registrations
  • when there is a matching argument, the argument is removed from the queue of arguments and the partial function is sent back to its sender together with the matching argument
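
The matching rules above can be sketched without actors or agents. What follows is a simplified, single-threaded model and not the library's code: SpaceModel, State, Step and the String owner (a stand-in for the ActorRef) are assumptions made for the illustration. Each step returns the new state together with the match, if any, that the space would send back as an App message.

```scala
import scala.collection.immutable.Queue

object SpaceModel {
  type F[A] = PartialFunction[A, Unit]

  // Waiting arguments and waiting registrations; `owner` stands in for the ActorRef.
  final case class State[A](args: Queue[A], regs: Queue[(F[A], String)])

  // Result of one step: the new state plus an optional (function, argument, owner) match.
  type Step[A] = (State[A], Option[(F[A], A, String)])

  def empty[A]: State[A] = State(Queue.empty[A], Queue.empty[(F[A], String)])

  // An argument arrives: hand it to the first matching registration, else queue it.
  def put[A](s: State[A], a: A): Step[A] =
    s.regs.indexWhere { case (f, _) => f.isDefinedAt(a) } match {
      case -1 => (s.copy(args = s.args :+ a), None)
      case i =>
        val (f, owner) = s.regs(i)
        (s.copy(regs = s.regs.take(i) ++ s.regs.drop(i + 1)), Some((f, a, owner)))
    }

  // A registration arrives: bind it to the first matching argument, else queue it.
  def reg[A](s: State[A], f: F[A], owner: String): Step[A] =
    s.args.indexWhere(a => f.isDefinedAt(a)) match {
      case -1 => (s.copy(regs = s.regs :+ ((f, owner))), None)
      case i =>
        val a = s.args(i)
        (s.copy(args = s.args.take(i) ++ s.args.drop(i + 1)), Some((f, a, owner)))
    }
}
```

Putting 2 into an empty model queues it; registering a function defined only at 1 queues the registration; putting 1 afterwards yields a match and leaves only the 2 behind.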






SpaceActor


abstract class SpaceActor[A] extends Actor {

  initialState()

  protected def initialState(): Unit

  def receive = {
    case App(f, a) => f(a)
    case other => sys.error("space actor: this should never happen")
  }
}



A space actor processes App[A] messages by applying their partial function part to their argument part.


A space actor starts in its (to be defined) initial state.






PingPongTableApp


We are ready for a first application, PingPongTableApp.






PingPong


sealed abstract class PingPong

case object Ping extends PingPong

case object Pong extends PingPong

case object Done extends PingPong



The sealed abstract class PingPong and the case objects implementing it model the data that drive the application.






PingPongTable


case class PingPongTable(n: Int)
  extends Space[PingPong] {

  import akka.actor.Props

  val pingPongPlayerRefs =
    (for {i <- 1 to (2 * n)} yield {
      if (i <= n) {
        context.actorOf(
          Props(Pinger()),
          name = "pinger_" + i)
      } else {
        context.actorOf(
          Props(Ponger()),
          name = "ponger_" + (i - n))
      }
    }).toList

  val umpireRef = context.actorOf(
    Props(Umpire()),
    name = "_umpire_")

  override def preStart() {
    val pingerRef1 = pingPongPlayerRefs.head
    self.!(Put(Ping))(pingerRef1)
  }
}

object PingPongTableGlobal {

  import Implicits.spaceSystem

  import akka.actor.Props

  val numberOfPlayers = 2

  val pingPongTableRef = spaceSystem.actorOf(
    Props(PingPongTable(numberOfPlayers)),
    name = "pingPongTable")
}



The case class PingPongTable models the pingpong table.


A pingpong table constructs pingers (see below), pongers (see below) and an umpire (see below).


When the pingpong table starts, it puts a Ping on itself on behalf of the first pinger.






Pinger and Ponger


import PingPongTableGlobal.pingPongTableRef

case class Pinger()
  extends SpaceActor[PingPong] {

  import akka.event.Logging

  val log = Logging(context.system, this)

  private def pingState() {
    import Types.F

    val pingFun: F[PingPong] = {
      case Pong =>
        import scala.math.random
        import scala.math.round
        Thread.sleep(round(1000 * random))
        if (random < 0.95) {
          log.info("ping")
          pingPongTableRef ! Put(Ping)
          pingState()
        } else {
          log.info("done")
          pingPongTableRef ! Put(Done)
        }
    }
    pingPongTableRef ! Reg(pingFun)
  }

  def initialState() {
    pingState()
  }
}

case class Ponger()
  extends SpaceActor[PingPong] {

  import akka.event.Logging

  val log = Logging(context.system, this)

  private def pongState() {
    import Types.F

    val pongFun: F[PingPong] = {
      case Ping =>
        import scala.math.random
        import scala.math.round
        Thread.sleep(round(1000 * random))
        if (random < 0.95) {
          log.info("pong")
          pingPongTableRef ! Put(Pong)
          pongState()
        } else {
          log.info("done")
          pingPongTableRef ! Put(Done)
        }
    }
    pingPongTableRef ! Reg(pongFun)
  }

  def initialState() {
    pongState()
  }
}



A pinger starts in its ping state, waiting for a Pong from a ponger. When it receives one, it sleeps for up to a second and then either succeeds (with probability 0.95), in which case it puts a Ping on the pingpong table and re-enters its ping state, or fails, in which case it puts a Done on the pingpong table.


A ponger starts in its pong state, waiting for a Ping from a pinger. When it receives one, it sleeps for up to a second and then either succeeds (with probability 0.95), in which case it puts a Pong on the pingpong table and re-enters its pong state, or fails, in which case it puts a Done on the pingpong table.
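
The pattern of staying in a state by registering the same partial function again can be tried out in isolation. The sketch below is a deterministic, single-threaded simplification and makes several assumptions: MiniSpace is a hypothetical stand-in for the space actor, there is one pinger and one ponger instead of several, and a hit counter (maxHits) replaces the random choice between success and failure. Unlike the actors above, the sketch re-registers before putting, because here a put runs the matching handler immediately.

```scala
import scala.collection.mutable

object RallySketch {
  type F[A] = PartialFunction[A, Unit]

  sealed trait Ball
  case object Ping extends Ball
  case object Pong extends Ball
  case object Done extends Ball

  // Minimal single-threaded stand-in for the space: no actors, no agents.
  final class MiniSpace[A] {
    private val args = mutable.ArrayBuffer.empty[A]
    private val regs = mutable.ArrayBuffer.empty[F[A]]

    def put(a: A): Unit = regs.indexWhere(_.isDefinedAt(a)) match {
      case -1 => args += a
      case i  => val f = regs.remove(i); f(a)
    }

    def reg(f: F[A]): Unit = args.indexWhere(a => f.isDefinedAt(a)) match {
      case -1 => regs += f
      case i  => val a = args.remove(i); f(a)
    }
  }

  // Plays a rally; once maxHits hits have been made in total, the next player puts Done.
  def play(maxHits: Int): List[String] = {
    val space = new MiniSpace[Ball]
    val log = mutable.ListBuffer.empty[String]
    var hits = 0

    def state(waitFor: Ball, answer: Ball, name: String): Unit =
      space.reg {
        case b if b == waitFor =>
          if (hits < maxHits) {
            hits += 1
            log += name
            state(waitFor, answer, name) // stay in the same state by re-registering
            space.put(answer)
          } else {
            log += "done"
            space.put(Done)
          }
      }

    state(Pong, Ping, "ping")                     // pinger waits for a Pong
    state(Ping, Pong, "pong")                     // ponger waits for a Ping
    space.reg { case Done => log += "shutdown" }  // umpire waits for a Done
    space.put(Ping)                               // first serve
    log.toList
  }
}
```

With maxHits = 3, play returns List("pong", "ping", "pong", "done", "shutdown"), which mirrors the shape of the log output of the real application.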






Umpire


case class Umpire()
  extends SpaceActor[PingPong] {

  import akka.event.Logging

  val log = Logging(context.system, this)

  private def stopState() {
    import Types.F
    import Implicits.spaceSystem

    val stopFun: F[PingPong] = {
      case Done =>
        log.info("shutdown")
        spaceSystem.shutdown()
    }
    pingPongTableRef ! Reg(stopFun)
  }

  def initialState() {
    stopState()
  }
}



An umpire starts in its stop state. When it receives a Done from a pinger or a ponger, it shuts down the system.






PingPongTableApp


object PingPongTableApp {
  def main(args: Array[String]) = {
    PingPongTableGlobal.pingPongTableRef
    ()
  }
}



The pingpong table app simply installs the pingpong table.






Running the PingPongTableApp


Running the pingpong table app produces output like the following.




[INFO] [02/23/2012 11:58:25.477] ... [akka://space/.../ponger_2] pong
[INFO] [02/23/2012 11:58:25.742] ... [akka://space/.../pinger_1] ping
[INFO] [02/23/2012 11:58:25.751] ... [akka://space/.../ponger_1] pong
[INFO] [02/23/2012 11:58:26.701] ... [akka://space/.../pinger_2] ping
[INFO] [02/23/2012 11:58:27.398] ... [akka://space/.../ponger_2] pong
[INFO] [02/23/2012 11:58:27.790] ... [akka://space/.../pinger_1] ping
[INFO] [02/23/2012 11:58:27.903] ... [akka://space/.../ponger_1] pong
[INFO] [02/23/2012 11:58:28.17] ... [akka://space/.../pinger_2] ping
[INFO] [02/23/2012 11:58:28.617] ... [akka://space/.../ponger_2] pong
[INFO] [02/23/2012 11:58:28.986] ... [akka://space/.../pinger_1] ping
[INFO] [02/23/2012 11:58:29.476] ... [akka://space/.../ponger_1] pong
[INFO] [02/23/2012 11:58:30.202] ... [akka://space/.../pinger_2] ping
[INFO] [02/23/2012 11:58:30.819] ... [akka://space/.../ponger_2] done
[INFO] [02/23/2012 11:58:30.820] ... [akka://space/.../_umpire_] shutdown





StickTableApp


We are ready for a second application, StickTableApp.






Stick


case class Stick(i: Int) {
  override def toString = "stick_" + i
}



The case class Stick models the data that drive the application.






StickTable


case class StickTable(n: Int)
  extends Space[Stick] {

  import akka.actor.Props

  (1 to n) foreach {
    i =>
      context.actorOf(
        Props(Hakker(i, 2000, 2000, 2000, 100)),
        name = "hacker_" + i)
  }

  override def preStart() {
    (1 to n) foreach {
      i =>
        self ! Put(Stick(i))
    }
  }
}

object StickTableGlobal {

  import Implicits.spaceSystem

  import akka.actor.Props

  val numberOfSticks = 5

  val stickTableRef = spaceSystem.actorOf(
    Props(StickTable(numberOfSticks)),
    name = "stickTable")
}



The case class StickTable models the stick table.


A stick table constructs hackers (see below).


When the stick table starts, it puts as many sticks on itself as there are hackers, numbered from 1.






Hacker


import StickTableGlobal.numberOfSticks
import StickTableGlobal.stickTableRef

case class Hakker
(n: Int, startTime: Long, thinkTime: Long, eatTime: Long, delayTime: Long)
  extends SpaceActor[Stick] {

  import akka.event.Logging

  val log = Logging(context.system, this)

  private val left = n
  // Sticks are numbered 1 to numberOfSticks, so the right stick of the last hacker wraps around to 1.
  private val right = n % numberOfSticks + 1

  import scala.math.random
  import scala.math.round

  private def takeLeftState() {
    import Types.F

    val takeLeftFun: F[Stick] = {
      case l@Stick(`left`) => {
        log.info("has taken " + l)
        takeRightState()
      }
    }
    stickTableRef ! Reg(takeLeftFun)
  }

  private def takeRightState() {
    import Types.F

    val takeRightFun: F[Stick] = {
      case r@Stick(`right`) => {
        log.info("has taken " + r)
        eat()
        putBackLeftAndRight()
        think()
        takeLeftState()
      }
    }
    Thread.sleep(round(delayTime * random))
    stickTableRef ! Reg(takeRightFun)
  }

  private def eat() {
    log.info("starts eating")
    Thread.sleep(round(eatTime * random))
    log.info("stops eating")
  }

  private def putBackLeftAndRight() {
    val l = Stick(left)
    log.info("puts back " + l)
    stickTableRef ! Put(l)
    val r = Stick(right)
    log.info("puts back " + r)
    stickTableRef ! Put(r)
  }

  private def think() {
    log.info("starts thinking")
    Thread.sleep(round(thinkTime * random))
    log.info("stops thinking")
  }

  def initialState() {
    Thread.sleep(round(startTime * random))
    takeLeftState()
  }
}



A hacker starts in its take left state, in which it takes the stick to its left. It then moves to its take right state, in which it takes the stick to its right.


When it has taken both sticks, it eats, puts back the sticks, thinks for a moment, and returns to its take left state.
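
Because the sticks on the table are numbered from 1, the index of the right stick has to wrap around from the last stick to stick 1; a plain (n + 1) % numberOfSticks would yield 0 for hacker 4 of 5, and no stick_0 is ever put on the table. The following is a small sketch of one way to compute the neighbours; the object and method names are hypothetical.

```scala
object StickIndexSketch {
  // Hackers and sticks are both numbered 1 to numberOfSticks.
  def left(n: Int): Int = n

  // The right stick of the last hacker wraps around to stick 1.
  def right(n: Int, numberOfSticks: Int): Int = n % numberOfSticks + 1
}
```

For five sticks this gives right sticks 2, 3, 4, 5 and 1 for hackers 1 to 5.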






StickTableApp


object StickTableApp {
  def main(args: Array[String]) = {
    StickTableGlobal.stickTableRef
    ()
  }
}



The stick table app simply installs the stick table.






Running the StickTableApp


Running the stick table app produces output like the following.




[INFO] [02/23/2012 11:46:44.115] ... [akka://space/.../hacker_3] has taken stick_3
[INFO] [02/23/2012 11:46:44.213] ... [akka://space/.../hacker_3] has taken stick_4
[INFO] [02/23/2012 11:46:44.214] ... [akka://space/.../hacker_3] starts eating
[INFO] [02/23/2012 11:46:44.626] ... [akka://space/.../hacker_1] has taken stick_1
[INFO] [02/23/2012 11:46:44.635] ... [akka://space/.../hacker_1] has taken stick_2
[INFO] [02/23/2012 11:46:44.635] ... [akka://space/.../hacker_1] starts eating
[INFO] [02/23/2012 11:46:44.975] ... [akka://space/.../hacker_1] stops eating
[INFO] [02/23/2012 11:46:44.976] ... [akka://space/.../hacker_1] puts back stick_1
[INFO] [02/23/2012 11:46:44.976] ... [akka://space/.../hacker_1] puts back stick_2
[INFO] [02/23/2012 11:46:44.977] ... [akka://space/.../hacker_1] starts thinking
[INFO] [02/23/2012 11:46:45.550] ... [akka://space/.../hacker_5] has taken stick_5
[INFO] [02/23/2012 11:46:45.618] ... [akka://space/.../hacker_5] has taken stick_1
[INFO] [02/23/2012 11:46:45.618] ... [akka://space/.../hacker_5] starts eating
[INFO] [02/23/2012 11:46:45.630] ... [akka://space/.../hacker_5] stops eating
[INFO] [02/23/2012 11:46:45.631] ... [akka://space/.../hacker_5] puts back stick_5
[INFO] [02/23/2012 11:46:45.635] ... [akka://space/.../hacker_5] puts back stick_1
[INFO] [02/23/2012 11:46:45.635] ... [akka://space/.../hacker_5] starts thinking
[INFO] [02/23/2012 11:46:45.663] ... [akka://space/.../hacker_5] stops thinking
[INFO] [02/23/2012 11:46:45.663] ... [akka://space/.../hacker_5] has taken stick_5
[INFO] [02/23/2012 11:46:45.716] ... [akka://space/.../hacker_1] stops thinking
[INFO] [02/23/2012 11:46:45.716] ... [akka://space/.../hacker_1] has taken stick_1
[INFO] [02/23/2012 11:46:45.782] ... [akka://space/.../hacker_1] has taken stick_2
[INFO] [02/23/2012 11:46:45.783] ... [akka://space/.../hacker_1] starts eating
[INFO] [02/23/2012 11:46:46.22] ... [akka://space/.../hacker_3] stops eating
[INFO] [02/23/2012 11:46:46.23] ... [akka://space/.../hacker_3] puts back stick_3
[INFO] [02/23/2012 11:46:46.23] ... [akka://space/.../hacker_3] puts back stick_4
[INFO] [02/23/2012 11:46:46.24] ... [akka://space/.../hacker_3] starts thinking
[INFO] [02/23/2012 11:46:46.27] ... [akka://space/.../hacker_4] has taken stick_4
[INFO] [02/23/2012 11:46:47.407] ... [akka://space/.../hacker_1] stops eating
[INFO] [02/23/2012 11:46:47.408] ... [akka://space/.../hacker_1] puts back stick_1
[INFO] [02/23/2012 11:46:47.408] ... [akka://space/.../hacker_1] puts back stick_2
[INFO] [02/23/2012 11:46:47.408] ... [akka://space/.../hacker_1] starts thinking
[INFO] [02/23/2012 11:46:47.411] ... [akka://space/.../hacker_5] has taken stick_1
[INFO] [02/23/2012 11:46:47.411] ... [akka://space/.../hacker_5] starts eating
[INFO] [02/23/2012 11:46:47.412] ... [akka://space/.../hacker_2] has taken stick_2
[INFO] [02/23/2012 11:46:47.473] ... [akka://space/.../hacker_1] stops thinking
[INFO] [02/23/2012 11:46:47.479] ... [akka://space/.../hacker_2] has taken stick_3
[INFO] [02/23/2012 11:46:47.479] ... [akka://space/.../hacker_2] starts eating
[INFO] [02/23/2012 11:46:47.480] ... [akka://space/.../hacker_5] stops eating
[INFO] [02/23/2012 11:46:47.481] ... [akka://space/.../hacker_5] puts back stick_5
[INFO] [02/23/2012 11:46:47.481] ... [akka://space/.../hacker_5] puts back stick_1
[INFO] [02/23/2012 11:46:47.481] ... [akka://space/.../hacker_5] starts thinking
[INFO] [02/23/2012 11:46:47.481] ... [akka://space/.../hacker_1] has taken stick_1
[INFO] [02/23/2012 11:46:47.886] ... [akka://space/.../hacker_3] stops thinking
[INFO] [02/23/2012 11:46:48.703] ... [akka://space/.../hacker_2] stops eating
[INFO] [02/23/2012 11:46:48.704] ... [akka://space/.../hacker_2] puts back stick_2
[INFO] [02/23/2012 11:46:48.705] ... [akka://space/.../hacker_2] puts back stick_3
[INFO] [02/23/2012 11:46:48.705] ... [akka://space/.../hacker_2] starts thinking
[INFO] [02/23/2012 11:46:48.707] ... [akka://space/.../hacker_1] has taken stick_2
[INFO] [02/23/2012 11:46:48.708] ... [akka://space/.../hacker_1] starts eating
[INFO] [02/23/2012 11:46:48.709] ... [akka://space/.../hacker_3] has taken stick_3
[INFO] [02/23/2012 11:46:49.329] ... [akka://space/.../hacker_5] stops thinking
[INFO] [02/23/2012 11:46:49.330] ... [akka://space/.../hacker_5] has taken stick_5
[INFO] [02/23/2012 11:46:50.283] ... [akka://space/.../hacker_2] stops thinking
[INFO] [02/23/2012 11:46:50.367] ... [akka://space/.../hacker_1] stops eating
[INFO] [02/23/2012 11:46:50.367] ... [akka://space/.../hacker_1] puts back stick_1
[INFO] [02/23/2012 11:46:50.368] ... [akka://space/.../hacker_1] puts back stick_2
[INFO] [02/23/2012 11:46:50.368] ... [akka://space/.../hacker_1] starts thinking
[INFO] [02/23/2012 11:46:50.368] ... [akka://space/.../hacker_5] has taken stick_1
[INFO] [02/23/2012 11:46:50.369] ... [akka://space/.../hacker_5] starts eating
[INFO] [02/23/2012 11:46:50.369] ... [akka://space/.../hacker_2] has taken stick_2
[INFO] [02/23/2012 11:46:50.721] ... [akka://space/.../hacker_5] stops eating
[INFO] [02/23/2012 11:46:50.722] ... [akka://space/.../hacker_5] puts back stick_5
[INFO] [02/23/2012 11:46:50.722] ... [akka://space/.../hacker_5] puts back stick_1
[INFO] [02/23/2012 11:46:50.723] ... [akka://space/.../hacker_5] starts thinking
[INFO] [02/23/2012 11:46:51.250] ... [akka://space/.../hacker_1] stops thinking
[INFO] [02/23/2012 11:46:51.251] ... [akka://space/.../hacker_1] has taken stick_1
[INFO] [02/23/2012 11:46:52.303] ... [akka://space/.../hacker_5] stops thinking
[INFO] [02/23/2012 11:46:52.304] ... [akka://space/.../hacker_5] has taken stick_5







It is instructive to play around with the sleep times (the startTime, thinkTime, eatTime and delayTime parameters of Hakker).