Improve this actor calling futures
I've an actor (Worker) which basically ask 3 other actors (Filter1,
Filter2 and Filter3) for a result. If any of them return a false, It's
unnecessary to wait for the others, like an "and" operation over the
results. When a false response is receive, a cancel message is sent to the
actors in a way to cancel the queued work and make it more effective in
the execution.
Filters aren't children of Worker, but there are a common pool of actor
which are used by all Worker actors. I use an Agent to maintain the
collection of cancel Works. Then, before a particular work is processed, I
check in the cancel agent if that work was cancel, and then avoid the
execution for it. Cancel has a higher priority than Work, then, it is
processed always first.
The code is something like this
Proxy, who create the actors tree:
import scala.collection.mutable.HashSet
import scala.concurrent.ExecutionContext.Implicits.global
import com.typesafe.config.Config
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorSystem
import akka.actor.PoisonPill
import akka.actor.Props
import akka.agent.Agent
import akka.routing.RoundRobinRouter
class Proxy extends Actor with ActorLogging {
val agent1 = Agent(new HashSet[Work])
val agent2 = Agent(new HashSet[Work])
val agent3 = Agent(new HashSet[Work])
val filter1 =
context.actorOf(Props(Filter1(agent1)).withDispatcher("priorityMailBox-dispatcher")
.withRouter(RoundRobinRouter(24)), "filter1")
val filter2 =
context.actorOf(Props(Filter2(agent2)).withDispatcher("priorityMailBox-dispatcher")
.withRouter(RoundRobinRouter(24)), "filter2")
val filter3 =
context.actorOf(Props(Filter3(agent3)).withDispatcher("priorityMailBox-dispatcher")
.withRouter(RoundRobinRouter(24)), "filter3")
//val workerRouter =
context.actorOf(Props[SerialWorker].withRouter(RoundRobinRouter(24)),
name = "workerRouter")
val workerRouter = context.actorOf(Props(new Worker(filter1, filter2,
filter3)).withRouter(RoundRobinRouter(24)), name = "workerRouter")
def receive = {
case w: Work =>
workerRouter forward w
}
}
Worker:
import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.Props
import akka.actor.actorRef2Scala
import akka.pattern.ask
import akka.pattern.pipe
import akka.util.Timeout
import akka.actor.ActorRef
import akka.routing.RoundRobinRouter
import akka.agent.Agent
import scala.collection.mutable.HashSet
class Worker(filter1: ActorRef, filter2: ActorRef, filter3: ActorRef)
extends Actor with ActorLogging {
implicit val timeout = Timeout(30.seconds)
def receive = {
case w:Work =>
val start = System.currentTimeMillis();
val futureF3 = (filter3 ? w).mapTo[Response]
val futureF2 = (filter2 ? w).mapTo[Response]
val futureF1 = (filter1 ? w).mapTo[Response]
val aggResult = Future.find(List(futureF3, futureF2, futureF1)) {
res => !res.reponse }
Await.result(aggResult, timeout.duration) match {
case None =>
Nqueen.fact(10500000L)
log.info(s"[${w.message}] Procesado mensaje TRUE en
${System.currentTimeMillis() - start} ms");
sender ! WorkResponse(w, true)
case _ =>
filter1 ! Cancel(w)
filter2 ! Cancel(w)
filter3 ! Cancel(w)
log.info(s"[${w.message}] Procesado mensaje FALSE en
${System.currentTimeMillis() - start} ms");
sender ! WorkResponse(w, false)
}
}
}
and Filters:
import scala.collection.mutable.HashSet
import scala.util.Random
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.actorRef2Scala
import akka.agent.Agent
trait CancellableFilter { this: Actor with ActorLogging =>
//val canceledJobs = new HashSet[Int]
val agent: Agent[HashSet[Work]]
def cancelReceive: Receive = {
case Cancel(w) =>
agent.send(_ += w)
//log.info(s"[$t] El trabajo se cancelara (si llega...)")
}
def cancelled(w: Work): Boolean =
if (agent.get.contains(w)) {
agent.send(_ -= w)
true
} else {
false
}
}
abstract class Filter extends Actor with ActorLogging { this:
CancellableFilter =>
val random = new Random(System.currentTimeMillis())
def response: Boolean
val timeToWait: Int
val timeToExecutor: Long
def receive = cancelReceive orElse {
case w:Work if !cancelled(w) =>
//log.info(s"[$t] Llego trabajo")
Thread.sleep(timeToWait)
Nqueen.fact(timeToExecutor)
val r = Response(response)
//log.info(s"[$t] Respondio ${r.reponse}")
sender ! r
}
}
object Filter1 {
def apply(agente: Agent[HashSet[Work]]) = new Filter with
CancellableFilter {
val timeToWait = 74
val timeToExecutor = 42000000L
val agent = agente
def response = true //random.nextBoolean
}
}
object Filter2 {
def apply(agente: Agent[HashSet[Work]]) = new Filter with
CancellableFilter {
val timeToWait = 47
val timeToExecutor = 21000000L
val agent = agente
def response = true //random.nextBoolean
}
}
object Filter3 {
def apply(agente: Agent[HashSet[Work]]) = new Filter with
CancellableFilter {
val timeToWait = 47
val timeToExecutor = 21000000L
val agent = agente
def response = true //random.nextBoolean
}
}
Basically, I think Worker code is ugly and I want to make it better. Could
you help to improve it?
Other point I want to improve is the cancel message. As I don't know which
of the filters are done, I need to Cancel all of them, then, at least one
cancel is redundant (Since this work is completed)