Sunday, May 20, 2018

Getting started with Akka


1. Intro

In a world of concurrency and multi-threading, Akka is a very popular solution because it provides a lightweight actor system. In this post I'll show a very simple example of an application built on the Akka actor system.

So what is Akka? From Wikipedia:

Akka is a free and open-source toolkit and runtime simplifying the construction of concurrent and distributed applications on the JVM. Akka supports multiple programming models for concurrency, but it emphasizes actor-based concurrency, with inspiration drawn from Erlang.


The main idea is that parallel tasks are executed by actors (workers). Each actor can receive and send messages. For example, an actor can receive a task with details about the work to do and, after doing this work, send the results back to the sender (or, in a more complicated flow, delegate pieces of the work to other actors).
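To make the receive-and-reply idea concrete, here is a minimal sketch with Akka classic actors. The names Task, Done, Worker and WorkerDemo are made up for this illustration and are not part of the word-count application; the sketch assumes the akka-actor dependency is on the classpath.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical messages, for illustration only.
case class Task(description: String)
case class Done(summary: String)

// A worker that receives a Task and replies to whoever sent it.
class Worker extends Actor {
  def receive = {
    case Task(description) =>
      sender() ! Done("finished: " + description)
  }
}

object WorkerDemo {
  // Starts a system, asks the worker once, and blocks for the reply.
  // Blocking with Await is only acceptable in a small demo like this one.
  def run(): Any = {
    val system = ActorSystem("demo")
    try {
      val worker = system.actorOf(Props(new Worker))
      implicit val timeout: Timeout = Timeout(5.seconds)
      Await.result(worker ? Task("count words"), 5.seconds)
    } finally system.terminate()
  }
}
```

Calling WorkerDemo.run() should give back the Done message that the worker sent in reply.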




2. Application structure

In this application we will count the words in a file. We will have 2 types of actors:
- FileWordCounterActor (just one such actor) - responsible for counting words in the whole file
- LineWordCounterActor (one actor per line) - responsible for counting words in a line

Communication between the main function and the actors is via messages:
 - we create FileWordCounterActor with the name of the file whose words we want to count and send it a message to start processing
 - FileWordCounterActor then splits the file into lines, creates a LineWordCounterActor for every line and sends it a message with that line
 - LineWordCounterActor counts the words in its line and sends a result message with this count back to FileWordCounterActor
 - finally, FileWordCounterActor sums all the counts received from the LineWordCounterActors and returns this sum to the main function
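Stripped of actors, the steps above boil down to "count per line, then sum". The sketch below is a plain sequential version that can serve as a reference for the result the actors should produce. The object and method names are made up for this illustration, and splitting on runs of whitespace is my assumption about what counts as a word.

```scala
object SequentialWordCount {
  // What a single LineWordCounterActor would compute for its line.
  // Assumption: words are separated by runs of whitespace.
  def countWordsInLine(line: String): Int =
    line.split("\\s+").count(_.nonEmpty)

  // What FileWordCounterActor would report after summing all line counts.
  def countWordsInLines(lines: Seq[String]): Int =
    lines.map(countWordsInLine).sum
}
```

For example, SequentialWordCount.countWordsInLines(Seq("hello akka world", "", "two  words")) gives 5: three words, zero for the empty line, and two.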





3. build.sbt

We need just one dependency for Akka:

name := "akka-demo"
version := "0.1"
scalaVersion := "2.11.8"
resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.5.12"



4. Main application class

In the main application class we create the actor system and, from it, our main actor: FileWordCounterActor, which receives the file name as a constructor argument. Many methods in the Akka library take implicit parameters; to avoid this implicit "magic" and make the code easier to follow, I pass them explicitly using currying (partial application). All we need to do in the main method is:
- send a start message to our main actor
- take the result from the future returned by the ask call and print it

package com.demien.akkademo

import akka.actor.{ActorSystem, Props}
import akka.util.Timeout
import scala.concurrent.duration._
import akka.pattern.ask
import akka.dispatch.ExecutionContexts.global

object AkkaApp extends App {

  val system = ActorSystem("System")
  // The file name comes from the first command-line argument.
  val wcActor = system.actorOf(Props(new FileWordCounterActor(args(0))))
  // Fix the (normally implicit) timeout explicitly; the message stays open.
  val wcActorAskWithTimeout = wcActor.ask(_: Any)(Timeout(10.seconds))

  val askingFuture = wcActorAskWithTimeout(StartProcessFileMsg())
  // Same trick for Future.map: pass the execution context explicitly.
  val askingFutureGlobalContextMap = askingFuture.map(_: Any => Any)(global())

  askingFutureGlobalContextMap(result => {
    println("Total number of words " + result)
    system.terminate()
    result
  })

}
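The expression wcActor.ask(_: Any)(Timeout(...)) may look unusual, so here is the same partial-application trick on a small method that has nothing to do with Akka. Everything in this sketch (Punctuation, greet, greetExcited) is made up for illustration; only the shape, a first parameter list followed by an implicit second one, mirrors ask.

```scala
object PartialApplicationDemo {
  // Hypothetical type, used only as the implicit parameter.
  case class Punctuation(mark: String)

  // Same shape as ask(message)(implicit timeout): the second list is implicit.
  def greet(name: String)(implicit punctuation: Punctuation): String =
    "Hello, " + name + punctuation.mark

  // The placeholder leaves the first parameter open, while the second list
  // is supplied explicitly. The result is an ordinary function.
  val greetExcited: String => String = greet(_: String)(Punctuation("!"))
}
```

PartialApplicationDemo.greetExcited("Akka") evaluates to "Hello, Akka!". The trade-off is a few extra vals in exchange for seeing exactly which timeout or execution context is being used.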


5. FileWordCounter actor

This actor receives the start message, opens the file (whose name it was given at construction) and, for each line, creates a "child" actor, LineWordCounterActor, and sends it a message with that line. It also receives the calculation results from the child actors and accumulates them. When every line has been processed, it returns the total to the original sender, our main application.

package com.demien.akkademo

import akka.actor.{Actor, ActorRef, Props}

case class StartProcessFileMsg()

class FileWordCounterActor(filename: String) extends Actor {

  private var totalLines = 0
  private var linesProcessed = 0
  private var result = 0
  // Whoever sent the start message; the final total is sent back to it.
  private var fileSender: ActorRef = null

  def receive = {
    case StartProcessFileMsg() => {
      fileSender = sender
      import scala.io.Source._
      // One child actor per line; count the lines as we go.
      fromFile(filename).getLines.foreach { line =>
        val lineWordCounterActor = context.actorOf(Props[LineWordCounterActor])
        lineWordCounterActor.tell(CountWordsInLineMsg(line), self)
        totalLines += 1
      }
      // An empty file produces no child replies, so answer right away.
      if (totalLines == 0) {
        fileSender.tell(result, self)
      }
    }
    case WordsInLineResultMsg(wordsCount) => {
      result += wordsCount
      linesProcessed += 1
      // All lines are accounted for: report the total.
      if (linesProcessed == totalLines) {
        fileSender.tell(result, self)
      }
    }
    case _ => println("message not recognized!")
  }
}
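One detail the actor above skips: fromFile(filename).getLines never closes the underlying file. A possible way to handle that, sketched here as a standalone helper (FileLines and readLines are names I made up, not part of the original project), is to read all lines eagerly and close the source in a finally block. This assumes the file is small enough to fit in memory.

```scala
import scala.io.Source

object FileLines {
  // Reads every line into a List, then closes the file even if reading fails.
  def readLines(filename: String): List[String] = {
    val source = Source.fromFile(filename)
    try source.getLines().toList
    finally source.close()
  }
}
```

The actor could then iterate over FileLines.readLines(filename) instead of the open iterator.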



6. LineWordCounter actor

Here we just have to count the number of words in the line and send the result to the "parent" actor.

package com.demien.akkademo

import akka.actor.Actor

case class CountWordsInLineMsg(line: String)

case class WordsInLineResultMsg(words: Int)

class LineWordCounterActor extends Actor {
  def receive = {
    case CountWordsInLineMsg(string) => {
      // Split on runs of whitespace and skip empty tokens, so a blank line counts as 0.
      val wordsInLine = string.split("\\s+").count(_.nonEmpty)
      sender.tell(WordsInLineResultMsg(wordsInLine), self)
    }
    case _ => println("Error: message not recognized")
  }
}



7. Execution

To run the application, pass the file name as a command-line parameter (it is read as args(0)).

And the execution result should be something like this:

Total number of words 1400

Process finished with exit code 0


8. The end

Full source code can be downloaded from here.
