Concurrent Programming in Scala

Advantages of Scala for Concurrency

Scala is popular for concurrent programs for several reasons. The main one is that it is heavily influenced by functional languages and puts a strong focus on immutability, which makes concurrent code much easier to reason about. Because Scala runs on the JVM, it supports thread-based concurrency and the executor framework out of the box. On top of that, a Software Transactional Memory (STM) system inspired by Clojure and Haskell is available, as well as the Akka library, which supports type-safe actor-based concurrency similar to Erlang.

Scala Option

An Option represents an optional value that is either present or absent. Every instance of Option is either an instance of Some or the object None.

val o1: Option[Int] = None // same as Option(null)
val o2 = Some(10)
println(o1.isDefined) // false
println(o1.isEmpty) // true
println(o1.getOrElse("Empty")) // Empty
println(o2.get) // 10

An Option is well suited to pattern matching:

val result = List(1,2,3).find(i => i >= 2) match {
    case None => "Not Found!"
    case Some(elem) => "Found: " + elem
}
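Besides pattern matching, an Option can be transformed with combinators such as map and getOrElse. The following is a minimal sketch; parseNumber is a hypothetical helper introduced only for this illustration.

```scala
import scala.util.Try

object OptionDemo {
  // Hypothetical helper: yields None instead of throwing on malformed input.
  def parseNumber(s: String): Option[Int] = Try(s.trim.toInt).toOption

  // map transforms the value only if one is present.
  val doubled: Option[Int] = parseNumber("21").map(_ * 2)

  // getOrElse supplies a default for the None case.
  val fallback: Int = parseNumber("abc").map(_ * 2).getOrElse(0)

  def main(args: Array[String]): Unit = {
    println(doubled)  // Some(42)
    println(fallback) // 0
  }
}
```

Chaining combinators like this avoids an explicit match when only the present case needs special handling.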

Scala Collections

Scala has mutable and immutable collections, located in the sub-packages scala.collection.mutable and scala.collection.immutable respectively. A mutable collection can be updated or extended in place: changing, adding, or removing elements happens as a side effect. Immutable collections, by contrast, never change. Operations that change, add, or remove elements return a new immutable collection and leave the old one untouched. In Scala the immutable variants are the default; if you want a mutable version, you need to import it explicitly.
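The difference can be seen in a small sketch. The names original, extended, and buffer are chosen for this illustration only.

```scala
import scala.collection.mutable

object CollectionsDemo {
  // Immutable by default: `:+` returns a new List, the original is untouched.
  val original: List[Int] = List(1, 2, 3)
  val extended: List[Int] = original :+ 4

  // The mutable variant is imported explicitly and is updated in place.
  val buffer: mutable.ListBuffer[Int] = mutable.ListBuffer(1, 2, 3)
  buffer += 4

  def main(args: Array[String]): Unit = {
    println(original) // List(1, 2, 3)
    println(extended) // List(1, 2, 3, 4)
    println(buffer)   // ListBuffer(1, 2, 3, 4)
  }
}
```

Because original can never change, it can be shared between threads without synchronization, which is what makes the immutable default attractive for concurrent code.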

Arrays

In Scala, an array is a mutable, fixed-length sequence of objects that share the same type. The length is given when the array is instantiated.

val reserved = new Array[String](3) // calls constructor in Array class
val words = Array("zero", "one", "two") // calls apply() factory in companion object
 
for (i <- 0 to 2) // to is a method using operator notation returning a sequence, 0.to(2)
    println(words(i)) // calls apply function / () operator in Array class
 
words(0) = "nothing" // shorthand for words.update(0, "nothing")
 
words.foreach(println) // shorthand for one argument with println(_)

Lists and Sets

In Scala, List is a concrete collection type, not an interface as in Java. A List is always immutable; its mutable counterpart is scala.collection.mutable.ListBuffer. Because it is implemented as a singly linked list, it can hold an arbitrary number of elements, and prepending an element or taking the head or tail is cheap, while access by index takes linear time. As with arrays, all elements of a List share a common element type.

val list0 = List(1, 2, 3) // List(1,2,3)
val head = list0.head // 1, first element
val tail = list0.tail // List(2,3), the rest
val init = list0.init // List(1,2), all but last
val reversed = list0.reverse // List(3,2,1)
val list1 = 0 :: list0 // List(0,1,2,3), prepend
val list2 = Nil // List() or List.empty
val list3 = list0.map(i => i + 1) // List(2,3,4)
val list4 = list0.filter(i => i > 1) // List(2,3)
val sum0 = list0.reduce((x, y) => x + y) // 6
val sum1 = list0.sum // 6
val count = list0.count(i => i > 1) // 2
val list5 = list0.zip(List('A', 'B', 'C')) // List((1,A), (2,B), (3,C))
val list6 = list0.groupBy(i => i % 2 == 0) // Map(false -> List(1,3), true -> List(2))
val large = list0.find(i => i > 12) // None
val small = list0.find(i => i < 12) // Some(1)
val list7 = list0.drop(1) // List(2,3), without the first element
val list8 = list0.dropRight(2) // List(1), without 2 right most elements
val list9 = list0 ::: List(4,5,6) // List(1,2,3,4,5,6), concatenation
list0.foreach(i => print(i + " ")) // 1 2 3
println(list0.mkString(", ")) // 1, 2, 3

A Set is similar to a List, but it contains each value at most once and does not guarantee any particular element order.

val set0 = Set(1,2,3,2) // Set(1,2,3)
val set1 = set0 + 4 // Set(1,2,3,4)
val set2 = set0 - 1 // Set(2,3)
val contains = set1(0) // false, same as set1.contains(0)
val set3 = set1.filter(i => i > 2) // Set(3,4)
val set4 = set1.map(i => i > 2) // Set(false,true)
val subset = set2 subsetOf set0 // true

Tuples

Tuples are like lists but can contain different types of elements. They are commonly used for returning multiple values from a function.

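val pair = (99, "Luftballons") // inferred as Tuple2[Int, String]
val num = pair._1 // 99, elements are accessed with _1, _2, ... (1-based)
val what = pair._2 // "Luftballons"; Scala 3 also accepts pair(0) and pair(1)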
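Returning multiple values from a function could look like the following sketch. minMax is a hypothetical helper, not part of the standard library.

```scala
object TupleDemo {
  // Hypothetical helper returning two results at once as a Tuple2.
  def minMax(xs: List[Int]): (Int, Int) = (xs.min, xs.max)

  // Destructure the returned tuple directly into two vals.
  val (lo, hi) = minMax(List(3, 1, 4, 1, 5))

  def main(args: Array[String]): Unit =
    println(s"min=$lo max=$hi") // min=1 max=5
}
```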

Maps

val map0 = Map(1 -> "one", 2 -> "two") // Map(1 -> "one", 2 -> "two")
val map1 = map0 + (3 -> "three") // Map(1 -> "one", 2 -> "two", 3 -> "three")
val map2 = map1 - 1 // Map(2 -> "two", 3 -> "three")
val val1 = map0(1) // "one"
val val0 = map0(0) // j.u.NoSuchElementException: key not found: 0
val optVal0 = map0.get(0) // None
val optVal1 = map0.get(1) // Some("one")
val res = map1.filter(kv => kv._1 > 2) // Map(3 -> "three")

Parallel Collections

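Parallel collections were added to the Scala standard library to enable parallel programming through a simple high-level abstraction, without users needing to know the low-level details. Since Scala 2.13 they live in the separate scala-parallel-collections module, and calling .par requires import scala.collection.parallel.CollectionConverters._. Operations that parallelize particularly well are: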

val list = (1 to 10000).toList
val res = list.map(i => i * 3)
val even = list.filter(i => i % 2 == 0)
val sum = list.reduce((i,j) => i + j)
 
val par_res = list.par.map(i => i*3).seq.toList
val par_even = list.par.filter(i => i % 2 == 0).seq.toList
val par_sum = list.par.reduce((i,j) => i + j)

There are some things to be aware of when using parallel collections. If the collection is small, the overhead of splitting the work and combining the results can outweigh the performance gain. Results can also become non-deterministic when the operation has side effects or is not associative:

(1 to 5).foreach(print) // 12345
(1 to 5).par.foreach(print) // depending on execution 34512
(1 to 1000).par.reduce(_ - _) // depending on execution -330101
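Why a non-associative operation breaks a parallel reduce can be shown without any parallelism. A parallel reduce is free to group the elements differently on each run, so the sketch below simply compares two fixed groupings using the sequential reduceLeft and reduceRight.

```scala
object AssociativityDemo {
  val xs: List[Int] = List(1, 2, 3, 4)

  // Addition is associative: both groupings give the same sum.
  val sumLeft: Int  = xs.reduceLeft(_ + _)  // ((1 + 2) + 3) + 4 = 10
  val sumRight: Int = xs.reduceRight(_ + _) // 1 + (2 + (3 + 4)) = 10

  // Subtraction is not associative: the grouping changes the result.
  val diffLeft: Int  = xs.reduceLeft(_ - _)  // ((1 - 2) - 3) - 4 = -8
  val diffRight: Int = xs.reduceRight(_ - _) // 1 - (2 - (3 - 4)) = -2

  def main(args: Array[String]): Unit = {
    println(s"sum:  $sumLeft vs $sumRight")
    println(s"diff: $diffLeft vs $diffRight")
  }
}
```

Only operations for which every grouping yields the same result are safe to pass to a parallel reduce.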

Scala Futures

The problem with Java's Future is that the get call blocks.

public Future<String> loadHomePage() { ... }
public Map<String,Integer> indexContent(String content) { ... }
public void work() throws Exception {
    // Block current Thread until result is available
    String content = loadHomePage().get();
    Map<String, Integer> index = indexContent(content);
    System.out.println(index);
}

Instead of waiting for results, we would much rather be notified when they are available, as in the observer pattern. Scala futures are non-blocking by default: they use callback functions instead of the typical blocking operations. Blocking is still possible but heavily discouraged. Futures in Scala are created as follows:

object Future {
    def apply[T](task: => T)(implicit ec: ExecutionContext): Future[T]
}
// Usage:
val inverseFuture : Future[Matrix] = Future {
    fatMatrix.inverse() // non-blocking long lasting computation
}(executionContext)
 
// or in short
import scala.concurrent.ExecutionContext.Implicits.global
val inverseFuture : Future[Matrix] = Future {
    fatMatrix.inverse()
}

Let’s assume we want to fetch a list of recent posts and display them. We can register a callback with the onComplete[U](f: Try[T] => U): Unit method. Try is very similar to Option: it is a Success[T] holding the value if the future completed successfully, or a Failure[T] holding the exception otherwise.

val f: Future[List[String]] = Future {
    session.getRecentPosts()
}
 
f.onComplete {
    case Success(posts) => for (post <- posts) println(post)
    case Failure(t) => println("An error has occurred: " + t.getMessage)
}
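Try can also be used on its own, outside of futures, which makes its two cases easy to see. The following is a small sketch; describe is a hypothetical helper for this illustration.

```scala
import scala.util.{Failure, Success, Try}

object TryDemo {
  // Hypothetical helper: turns either outcome into a readable message.
  def describe(t: Try[Int]): String = t match {
    case Success(v) => s"ok: $v"
    case Failure(e) => s"failed: ${e.getClass.getSimpleName}"
  }

  // Try { ... } catches non-fatal exceptions and wraps them in Failure.
  val good: String = describe(Try("42".toInt))
  val bad: String  = describe(Try("x".toInt))

  def main(args: Array[String]): Unit = {
    println(good) // ok: 42
    println(bad)  // failed: NumberFormatException
  }
}
```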

Registering a foreach callback has the same semantics as onComplete, with the difference that the closure is only called if the future is completed successfully.

val f: Future[List[String]] = Future {
    session.getRecentPosts
}
 
f.foreach { posts =>
    for (post <- posts) println(post)
}

The map combinator takes a future and a mapping function for its value and produces a new future, which is completed with the mapped value once the original future completes successfully.

val rateQuote = Future {
    connection.getCurrentValue(USD)
}
 
val purchase = rateQuote map { quote =>
    if (isProfitable(quote)) connection.buy(amount, quote)
    else throw new Exception("not profitable")
}
 
purchase foreach { amount =>
    println("Purchased " + amount + " USD")
}

But what happens if isProfitable returns false, causing an exception to be thrown? In that case purchase fails with that exception. Furthermore, imagine that the connection was broken and getCurrentValue threw an exception, failing rateQuote. In that case we’d have no value to map, so purchase would automatically fail with the same exception as rateQuote.

In conclusion, if the original future is completed successfully then the returned future is completed with a mapped value from the original future. If the mapping function throws an exception the future is completed with that exception. If the original future fails with an exception then the returned future also contains the same exception.
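The three cases can be reproduced in a self-contained sketch. The values and exception messages are made up, and Await is used here only to inspect the outcomes of a demo; production code would normally stay non-blocking.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object FutureMapDemo {
  // Demo-only helper: blocks until the future is done and captures the outcome.
  def outcome[T](f: Future[T]): Try[T] = Try(Await.result(f, 5.seconds))

  // Case 1: the original succeeds and the mapping succeeds.
  def mapped(): Try[Int] =
    outcome(Future(20).map(_ + 1))

  // Case 2: the original succeeds but the mapping function throws.
  def mapThrows(): Try[Int] =
    outcome(Future(20).map[Int](_ => throw new IllegalStateException("not profitable")))

  // Case 3: the original fails, so the mapping function never runs.
  def sourceFails(): Try[Int] =
    outcome(Future[Int](throw new RuntimeException("connection lost")).map(_ + 1))

  def main(args: Array[String]): Unit = {
    println(mapped())      // Success(21)
    println(mapThrows())   // Failure(java.lang.IllegalStateException: not profitable)
    println(sourceFails()) // Failure(java.lang.RuntimeException: connection lost)
  }
}
```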

flatMap works like map, but its mapping function itself returns a future. Instead of producing a nested Future[Future[T]], the result is flattened into a single future:

val usdQuote = Future { connection.getCurrentValue(USD) }
val chfQuote = Future { connection.getCurrentValue(CHF) }
 
val purchase = usdQuote flatMap {
    usd =>
        chfQuote
            .withFilter(chf => isProfitable(usd, chf))
            .map(chf => connection.buy(amount, chf))
}
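The difference between map and flatMap shows up in the result types. The sketch below uses a hypothetical asynchronous step named lookup, and also shows the equivalent for-comprehension, which the compiler translates into flatMap and map calls.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FlatMapDemo {
  // Hypothetical asynchronous step that depends on an earlier result.
  def lookup(id: Int): Future[Int] = Future(id * 10)

  // map with a future-returning function nests the futures.
  def nested(): Future[Future[Int]] = Future(1).map(lookup)

  // flatMap flattens the nesting into a single future.
  def flat(): Future[Int] = Future(1).flatMap(lookup)

  // The same chaining written as a for-comprehension.
  def viaFor(): Future[Int] =
    for {
      a <- Future(1)
      b <- lookup(a)
    } yield a + b

  // Demo-only helper: blocks to read the result.
  def await[T](f: Future[T]): T = Await.result(f, 5.seconds)

  def main(args: Array[String]): Unit = {
    println(await(flat()))   // 10
    println(await(viaFor())) // 11
  }
}
```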

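As mentioned, you can also block on a future, although this is discouraged:

import scala.concurrent.Await
import scala.concurrent.duration._
val result = Await.result(homepage, 1.second) // throws a TimeoutException if not done in time
val resultNoTimeout = Await.result(homepage, Duration.Inf) // waits indefinitely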

Reactive Programming with RxScala

ReactiveX (Rx) is a library for composing asynchronous and event-based programs by using observable sequences. It extends the observer pattern to support sequences of data and/or events. It offers the following structures:

An Observable represents an observable sequence of events much like an Iterable:

trait Observable[T] {
    def subscribe(obs: Observer[T]): Subscription
}
trait Observer[T] {
    def onNext(t: T): Unit
    def onCompleted(): Unit
    def onError(t: Throwable): Unit
}
trait Subscription {
    def unsubscribe(): Unit
    def isUnsubscribed(): Boolean
}
 
// Observable[String] emitting some HTML strings
getDataFromNetwork()
    .drop(7)
    .filter(s => s.startsWith("h"))
    .take(12)
    .map(s => toJson(s))
    .subscribe(j => println(j)) // instead of foreach
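To see how such operator chains fit together, here is a deliberately simplified, synchronous sketch of the Observer and Observable traits. It is not how RxScala is implemented: Subscription is left out, and from and collect are hypothetical helpers introduced only for this illustration.

```scala
import scala.collection.mutable.ListBuffer

object MiniRx {
  trait Observer[T] {
    def onNext(t: T): Unit
    def onCompleted(): Unit
    def onError(t: Throwable): Unit
  }

  trait Observable[T] { self =>
    def subscribe(obs: Observer[T]): Unit

    // Each operator returns a new Observable wrapping the upstream one.
    def map[R](f: T => R): Observable[R] = new Observable[R] {
      def subscribe(obs: Observer[R]): Unit = self.subscribe(new Observer[T] {
        def onNext(t: T): Unit = obs.onNext(f(t))
        def onCompleted(): Unit = obs.onCompleted()
        def onError(t: Throwable): Unit = obs.onError(t)
      })
    }

    def filter(p: T => Boolean): Observable[T] = new Observable[T] {
      def subscribe(obs: Observer[T]): Unit = self.subscribe(new Observer[T] {
        def onNext(t: T): Unit = if (p(t)) obs.onNext(t)
        def onCompleted(): Unit = obs.onCompleted()
        def onError(t: Throwable): Unit = obs.onError(t)
      })
    }
  }

  // Hypothetical source: pushes every list element, then completes.
  def from[T](items: List[T]): Observable[T] = new Observable[T] {
    def subscribe(obs: Observer[T]): Unit = {
      items.foreach(obs.onNext)
      obs.onCompleted()
    }
  }

  // Hypothetical sink: gathers everything that was emitted into a List.
  def collect[T](source: Observable[T]): List[T] = {
    val seen = ListBuffer.empty[T]
    source.subscribe(new Observer[T] {
      def onNext(t: T): Unit = { seen += t; () }
      def onCompleted(): Unit = ()
      def onError(t: Throwable): Unit = throw t
    })
    seen.toList
  }

  def main(args: Array[String]): Unit = {
    val tags = from(List("html", "head", "body", "h1"))
    println(collect(tags.filter(_.startsWith("h")).map(_.toUpperCase)))
    // List(HTML, HEAD, H1)
  }
}
```

Unlike an Iterable, where the consumer pulls the next element, the source here pushes each element to the observer. That push model is what lets a real Observable deliver events asynchronously.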