Friday, April 13, 2012

Pure Functional Futures






Expect frequent changes.







The idea behind this post is, as its title suggests, to implement futures in a purely functional way.

More precisely, the identity monad is transformed to add state and control to it.
The implementation uses neither vars nor the delimited continuations plugin.
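
As a rough illustration of what "adding state to the identity monad" can look like without vars, here is a minimal sketch. The names StateM and StateDemo are assumptions made for this example and are not part of the library used in this post; the only thing borrowed from the post is the convention that running a computation yields a pair whose first component is the result.

```scala
// Hypothetical sketch, not the post's library: a computation is a pure
// function from an input state to a (result, output state) pair.
final case class StateM[S, A](run: S => (A, S)) {
  // monad binding: thread the state through both computations
  def bindM[B](f: A => StateM[S, B]): StateM[S, B] =
    StateM[S, B] { s0 =>
      val (a, s1) = run(s0)
      f(a).run(s1)
    }

  // functor binding: transform the result, leave the state untouched
  def bindF[B](f: A => B): StateM[S, B] =
    bindM[B](a => StateM.result[S, B](f(a)))
}

object StateM {
  def result[S, A](a: A): StateM[S, A] = StateM[S, A](s => (a, s))
  def getS[S]: StateM[S, S] = StateM[S, S](s => (s, s))
  def setS[S](s: S): StateM[S, Unit] = StateM[S, Unit](_ => ((), s))
  def doS[S](f: S => S): StateM[S, Unit] = getS[S].bindM(s => setS(f(s)))
}

object StateDemo {
  // increment the state, read it back, and format it
  val counter: StateM[Int, String] =
    StateM.doS[Int](_ + 1)
      .bindM(_ => StateM.getS[Int])
      .bindF(n => "count = " + n)

  // running with initial state 41 gives ("count = 42", 42)
  val (text, finalState) = counter.run(41)
}
```

No variable is ever assigned: each step receives the current state as an argument and returns the next one.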




Note: this post is not self-contained.






Computational Features


Here are the main computational features that the implementation uses:




// computations yield a result
def result[Z]: Z => M[Z]

// functor binding (and sequencing)
def bindF[B](a_f_b: A => B): M[B]

def seqF[B](b: => B) =
  m_a bindF { _ =>
    b
  }

// idiom binding (and sequencing)
def bindI[B](m__a_f_b: M[A => B]): M[B]

def seqI[B](b: => B) =
  m_a bindI {
    result { (_: A) =>
      b
    }
  }

// monad binding (and sequencing)
def bindM[B](a__f__m_b: A => M[B]): M[B]

def seqM[B](m_b: => M[B]) =
  m_a bindM { _ =>
    m_b
  }

// state (I made setS private[imaginej])
def getS: Unit => M[State]

private[imaginej] def setS: State => M[Unit]

def doS: (State => State) => M[Unit] =
  s_f_s =>
    getS(()) bindM { s =>
      setS(s_f_s(s))
    }

// control
def reset(m_b: M[B]): M[C]

def shift(a__f__m_b__f___m_c: (A => M[B]) => M[C]): M[A]
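
To give a feel for the control operations, here is a small sketch of shift and reset on top of a plain continuation monad. It is a simplification made for illustration only: Cont and ContDemo are hypothetical names, the answer type R is fixed, and continuations return plain values, whereas the signatures above work with M[B] and M[C].

```scala
// Hypothetical sketch, not the post's library: a computation takes
// "the rest of the program" (A => R) and produces the final answer R.
final case class Cont[R, A](run: (A => R) => R) {
  def bindM[B](f: A => Cont[R, B]): Cont[R, B] =
    Cont[R, B](k => run(a => f(a).run(k)))

  def bindF[B](f: A => B): Cont[R, B] =
    Cont[R, B](k => run(a => k(f(a))))
}

object Cont {
  def result[R, A](a: A): Cont[R, A] = Cont[R, A](k => k(a))

  // reset delimits a computation by running it with the identity continuation
  def reset[R](m: Cont[R, R]): R = m.run(identity)

  // shift hands the rest of the delimited computation to f as a function
  def shift[R, A](f: (A => R) => R): Cont[R, A] = Cont[R, A](f)
}

object ContDemo {
  import Cont._

  // inside the reset, the continuation c captured by shift is "add 1";
  // applying it twice to 10 gives 12
  val twice: Int =
    reset(shift[Int, Int](c => c(c(10))).bindF(_ + 1))
}
```

The point to take away is that the function passed to shift receives everything that follows it inside the enclosing reset, and is free to call it later, more than once, or on another thread.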






Pure Functional Futures


As stated above, all we need is state and control, so we are ready for our implementation.

Cells, and the futures built on top of them, closely follow the implementation in
Node.scala - Implementing Scalable Async IO using Delimited Continuations, as presented at Scala Days 2012.




private[imaginej] trait FutureIdentityMonadWithOptionStateWithControlModule
  extends IdentityMonadWithStateWithControlModule {

  import java.util.concurrent.{ Future => FutureJ }
  import java.util.concurrent.Callable
  import java.util.concurrent.ExecutorService
  import java.util.concurrent.ScheduledThreadPoolExecutor
  import java.util.logging.Logger

  import instances.identity.monad.state.IdentityMonadWithStateModule

  import scala.math._

  val logger = Logger.getLogger("")

  // sleep for a random duration of up to `time` milliseconds
  def sleep(time: Int) {
    Thread.sleep(round(time * random))
  }

  type B = A
  type C = FutureJ[A]

  // the state is the (optional) content of a cell
  object IdentityMonadWithOptionState
    extends IdentityMonadWithStateModule {
    type State = Option[A]
  }

  private[imaginej] val fromMonad = IdentityMonadWithOptionState
  private[imaginej] type MonadModuleType = IdentityMonadWithOptionState.type

  class Cell(executor: ExecutorService) {
    def get(): M[FutureJ[A]] =
      reset {
        shift { c =>
          // c is the rest of the reset block, that is, the bindF below
          getS(()) bindM { o_a =>
            o_a match {
              case Some(a) =>
                // run the continuation on the executor
                val callable = new Callable[A] {
                  def call(): A = {
                    logger.info("submit c(" + a + ") ...")
                    (c(a) run ())(o_a)._1
                  }
                }
                result {
                  executor.submit(callable)
                }
              case None =>
                sys.error("state not set")
            }
          }
        } bindF { a =>
          // simulate work
          sleep(10000)
          logger.info("result(" + a + ")")
          a
        }
      }

    // a cell can be set only once
    def set(a: A): M[Cell] =
      getS(()) bindM { o_a =>
        o_a match {
          case Some(_) =>
            sys.error("state already set")
          case None =>
            logger.info("set " + a)
            setS(Some(a)) seqF {
              this
            }
        }
      }
  }

  def m_cell(implicit executor: ExecutorService): A => M[Cell] =
    new Cell(executor) set (_)

  type Future[A] = M[FutureJ[A]]

  def future(implicit executor: ExecutorService): A => Future[A] =
    a => {
      // set a new cell to a, then get it
      val m__fj_a = m_cell(executor)(a) bindM { cell =>
        cell get ()
      }
      // run with the empty initial state and keep the result
      result {
        ((m__fj_a run ())(None))._1
      }
    }

}
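
Stripped of the monadic plumbing, the two cell operations above boil down to a write-once Option state and a continuation that is wrapped in a Callable and submitted to an executor. The following sketch shows just that, under the assumption that the state is passed around explicitly; CellSketch and its methods are hypothetical names and not part of the module above.

```scala
import java.util.concurrent.{ Callable, ExecutorService, Executors, Future => FutureJ }

// Hypothetical sketch, not the post's module: the state is passed explicitly
// instead of being threaded through a monad.
object CellSketch {
  // write-once state as a pure step: old state in, new state out
  def set[A](state: Option[A], a: A): Option[A] =
    state match {
      case Some(_) => sys.error("state already set")
      case None    => Some(a)
    }

  // run the continuation c on the stored value in the executor
  def get[A, B](state: Option[A], executor: ExecutorService)(c: A => B): FutureJ[B] =
    state match {
      case Some(a) =>
        executor.submit(new Callable[B] { def call(): B = c(a) })
      case None =>
        sys.error("state not set")
    }

  // set an empty cell to 20, add 1 on a pool thread, wait for the answer
  def demo(): Int = {
    val executor = Executors.newFixedThreadPool(2)
    try {
      val state = set(Option.empty[Int], 20)
      get(state, executor)(a => a + 1).get() // blocks until the task is done
    } finally executor.shutdown()
  }
}
```

Calling CellSketch.demo() returns 21. In the module above the continuation is not passed by hand: shift captures it from the surrounding reset block.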





Example


Here is an example that starts two futures, sleeps for a while, and then gets both results:




import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.logging.Logger

import scala.math._

object PureFutureExample {
  val logger = Logger.getLogger("")
  // sleep for a random duration of up to `time` milliseconds
  def sleep(time: Int) {
    Thread.sleep(round(time * random))
  }
  object Implicits {
    implicit val executor = new ScheduledThreadPoolExecutor(10)
  }
  def parFuturesTest() {
    import Implicits.executor
    // IntFutureIdentityMonadWithOptionStateWithControl is defined elsewhere (not shown in this post)
    val monad_l = new IntFutureIdentityMonadWithOptionStateWithControl
    val monad_r = new IntFutureIdentityMonadWithOptionStateWithControl
    val future_l = monad_l.future
    val future_r = monad_r.future
    logger.info("set futures")
    val (m__fj_l, m__fj_r) = (future_l apply (1), future_r apply (2))
    logger.info("sleep before get futures ...")
    sleep(12000)
    logger.info("get futures")
    // block on the underlying java futures
    val m_l =
      m__fj_l bindF { fj_l =>
        fj_l.get()
      }
    val m_r =
      m__fj_r bindF { fj_r =>
        fj_r.get()
      }
    logger.info("(" + ((m_l run ())(None))._1 + ", " + ((m_r run ())(None))._1 + ")")
    executor.shutdown()
  }

  def main(args: Array[String]) {
    parFuturesTest()
  }
}





Running the Example


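Running the example a few times shows parallelism: the order of the result(1) and result(2) lines varies between runs, and get futures is sometimes logged before them, sometimes between them, and sometimes after them.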




[info] Running examples.future.PureFutureExample
Apr 13, 2012 2:35:22 PM java.util.logging.LogManager$RootLogger log
INFO: set futures
Apr 13, 2012 2:35:22 PM java.util.logging.LogManager$RootLogger log
INFO: set 1
Apr 13, 2012 2:35:22 PM java.util.logging.LogManager$RootLogger log
INFO: set 2
Apr 13, 2012 2:35:22 PM java.util.logging.LogManager$RootLogger log
INFO: submit c(1) ...
Apr 13, 2012 2:35:22 PM java.util.logging.LogManager$RootLogger log
INFO: sleep before get futures ...
Apr 13, 2012 2:35:22 PM java.util.logging.LogManager$RootLogger log
INFO: submit c(2) ...
Apr 13, 2012 2:35:27 PM java.util.logging.LogManager$RootLogger log
INFO: result(2)
Apr 13, 2012 2:35:27 PM java.util.logging.LogManager$RootLogger log
INFO: get futures
Apr 13, 2012 2:35:28 PM java.util.logging.LogManager$RootLogger log
INFO: result(1)
Apr 13, 2012 2:35:28 PM java.util.logging.LogManager$RootLogger log
INFO: (1, 2)

[info] Running examples.future.PureFutureExample
Apr 13, 2012 2:35:36 PM java.util.logging.LogManager$RootLogger log
INFO: set futures
Apr 13, 2012 2:35:36 PM java.util.logging.LogManager$RootLogger log
INFO: set 1
Apr 13, 2012 2:35:36 PM java.util.logging.LogManager$RootLogger log
INFO: submit c(1) ...
Apr 13, 2012 2:35:36 PM java.util.logging.LogManager$RootLogger log
INFO: set 2
Apr 13, 2012 2:35:36 PM java.util.logging.LogManager$RootLogger log
INFO: submit c(2) ...
Apr 13, 2012 2:35:36 PM java.util.logging.LogManager$RootLogger log
INFO: sleep before get futures ...
Apr 13, 2012 2:35:36 PM java.util.logging.LogManager$RootLogger log
INFO: result(2)
Apr 13, 2012 2:35:40 PM java.util.logging.LogManager$RootLogger log
INFO: result(1)
Apr 13, 2012 2:35:47 PM java.util.logging.LogManager$RootLogger log
INFO: get futures
Apr 13, 2012 2:35:47 PM java.util.logging.LogManager$RootLogger log
INFO: (1, 2)

[info] Running examples.future.PureFutureExample
Apr 13, 2012 2:35:54 PM java.util.logging.LogManager$RootLogger log
INFO: set futures
Apr 13, 2012 2:35:54 PM java.util.logging.LogManager$RootLogger log
INFO: set 1
Apr 13, 2012 2:35:54 PM java.util.logging.LogManager$RootLogger log
INFO: submit c(1) ...
Apr 13, 2012 2:35:54 PM java.util.logging.LogManager$RootLogger log
INFO: set 2
Apr 13, 2012 2:35:54 PM java.util.logging.LogManager$RootLogger log
INFO: submit c(2) ...
Apr 13, 2012 2:35:54 PM java.util.logging.LogManager$RootLogger log
INFO: sleep before get futures ...
Apr 13, 2012 2:35:59 PM java.util.logging.LogManager$RootLogger log
INFO: result(1)
Apr 13, 2012 2:35:59 PM java.util.logging.LogManager$RootLogger log
INFO: result(2)
Apr 13, 2012 2:36:01 PM java.util.logging.LogManager$RootLogger log
INFO: get futures
Apr 13, 2012 2:36:01 PM java.util.logging.LogManager$RootLogger log
INFO: (1, 2)

[info] Running examples.future.PureFutureExample
Apr 13, 2012 2:39:45 PM java.util.logging.LogManager$RootLogger log
INFO: set futures
Apr 13, 2012 2:39:45 PM java.util.logging.LogManager$RootLogger log
INFO: set 1
Apr 13, 2012 2:39:45 PM java.util.logging.LogManager$RootLogger log
INFO: submit c(1) ...
Apr 13, 2012 2:39:45 PM java.util.logging.LogManager$RootLogger log
INFO: set 2
Apr 13, 2012 2:39:45 PM java.util.logging.LogManager$RootLogger log
INFO: submit c(2) ...
Apr 13, 2012 2:39:45 PM java.util.logging.LogManager$RootLogger log
INFO: sleep before get futures ...
Apr 13, 2012 2:39:52 PM java.util.logging.LogManager$RootLogger log
INFO: get futures
Apr 13, 2012 2:39:53 PM java.util.logging.LogManager$RootLogger log
INFO: result(2)
Apr 13, 2012 2:39:54 PM java.util.logging.LogManager$RootLogger log
INFO: result(1)
Apr 13, 2012 2:39:54 PM java.util.logging.LogManager$RootLogger log
INFO: (1, 2)