Concurrent Programming in Scala
Advantages of Scala for Concurrency
Scala is popular for concurrent programs for several reasons. The main one is that it is heavily influenced by functional languages and places a strong focus on immutability, which makes concurrent programming considerably easier. Because Scala runs on the JVM, it can use Java's thread-based concurrency and the executor framework directly. On top of that, the ScalaSTM library provides Software Transactional Memory (STM), inspired by Clojure and Haskell, and the Akka library offers Actor-based concurrency in the style of Erlang, with type-safe actors available through Akka Typed.
Scala Option
An object of Option represents an optional value: either it is present or it isn't. Instances of Option are either an instance of Some or the object None.
val o1: Option[Int] = None     // Option(x) also yields None when x is null
val o2 = Some(10)
println(o1.isDefined)          // false
println(o1.isEmpty)            // true
println(o1.getOrElse("Empty")) // Empty
println(o2.get)                // 10
Option works well with pattern matching:
val result = List(1,2,3).find(i => i >= 2) match {
  case None => "Not Found!"
  case Some(elem) => "Found: " + elem
}
// result == "Found: 2"
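Calling get on None throws a java.util.NoSuchElementException, so Option values are usually consumed with combinators instead. The following is a minimal sketch; the ages map and the ageOf helper are hypothetical and exist only for illustration:

```scala
// Hypothetical lookup table, used only for illustration
val ages: Map[String, Int] = Map("alice" -> 31, "bob" -> 27)

// Map#get returns an Option instead of null or an exception
def ageOf(name: String): Option[Int] = ages.get(name)

// map transforms the value only if it is present; None passes through unchanged
val nextYear: Option[Int] = ageOf("alice").map(a => a + 1) // Some(32)
val unknown: Option[Int] = ageOf("carol").map(a => a + 1)  // None

// getOrElse provides a fallback instead of calling get, which throws on None
val carolAge: Int = ageOf("carol").getOrElse(0)            // 0

// fold handles both cases in a single expression
val label: String = ageOf("bob").fold("unknown")(a => s"bob is $a") // "bob is 27"
```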
Scala Collections
Scala has mutable and immutable collections, each in its corresponding sub-package scala.collection.immutable and scala.collection.mutable. A mutable collection can be updated or extended in place: when you change, add, or remove elements, the collection itself is modified as a side effect. Immutable collections, however, never change. Operations that change, add, or remove elements return a new immutable collection and leave the old collection unchanged. In Scala the immutable collections are the default; if you want a mutable version, you need to import it explicitly.
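A short sketch of both kinds, using List as the immutable default and ArrayBuffer from scala.collection.mutable (the values are arbitrary sample data):

```scala
import scala.collection.mutable

// Immutable (default): "adding" returns a new collection, the original is untouched
val nums = List(1, 2, 3)
val more = nums :+ 4  // List(1, 2, 3, 4); nums is still List(1, 2, 3)

// Mutable: imported explicitly, updated in place as a side effect
val buf = mutable.ArrayBuffer(1, 2, 3)
buf += 4              // buf is now ArrayBuffer(1, 2, 3, 4)
buf(0) = 10           // in-place update: ArrayBuffer(10, 2, 3, 4)
```

Because nums can never change, it can be shared between threads without any locking, whereas buf would need synchronization if several threads updated it.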
Arrays
In Scala, an array is a mutable sequence of elements of the same type, with a fixed length that is given when the array is instantiated.
val reserved = new Array[String](3)     // calls the Array constructor; elements start as null
val words = Array("zero", "one", "two") // calls the apply() factory in the companion object
for (i <- 0 to 2)                       // `to` is a method used in operator notation: 0.to(2) returns a Range
  println(words(i))                     // words(i) is shorthand for words.apply(i)
words(0) = "nothing"                    // shorthand for words.update(0, "nothing")
words.foreach(println)                  // shorthand for words.foreach(w => println(w))
Lists and Sets
In Scala, List is a class with a concrete implementation, not an interface as in Java, so List(1, 2, 3) directly creates a list object. Scala's List is always immutable; its mutable counterpart is scala.collection.mutable.ListBuffer. Because a List is implemented as a linked list, it has no fixed length, can hold an arbitrary number of elements, and prepending an element is cheap. Just like an array, however, a List has a single element type.
val list0 = List(1, 2, 3) // List(1,2,3)
val head = list0.head // 1, first element
val tail = list0.tail // List(2,3), the rest
val init = list0.init // List(1,2), all but last
val reversed = list0.reverse // List(3,2,1)
val list1 = 0 :: list0 // List(0,1,2,3), prepend
val list2 = Nil // List() or List.empty
val list3 = list0.map(i => i + 1) // List(2,3,4)
val list4 = list0.filter(i => i > 1) // List(2,3)
val sum0 = list0.reduce((x, y) => x + y) // 6
val sum1 = list0.sum // 6
val count = list0.count(i => i > 1) // 2
val list5 = list0.zip(List('A', 'B', 'C')) // List((1,A), (2,B), (3,C))
val list6 = list0.groupBy(i => i % 2 == 0) // Map(false -> List(1,3), true -> List(2))
val large = list0.find(i => i > 12) // None
val small = list0.find(i => i < 12) // Some(1)
val list7 = list0.drop(1) // List(2,3), without the first element
val list8 = list0.dropRight(2) // List(1), without 2 right most elements
val list9 = list0 ::: List(4,5,6) // List(1,2,3,4,5,6), concatenation
list0.foreach(i => print(i + " ")) // 1 2 3
println(list0.mkString(", ")) // 1, 2, 3
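Because List is an immutable linked list, prepending does not copy the existing elements: the new list simply points to the old one, which is safe precisely because neither can change. A minimal sketch of this structural sharing, plus a recursive sum (a hypothetical helper) that walks the head :: tail structure:

```scala
val list0 = List(1, 2, 3)

// Prepending allocates one new cell; the existing list is shared, not copied
val list1 = 0 :: list0
val shared = list1.tail eq list0 // true: the tail is the very same object

// Recursive processing follows the linked structure
def sum(xs: List[Int]): Int = xs match {
  case Nil          => 0
  case head :: tail => head + sum(tail)
}
```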
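A Set is similar to a List, but it contains only unique values: adding an element that is already present has no effect, and a Set does not guarantee any particular element order.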
val set0 = Set(1,2,3,2) // Set(1,2,3)
val set1 = set0 + 4 // Set(1,2,3,4)
val set2 = set0 - 1 // Set(2,3)
val contains = set1(0) // false, same as set1.contains(0)
val set3 = set1.filter(i => i > 2) // Set(3,4)
val set4 = set1.map(i => i > 2) // Set(false,true)
val subset = set2 subsetOf set0 // true
Tuples
Tuples are fixed-size groupings of values that, unlike lists, can contain elements of different types. They are commonly used for returning multiple values from a function.
val pair = (99, "Luftballons") // inferred as Tuple2[Int, String], i.e. (Int, String)
val num = pair._1              // 99, elements are accessed with 1-based _1, _2, ...
val what = pair._2             // "Luftballons"
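As an illustration of returning multiple values, here is a minimal sketch with a hypothetical minMax function; the returned tuple can be destructured directly into named values:

```scala
// Hypothetical helper: returns both the minimum and the maximum as a Tuple2
def minMax(xs: Seq[Int]): (Int, Int) = (xs.min, xs.max)

// Destructure the returned tuple into two names
val (lo, hi) = minMax(Seq(4, 9, 1, 7)) // lo = 1, hi = 9

// Or keep the tuple and use the positional accessors
val bounds = minMax(Seq(4, 9, 1, 7))
val width = bounds._2 - bounds._1      // 8
```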
Maps
val map0 = Map(1 -> "one", 2 -> "two") // Map(1 -> "one", 2 -> "two")
val map1 = map0 + (3 -> "three") // Map(1 -> "one", 2 -> "two", 3 -> "three")
val map2 = map1 - 1 // Map(2 -> "two", 3 -> "three")
val val1 = map0(1) // "one"
val val0 = map0(0) // throws java.util.NoSuchElementException: key not found: 0
val optVal0 = map0.get(0) // None
val optVal1 = map0.get(1) // Some("one")
val res = map1.filter(kv => kv._1 > 2) // Map(3 -> "three")
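Since an immutable Map is never modified, building one up is usually done by returning an updated copy at each step. A minimal word-count sketch using foldLeft and updated (the words list is made-up sample data):

```scala
val words = List("a", "b", "a", "c", "a")

// Each step returns a new immutable Map; the accumulator is never modified in place
val counts: Map[String, Int] = words.foldLeft(Map.empty[String, Int]) { (acc, w) =>
  acc.updated(w, acc.getOrElse(w, 0) + 1)
}
// counts == Map("a" -> 3, "b" -> 1, "c" -> 1)

// getOrElse avoids the NoSuchElementException thrown by apply for a missing key
val zCount = counts.getOrElse("z", 0) // 0
```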
Parallel Collections
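Parallel collections were added to Scala to enable parallel programming through a simple high-level abstraction, without users needing to know low-level details. They were part of the standard library up to Scala 2.12; since Scala 2.13 they live in the separate scala-parallel-collections module. Operations such as map, filter, and reduce are prime candidates for parallelization: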
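// On Scala 2.13+, .par requires the scala-parallel-collections module and this import:
import scala.collection.parallel.CollectionConverters._

val list = (1 to 10000).toList
// sequential versions
val res = list.map(i => i * 3)
val even = list.filter(i => i % 2 == 0)
val sum = list.reduce((i, j) => i + j)
// parallel versions: .par converts to a parallel collection, .seq converts back
val par_res = list.par.map(i => i * 3).seq.toList
val par_even = list.par.filter(i => i % 2 == 0).seq.toList
val par_sum = list.par.reduce((i, j) => i + j)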
There are some things you need to be aware of when using parallel collections. First, if the collection is small, the overhead of splitting the work and coordinating threads can be larger than the performance gain. Second, results can become non-deterministic: side-effecting operations such as print run in no fixed order, and reducing with a non-associative operation such as subtraction can give a different result on each run.
(1 to 5).foreach(print)       // always 12345
(1 to 5).par.foreach(print)   // order varies between runs, e.g. 34512
(1 to 1000).par.reduce(_ - _) // result varies between runs, e.g. -330101
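Why non-associative operations misbehave can be sketched without the parallel collections module. The code below is a sequential model, not the library's implementation: it splits the range into chunks of 3 (an arbitrary assumption; the real splitting strategy is internal and may vary), reduces each chunk, then combines the partial results, which is roughly what a parallel reduce does:

```scala
// Sequential reduction: always evaluated strictly left to right
def seqReduce(xs: Seq[Int], op: (Int, Int) => Int): Int = xs.reduce(op)

// Hypothetical model of a parallel reduction: reduce each chunk independently,
// then combine the partial results. The chunk size of 3 is an arbitrary choice.
def chunkedReduce(xs: Seq[Int], op: (Int, Int) => Int): Int =
  xs.grouped(3).map(chunk => chunk.reduce(op)).reduce(op)

val nums = 1 to 8

// Addition is associative, so the grouping does not change the result
val plusSeq = seqReduce(nums, _ + _)         // 36
val plusChunked = chunkedReduce(nums, _ + _) // 36

// Subtraction is not associative, so the grouping changes the result
val minusSeq = seqReduce(nums, _ - _)         // -34
val minusChunked = chunkedReduce(nums, _ - _) // 4
```

With a real parallel collection the chunk boundaries depend on the scheduler, which is why the subtraction result above varies from run to run.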
Scala Futures
The problem with Java's java.util.concurrent.Future is that its get() call is blocking:
public Future<String> loadHomePage() { ... }
public Map<String, Integer> indexContent(String content) { ... }
public void work() throws Exception {
    // Blocks the current thread until the result is available
    String content = loadHomePage().get();
    Map<String, Integer> index = indexContent(content);
    System.out.println(index);
}
We would much rather react to results as they become available, in the style of the observer pattern, than wait for them. In Scala, futures are non-blocking by default: instead of blocking, you register callback functions that run once the result is available. Blocking is still possible but heavily discouraged. A Future is created with the Future.apply factory, which is declared (simplified) as follows:
object Future {
def apply[T](task: => T)(implicit ec: ExecutionContext): Future[T]
}
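// Usage (Matrix and fatMatrix are placeholders):
val inverseFuture: Future[Matrix] = Future {
  fatMatrix.inverse() // long-running computation, executed asynchronously
}(executionContext)   // the ExecutionContext is passed explicitly
// or in short, with the global ExecutionContext imported as an implicit
import scala.concurrent.ExecutionContext.Implicits.global
val inverseFuture: Future[Matrix] = Future {
  fatMatrix.inverse()
}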
Let’s assume we want to fetch a list of recent posts and display them. We can register a callback with the onComplete[U](f: Try[T] => U): Unit method (which also takes an implicit ExecutionContext). Try is similar to Option: its value is a Success[T] holding the result if the future completes successfully, or a Failure[T] holding the exception otherwise.
import scala.util.{Failure, Success}

val f: Future[List[String]] = Future {
  session.getRecentPosts()
}
f.onComplete {
  case Success(posts) => for (post <- posts) println(post)
  case Failure(t) => println("An error has occurred: " + t.getMessage)
}
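A self-contained variant of the callback example, as a sketch: getRecentPosts is a hypothetical stand-in for session.getRecentPosts(), and the describe helper (also hypothetical) forwards whatever the callback sees into a Promise so the result can be inspected. The Await calls exist only to make the sketch observable:

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// Hypothetical stand-in for session.getRecentPosts()
def getRecentPosts(): List[String] = List("post 1", "post 2")

// Registers an onComplete callback and exposes the message it produced as a future
def describe(f: Future[List[String]]): Future[String] = {
  val p = Promise[String]()
  f.onComplete {
    case Success(posts) => p.success(posts.mkString(", "))
    case Failure(t)     => p.success("An error has occurred: " + t.getMessage)
  }
  p.future
}

// Blocking here is for demonstration only
val ok = Await.result(describe(Future { getRecentPosts() }), 5.seconds)
val err = Await.result(describe(Future { throw new RuntimeException("offline") }), 5.seconds)
```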
Registering a foreach callback has the same semantics as onComplete, with the difference that the closure is only called if the future completes successfully.
val f: Future[List[String]] = Future {
session.getRecentPosts
}
f.foreach { posts =>
for (post <- posts) println(post)
}
Given a future and a mapping function for its value, the map combinator produces a new future that is completed with the mapped value once the original future completes successfully.
val rateQuote = Future {
connection.getCurrentValue(USD)
}
val purchase = rateQuote map { quote =>
if (isProfitable(quote)) connection.buy(amount, quote)
else throw new Exception("not profitable")
}
purchase foreach { amount =>
println("Purchased " + amount + " USD")
}
But what happens if isProfitable returns false, hence causing an exception to be thrown? In that case purchase fails with that exception. Furthermore, imagine that the connection was broken and that getCurrentValue threw an exception, failing rateQuote. In that case we’d have no value to map, so the purchase would automatically be failed with the same exception as rateQuote.
In conclusion, if the original future is completed successfully then the returned future is completed with a mapped value from the original future. If the mapping function throws an exception the future is completed with that exception. If the original future fails with an exception then the returned future also contains the same exception.
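The three cases can be checked with a small sketch. The outcome helper is hypothetical; it blocks with Await.ready purely for demonstration and returns the completed Try:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Success, Try}

// Hypothetical helper: wait for a future and return its outcome as a Try
def outcome[T](f: Future[T]): Try[T] = Await.ready(f, 5.seconds).value.get

val quote: Future[Int] = Future { 100 }

// 1. Original succeeds, mapping succeeds: the new future holds the mapped value
val doubled = quote.map(q => q * 2)

// 2. Original succeeds, mapping throws: the new future fails with that exception
val rejected = quote.map(q => if (q > 500) q else throw new Exception("not profitable"))

// 3. Original fails: the mapping function never runs and the failure is passed on
val broken: Future[Int] = Future { throw new Exception("connection lost") }
val fromBroken = broken.map(q => q * 2)
```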
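The flatMap combinator is used when the mapping function itself returns a future. Instead of producing a nested Future[Future[T]], as map would, it flattens the result into a single future: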
val usdQuote = Future { connection.getCurrentValue(USD) }
val chfQuote = Future { connection.getCurrentValue(CHF) }
val purchase = usdQuote flatMap {
usd =>
chfQuote
.withFilter(chf => isProfitable(usd, chf))
.map(chf => connection.buy(amount, chf))
}
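A minimal sketch of why flatMap is needed, with hypothetical usdRate and chfRate functions standing in for connection.getCurrentValue. Here each rate future is created only when its function is called, so in the chained versions the second lookup starts after the first completes, unlike the example above, where both futures are started up front:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical asynchronous lookups with fixed sample values
def usdRate(): Future[Double] = Future { 10.0 }
def chfRate(): Future[Double] = Future { 4.0 }

// map alone nests the futures: Future[Future[Double]]
val nested: Future[Future[Double]] = usdRate().map(usd => chfRate().map(chf => usd / chf))

// flatMap flattens the nesting into a single Future[Double]
val flat: Future[Double] = usdRate().flatMap(usd => chfRate().map(chf => usd / chf))

// A for-comprehension is syntactic sugar for the same flatMap/map chain
val viaFor: Future[Double] = for {
  usd <- usdRate()
  chf <- chfRate()
} yield usd / chf
```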
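As mentioned, you can, however, also block on a future with Await:
import scala.concurrent.Await
import scala.concurrent.duration._

val result1 = Await.result(homepage, 1.second)     // throws a TimeoutException if not completed within 1 second
val result2 = Await.result(homepage, Duration.Inf) // waits indefinitely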
Reactive Programming with RxScala
ReactiveX (Rx) is a library for composing asynchronous and event-based programs by using observable sequences. It extends the observer pattern to support sequences of data and/or events. It offers the following structures:
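An Observable represents a sequence of events, much like an Iterable, except that the values are pushed to subscribers as they arrive instead of being pulled by the consumer. Simplified, the core types look like this: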
trait Observable[T] {
def subscribe(obs: Observer[T]): Subscription
}
trait Observer[T] {
def onNext(t: T): Unit
def onCompleted(): Unit
def onError(t: Throwable): Unit
}
trait Subscription {
def unsubscribe(): Unit
def isUnsubscribed(): Boolean
}
// Observable[String] emitting some HTML strings
getDataFromNetwork()
.drop(7)
.filter(s => s.startsWith("h"))
.take(12)
.map(s => toJson(s))
.subscribe(j => println(j)) // instead of foreach
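To make the push model concrete, here is a toy implementation of the simplified traits above, not RxScala itself: subscribe returns Unit instead of a Subscription, and only map and filter are implemented. fromList is a hypothetical source standing in for getDataFromNetwork():

```scala
import scala.collection.mutable.ListBuffer

// Simplified versions of the traits above (not the RxScala API)
trait Observer[T] {
  def onNext(t: T): Unit
  def onCompleted(): Unit
  def onError(t: Throwable): Unit
}

trait Observable[T] { self =>
  def subscribe(obs: Observer[T]): Unit

  // Each operator returns a new Observable that transforms events as they are pushed through
  def map[R](f: T => R): Observable[R] = new Observable[R] {
    def subscribe(obs: Observer[R]): Unit = self.subscribe(new Observer[T] {
      def onNext(t: T): Unit = obs.onNext(f(t))
      def onCompleted(): Unit = obs.onCompleted()
      def onError(e: Throwable): Unit = obs.onError(e)
    })
  }

  def filter(p: T => Boolean): Observable[T] = new Observable[T] {
    def subscribe(obs: Observer[T]): Unit = self.subscribe(new Observer[T] {
      def onNext(t: T): Unit = if (p(t)) obs.onNext(t)
      def onCompleted(): Unit = obs.onCompleted()
      def onError(e: Throwable): Unit = obs.onError(e)
    })
  }
}

// Hypothetical source that pushes a fixed list of items, then signals completion
def fromList[T](items: List[T]): Observable[T] = new Observable[T] {
  def subscribe(obs: Observer[T]): Unit = {
    items.foreach(obs.onNext)
    obs.onCompleted()
  }
}

// Subscriber that records every notification it receives
val received = ListBuffer[String]()
fromList(List("html", "body", "head", "div"))
  .filter(s => s.startsWith("h"))
  .map(s => s.toUpperCase)
  .subscribe(new Observer[String] {
    def onNext(s: String): Unit = received += s
    def onCompleted(): Unit = received += "done"
    def onError(t: Throwable): Unit = received += "error: " + t.getMessage
  })
// received contains HTML, HEAD, done
```

Nothing happens until subscribe is called: the operators only wrap the source, and the items flow through filter and map as they are pushed.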