Cooking with Akka (Part 2)

by Javier Puerto
Tags: Open Source, Functional Programming, Scala, Akka

In the previous post Cooking with Akka, we modeled a bar's breakfast service with the help of the Akka toolkit. Now we want to introduce the concept of elasticity from the Reactive Manifesto. For this task we will continue with our analogy.

Shopping street, licensed under CC0 1.0

Introducing delays

The previous implementation had no logic: everything happened instantly. This is not very realistic. To emulate real requests, we are going to introduce some delays in the process:

  • When the Waiter gets a breakfast request, it waits 1s before dispatching it to the Chef (dispatching time).
  • When the Chef gets a breakfast request, it waits 1s before handing the breakfast to the Waiter (processing time).
  • When the Waiter gets a breakfast from the Chef, it waits 1s before dispatching it to the Customer (dispatching time).
  • When the Customer gets a breakfast, it waits 5s before leaving the Bar (consuming time).

We also introduce a quality-of-service requirement through timeouts. The customer will leave the bar unhappy if a breakfast cannot be produced in 5s or less. When using futures, we can pass the maximum waiting time (timeout) as an argument or provide it as an implicit value.

implicit val timeout: Timeout = Timeout(5.seconds) // needs akka.util.Timeout and scala.concurrent.duration._
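To make the timeout behaviour concrete, here is a minimal sketch that uses plain Scala futures instead of Akka's ask pattern. The names `TimeoutSketch`, `cook` and `customerMood` are hypothetical and not part of the demo project; the sketch only shows how a result that arrives too late turns into an unhappy customer.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object TimeoutSketch {
  // Hypothetical kitchen: producing a breakfast takes `cookingTime`.
  def cook(cookingTime: FiniteDuration): Future[String] =
    Future {
      Thread.sleep(cookingTime.toMillis)
      "breakfast"
    }

  // The customer waits at most `patience`; a breakfast that arrives
  // later than that counts as a failed service.
  def customerMood(cookingTime: FiniteDuration, patience: FiniteDuration): String =
    Try(Await.result(cook(cookingTime), patience)) match {
      case Success(_)                   => "happy"
      case Failure(_: TimeoutException) => "unhappy"
      case Failure(other)               => throw other
    }
}
```

Blocking with `Await` keeps the sketch short; inside an actor we would rather react to the completed future, as the demo does with `onComplete`.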

System overview

Step  From      To        Message
1     Customer  Bar       TableRequest
2     Bar       Customer  Ok(Waiter)
3     Customer  Waiter    Breakfast
4     Waiter    Bar       ChefRequest
5     Bar       Waiter    Ok(Chef)
6     Waiter    Chef      Breakfast
7     Chef      Bar       Ingredients
8     Bar       Chef      Ok
9     Chef      Waiter    Breakfast
10    Waiter    Customer  Breakfast
11    Customer  Bar       Leaving

Each row is one message, read from top to bottom; the From and To columns name the actors that exchange it.

What is our current situation?

The main app creates a street with a bar. A separate thread introduces a new customer to the bar every second over a window of 30 seconds. After that, we stop the Akka system and print a summary for the bar.

If we run the application we will get the following information:

....
[INFO] [11/22/2016 11:27:00.205] [BreakfastDemo-akka.actor.default-dispatcher-4] [akka://BreakfastDemo/user/streetActor/bar0/$a] The chef got a request.
[INFO] [11/22/2016 11:27:00.205] [BreakfastDemo-akka.actor.default-dispatcher-4] [akka://BreakfastDemo/user/streetActor/bar0] The Bar request the ingredients.
[ERROR] [11/22/2016 11:27:00.205] [ForkJoinPool-2-worker-21] [akka://BreakfastDemo/user/streetActor/bar0/$a] Timeout requesting the breakfast ingredients. WARNING arguments left: 1
[ERROR] [11/22/2016 11:27:00.205] [ForkJoinPool-2-worker-11] [akka://BreakfastDemo/user/streetActor/bar0/$b] Error dispatching the breakfast. WARNING arguments left: 1
[ERROR] [11/22/2016 11:27:00.206] [ForkJoinPool-2-worker-21] [akka://BreakfastDemo/user/streetActor/customer29] The breakfast can not be served. Error = java.lang.IllegalStateException: No ingredients available.
[INFO] [11/22/2016 11:27:00.206] [BreakfastDemo-akka.actor.default-dispatcher-8] [akka://BreakfastDemo/user/streetActor/bar0] The customer 'Sarah' is unhappy.
[INFO] [11/22/2016 11:27:09.283] [BreakfastDemo-akka.actor.default-dispatcher-8] [akka://BreakfastDemo/user/streetActor/bar0] We close the bar with 15 happy customers and 14 unhappy.

There are not enough ingredients in stock for all our customers. In fact, we only have ingredients for 15 breakfasts. The bar learned from this and increased the stock to cover 50 breakfasts. If we run the program again, the situation changes:

[INFO] [11/22/2016 12:01:43.295] [BreakfastDemo-akka.actor.default-dispatcher-9] [akka://BreakfastDemo/user/streetActor/bar0] We close the bar with 30 happy customers and 0 unhappy.

The bar is happy too: it was able to serve breakfast to all 30 customers that came in during the day. Good for them!

What happens if we get more customers?

The bar is becoming very popular because the service is quite good, and one day we get twice the usual number of customers. What will happen to the service?

[INFO] [11/22/2016 12:11:49.194] [BreakfastDemo-akka.actor.default-dispatcher-8] [akka://BreakfastDemo/user/streetActor/bar0] We close the bar with 2 happy customers and 57 unhappy.

That is not good for the Bar owner, so the Bar decides to take on a second Waiter. Luckily, the Bar knows how Akka works, and this is very easy to do.

Routers

Routing is an out-of-the-box solution that Akka provides to help us. We could create another WaiterActor reference and add logic to the BarActor to decide which WaiterActor is handed to each CustomerActor, but it is easier and more reliable to route the messages to a pool of WaiterActors. A router implements the dispatch logic that delivers a message to one actor out of a group of actors. It can be used in two ways: as a Router instance inside one of our own actors, or as a self-contained router actor. The Router instance approach looks like this:

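// Inside an actor: a Router built over routees created by hand (Worker is a placeholder actor).
var router: Router = {
  val routees = Vector.fill(2)(ActorRefRoutee(context.actorOf(Props[Worker])))
  Router(RoundRobinRoutingLogic(), routees)
}

def receive = {
  case w: Work =>
    router.route(w, sender())
}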

In this case, an intermediate actor acts as the router and forwards the messages to the routees. The router actor implementation, on the other hand, could look like this:

val waiter: ActorRef = context.actorOf(RoundRobinPool(2).props(WaiterActor.props))

Here we do not have to change our implementation; we simply reply with the router actor reference. Any message sent to the router actor is forwarded to the routee selected by the routing logic, and the original sender() reference is preserved. In the previous example we used RoundRobinPool, which forwards messages to the routees in round-robin order. Akka ships with several routing strategies, and you can also implement your own logic.
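The round-robin behaviour can be observed with a small, self-contained sketch. It assumes Akka classic actors on the classpath; `RoundRobinSketch`, `EchoWaiter` and `servedBy` are hypothetical names, and the waiter is a stand-in that only replies with its own actor name so that we can see which routee handled each request.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.routing.RoundRobinPool
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

object RoundRobinSketch {
  // Hypothetical stand-in for WaiterActor: replies with its own actor name.
  class EchoWaiter extends Actor {
    def receive: Receive = {
      case _ => sender() ! self.path.name
    }
  }

  // Sends `requests` messages to a pool of two waiters, one after another,
  // and returns the name of the routee that answered each of them.
  def servedBy(requests: Int): Seq[String] = {
    val system = ActorSystem("RoundRobinSketch")
    implicit val timeout: Timeout = Timeout(5.seconds)
    try {
      val waiters: ActorRef =
        system.actorOf(RoundRobinPool(2).props(Props(new EchoWaiter)), "waiters")
      (1 to requests).map { i =>
        // The reply comes straight from the routee because the router keeps the sender.
        Await.result((waiters ? s"breakfast-$i").mapTo[String], timeout.duration)
      }
    } finally {
      Await.result(system.terminate(), 10.seconds)
    }
  }
}
```

Calling `RoundRobinSketch.servedBy(4)` returns four names that alternate between the two routees of the pool.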

Implementing a router actor for waiters

The WaiterActor has to be updated too. Previously it obtained the ChefActor by asking its parent, but now its parent is the router! The router was created by the BarActor, so we only have to go up one more level and ask the grandparent actor. We can reach it easily with the path ../.. and the actorSelection method, which resolves paths relative to the current actor.

context.actorSelection("../..") ? ChefRequest
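The following sketch shows the relative path at work in a stripped-down hierarchy. It is not the demo's code: `SelectionSketch`, `Order` and `ChefIs` are hypothetical, and the waiter tells the grandparent instead of asking it, so that the reply goes straight back to the original requester.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.routing.RoundRobinPool
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

object SelectionSketch {
  case object Order                      // hypothetical: customer -> bar -> waiter
  case object ChefRequest                // waiter -> bar
  final case class ChefIs(owner: String) // hypothetical reply carrying the bar's name

  class Waiter extends Actor {
    def receive: Receive = {
      case Order =>
        // ".." is the pool router, "../.." is the bar that created the router.
        context.actorSelection("../..").tell(ChefRequest, sender())
    }
  }
  object Waiter { def props: Props = Props(new Waiter) }

  class Bar extends Actor {
    private val waiters: ActorRef =
      context.actorOf(RoundRobinPool(2).props(Waiter.props), "waiters")

    def receive: Receive = {
      case Order       => waiters.forward(Order)
      case ChefRequest => sender() ! ChefIs(self.path.name)
    }
  }

  // Sends one order through the router and returns the bar's answer.
  def run(): ChefIs = {
    val system = ActorSystem("SelectionSketch")
    implicit val timeout: Timeout = Timeout(5.seconds)
    try {
      val bar = system.actorOf(Props(new Bar), "bar0")
      Await.result((bar ? Order).mapTo[ChefIs], timeout.duration)
    } finally {
      Await.result(system.terminate(), 10.seconds)
    }
  }
}
```

`SelectionSketch.run()` returns `ChefIs("bar0")`: the ChefRequest sent from a routee really reaches the bar two levels up, not the router in between.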

Did it work?

Yes, the result of running the router modifications is:

[INFO] [11/22/2016 13:16:49.325] [BreakfastDemo-akka.actor.default-dispatcher-2] [akka://BreakfastDemo/user/streetActor/bar0] We close the bar with 47 happy customers and 0 unhappy.

But we served only 47 customers instead of the 60 who wanted to have breakfast. We do not have unhappy customers now, but we are missing some potential clients who could not get a seat.

Creating franchises

We now know that our current implementation can serve 47 customers every 30 seconds. The owner wants to keep the model and spread it, so franchises could be the best solution. The owner therefore implements a pool of BarActors to help maximize the benefits.

Custom actor router

For this task we could use, for example, the RoundRobinPool implementation provided by Akka, but for the demo we want to show you how to create a custom actor pool. The bar owner wants customers to be distributed sequentially among the franchises, so the first request is served by the first BarActor, the second by the next one, and so on.

First of all, we create the dispatch logic.

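import java.util.concurrent.atomic.AtomicInteger

import akka.routing.{NoRoutee, Routee, RoutingLogic}
import scala.collection.immutable.IndexedSeq

final class SequentialRoutingLogic extends RoutingLogic {

  // Shared counter; AtomicInteger keeps concurrent increments consistent.
  private val current = new AtomicInteger(0)

  override def select(message: Any, routees: IndexedSeq[Routee]): Routee =
    if (routees.isEmpty) NoRoutee
    else routees(getNext(routees))

  // getAndIncrement starts at 0, so the first request goes to the first routee;
  // floorMod keeps the index non-negative even after the counter overflows.
  private def getNext(routees: IndexedSeq[Any]): Int =
    Math.floorMod(current.getAndIncrement(), routees.size)
}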

For this task we only have to extend the RoutingLogic trait and implement the method select(msg: Any, routees: IndexedSeq[Routee]). For our use case we use an AtomicInteger, which guarantees that the counter is updated atomically no matter how many threads touch it. We then determine the current routee by taking the counter modulo the pool size.
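The index arithmetic can be tried out without any Akka types. The sketch below is an illustration only; `SequentialIndexSketch`, `SequentialIndex` and `indices` are hypothetical names. It assumes that the counter starts at 0, so the first request lands on the first routee, and it uses `Math.floorMod` so that the index stays valid after the counter wraps around.

```scala
import java.util.concurrent.atomic.AtomicInteger

object SequentialIndexSketch {
  // Same arithmetic as a sequential routing logic, without the Akka types.
  final class SequentialIndex(start: Int = 0) {
    private val current = new AtomicInteger(start)

    // Counter modulo pool size; floorMod keeps the result in [0, poolSize)
    // even when the counter has wrapped around to a negative value.
    def next(poolSize: Int): Int =
      Math.floorMod(current.getAndIncrement(), poolSize)
  }

  // Indices chosen for `requests` consecutive messages over `poolSize` routees.
  def indices(requests: Int, poolSize: Int, start: Int = 0): Seq[Int] = {
    val index = new SequentialIndex(start)
    (1 to requests).map(_ => index.next(poolSize))
  }
}
```

`SequentialIndexSketch.indices(5, 2)` yields 0, 1, 0, 1, 0, which is the distribution the bar owner asked for. Starting the counter at `Int.MaxValue` still yields valid indices (1, 0, 1), whereas a plain `%` would produce a negative index once the counter overflows.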

The actor router can now be defined as follows:

import akka.actor.{ActorSystem, SupervisorStrategy}
import akka.dispatch.Dispatchers
import akka.routing.{Pool, Resizer, Router}

@SerialVersionUID(1L)
final case class SequentialPool(
    override val nrOfInstances: Int,
    override val resizer: Option[Resizer] = None,
    override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy,
    override val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
    override val usePoolDispatcher: Boolean = false)
  extends Pool {

  // Plug the custom routing logic into the pool's router.
  override def createRouter(system: ActorSystem): Router = Router(new SequentialRoutingLogic)
}

That is all: we only have to extend the Pool trait and plug in our routing logic. For our use case this is sufficient. We can then use our SequentialPool like any other pool router.

Implementation

We have to be careful with our current implementation, because each customer selects a bar at the beginning and then uses that reference to leave. As the bars now sit behind a router actor, the customer needs to know which bar from the pool attended the request, or the leave message could end up in the wrong actor. To solve this, when the customer asks a BarActor for a place and a WaiterActor is assigned, the reply is a tuple that also contains the BarActor reference.

BarActor

    case CustomerActor.TableRequest(customer) =>
      log.info("We got a table request.")
      if (places.size < maximum) {
        places += customer
        log.info("Places available: {}", maximum - places.size )
        sender() ! (barReferences.waiterActor, self) // Send itself as argument.

Then we can use this reference in the CustomerActor to interact with the assigned BarActor.

            val future = ref ? TableRequest(self)
            future onComplete {
              case Success((waiterActor: ActorRef, barActor: ActorRef)) => {
                context.parent ! LeaveQueue
                this.barActor = barActor
                requestBreakfast(waiterActor)
              }
              ...
            }
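To see why the customer has to keep the reference of the bar that seated it, consider this simplified sketch. It is an assumption-laden stand-in for the demo: it uses Akka's RoundRobinPool instead of our SequentialPool, the bar replies with only its own reference rather than a tuple, and `FranchiseSketch`, `Leave` and `Seated` are hypothetical names.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.routing.RoundRobinPool
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

object FranchiseSketch {
  case object TableRequest            // simplified: the demo's message also carries the customer
  case object Leave                   // hypothetical name for the leave message
  final case class Seated(count: Int) // hypothetical reply, only used to observe the state

  // Simplified bar: counts seated customers and replies with its own reference.
  class Bar extends Actor {
    private var seated = 0
    def receive: Receive = {
      case TableRequest =>
        seated += 1
        sender() ! self
      case Leave =>
        seated -= 1
        sender() ! Seated(seated)
    }
  }

  // Returns the seat count reported by whichever bar received the Leave message.
  def seatedAfterLeaving(leaveDirectly: Boolean): Int = {
    val system = ActorSystem("FranchiseSketch")
    implicit val timeout: Timeout = Timeout(5.seconds)
    try {
      val bars = system.actorOf(RoundRobinPool(2).props(Props(new Bar)), "bars")
      // The table request goes through the router; the reply identifies the actual bar.
      val myBar = Await.result((bars ? TableRequest).mapTo[ActorRef], timeout.duration)
      val target = if (leaveDirectly) myBar else bars
      Await.result((target ? Leave).mapTo[Seated], timeout.duration).count
    } finally {
      Await.result(system.terminate(), 10.seconds)
    }
  }
}
```

Leaving through the stored reference brings the bar back to 0 seated customers. Leaving through the router hands the message to the next bar in the pool, which never seated anyone and ends up with a count of -1.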

Did it work?

Yes, it is working as expected. If we run the same test again we will get the following result.

[INFO] [11/23/2016 12:26:30.862] [BreakfastDemo-akka.actor.default-dispatcher-15] [akka://BreakfastDemo/user/streetActor/bars/$b] We close the bar with 30 happy customers and 0 unhappy.
[INFO] [11/23/2016 12:26:30.862] [BreakfastDemo-akka.actor.default-dispatcher-9] [akka://BreakfastDemo/user/streetActor/bars/$a] We close the bar with 30 happy customers and 0 unhappy.

We were able to serve the 60 customers without problems.

Conclusions

Modeling applications according to the Reactive Manifesto is simpler with Akka. Consider the scenario without a reactive application: the Bar would be unable to attend to more requests until the current one is served, so the queue would grow and consume resources. With Akka, even under high load our system can keep accepting requests and distribute them across the different BarActors. The expensive operations run in the background thanks to Scala Futures, split into smaller tasks, and the results are available as soon as they are produced.

The application is also easy to scale, either vertically by increasing the number of WaiterActors and ChefActors in the pool, or horizontally by increasing the number of BarActors in the StreetActor pool. Akka also provides a mechanism to adjust a pool size dynamically based on your requirements; see the documentation. We could resize the pool based on the current workload, which would save memory and other resources because the number of instances regulates itself to match the current load. Implementing these variable pools and testing them with different loads is left as an exercise for the reader. Please add a comment if you try it!
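As a possible starting point for that exercise, this sketch configures a pool with Akka's DefaultResizer. The bounds are illustrative values rather than something we measured, and `ElasticPoolSketch` and its `Waiter` are hypothetical stand-ins for the demo's actors.

```scala
import akka.actor.{Actor, Props}
import akka.routing.{DefaultResizer, RoundRobinPool}

object ElasticPoolSketch {
  // Hypothetical stand-in for WaiterActor.
  class Waiter extends Actor {
    def receive: Receive = {
      case order => sender() ! s"served $order"
    }
  }

  // Keep at least 2 waiters and allow the pool to grow to 10 under load
  // (both bounds are assumptions chosen for illustration).
  val resizer: DefaultResizer = DefaultResizer(lowerBound = 2, upperBound = 10)

  // Same pool type as before, now with a resizer attached.
  val pool: RoundRobinPool = RoundRobinPool(2, resizer = Some(resizer))

  // Props to hand to context.actorOf(...) in the actor that owns the pool.
  val waiterProps: Props = pool.props(Props(new Waiter))
}
```

The remaining DefaultResizer parameters, such as the pressure threshold and the ramp-up rate, keep their defaults here and would need tuning against real load.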

I hope that this blog post helps you to better understand the Akka toolkit and the concepts of the Reactive Manifesto. You can review the implementation on GitHub. Thanks for reading!