STM - Software Transactional Memory
Up till now, we have always been writing about how we want something to work atomically and with a lot of bloat code and knowledge of what goes on in the background. For example, just a simple thread-safe transfer method can quickly become very complicated:
public void transfer(Account from, Account to, double amount) throws InactiveException, OverdrawException {
Account x, y;
// lexicographically order locks
if (from.getNumber().compareTo(to.getNumber()) < 0) {
x = from; y = to;
} else {
x = to; y = from;
}
synchronized (x) {
synchronized (y) {
from.withdraw(amount);
try {
to.deposit(amount);
} catch (InactiveException e) {
from.deposit(amount); // if failed load money back
throw e;
}
}
}
}
Instead, we would much rather just be able to say something like the following:
def transfer(from: Account, to: Account, amount: Double): Unit = {
atomic { implicit tx =>
from.withdraw(amount)
to.deposit(amount)
}
}
And we can do something very similar to this with the software transactional memory (STM) system in scala. The STM is a coordination mechanism for shared memory and can therefore coordinate access to heap locations in a concurrent environment.
The STM is heavily inspired by transactions for databases where you have the ACID principle (atomic, consistent, isolated, durable). In other words, a transaction is a sequence of reading and writing operations to shared memory that occur logically (consistent) at a single instant in time (atomic) and where the intermediate state is not visible to other transactions (isolated).
Just like for databases at the end of a transaction the state is checked for any conflicts. If there is a conflict the transaction is aborted and retried, if there isn't then the changes are made permanent and visible to the other transactions.
So it is very similar to working with CAS:
@volatile
private bal: Int = 0;
def deposit(amount: Int): Unit = {
while (true) {
val oldBal = bal; // read current value
val newBal = oldBal + amount; // compute new value
if (compareAndSet(addr(bal), oldBal, newBal)) {
return; // commit successful -> return
}
// conflict -> retry
}
}
ScalaSTM
To use ScalaSTM you need to add the dependency to your project and then import it with import scala.concurrent.stm._
Ref and Atomic
ScalaSTM offers the Ref
class which can be used as a wrapper (mutable cell) for a reference. The access to this reference is coordinated by the STM system. The reference held by the wrapper should be immutable otherwise the reference can be changed via the reference and the STM system can not coordinate these changes.
val ref: Ref[Int] = Ref(1)
val refView: Ref.View[Int] = ref.single
Single-operation memory transactions may be performed without an explicit atomic block using the Ref.View
returned from ref.single
. Otherwise, Ref is only allowed to be changed inside the static scope of an atomic block. Reads and writes of a Ref are performed by using x.get
and x.set(newValue)
, or more concisely by x()
and x() = newValue
.
object CheatSheet extends App {
val x = Ref(10) // allocate a Ref[Int]
val y = Ref.make[String]() // type-specific default, holds no reference
val z = x.single // Ref.View[Int]
// can perform single operations on Ref.View objects
z.set(11) // will act as if in atomic block
println(z())
val success = z.compareAndSet(11, 12)
val old = z.swap(13) // old: Int
println(old)
// println(x()) can only be done in atomic block
atomic { implicit txn =>
val i = x() // read
y() = "x was " + i // write
z() = 10
val eq = atomic { implicit txn => // nested atomic
x() == z() // both Ref and Ref.View can be used inside atomic
}
assert(eq)
y.set(y.get + ", long-form access")
}
// atomic transformation
z.transform {
_ max 20
}
val pre = y.single.getAndTransform {
_.toUpperCase
}
val post = y.single.transformAndGet {
_.filterNot {
_ == ' '
}
}
}
The atomic function is defined as def atomic[Z](block: InTxn => Z): Z
and takes a parameter of type InTxn which provides a context for the transaction to be executed. The context has to be marked implicit as it is automatically pulled in. Luckily the atomic function is composable so we the code arrives at an atomic block it checks if it can join an existing tx or it creates a new so we can then do something like this:
class STMAccount(val id: Int) {
private val balance = Ref(0d)
def withdraw(a: Double) {
atomic { implicit txn =>
balance() = balance() – a
}
}
def deposit(a: Double) {
atomic { implicit txn =>
balance() = balance() + a
}
}
}
class STMBank {
def transfer(amount: Double, from: STMAccount, to: STMAccount) {
atomic { implicit txn =>
to.deposit(amount)
from.withdraw(amount)
}
}
}
Exceptions in Atomic
If an exception occurs inside an atomic block it can be caught and handled inside the atomic block. But if it is not caught then the transaction is rolled back and the exception is thrown higher up.
val last = Ref("none")
atomic { implicit txn =>
last() = "outer"
try {
atomic { implicit txn =>
last() = "inner"
throw new RuntimeException
}
} catch {
case _: RuntimeException =>
}
}
println(last.single.get) // outer because inner was rolled back
You do have to be aware of some things tho for example the following will only output the value 0. This is because transactions are compositional meaning the inner transactions only commit once the outer transaction has committed:
Object Main extends App {
val balance: Ref[Int] = Ref(0)
def pay(amount: Int) : Unit = atomic { implicit tx =>
Txn.afterCommit(_ => println("Transfer:" + amount))
balance += amount
if(balance() < 0){
throw new RuntimeException
}
}
val t1 = new Thread(() => { atomic { implicit tx =>
pay(2)
pay(-4)
}})
t1.start()
println(balance.single.get)
}
Lifecycle Callbacks
The STM also offers some callback functions for certain lifecycle states:
Txn.afterCommit(handler: Status => Unit)
Txn.afterRollback(handler: Status => Unit)
Txn.beforeCommit(handler: (InTxn) ⇒ Unit)(implicit txn: InTxn): Unit
Txn.rollback(cause: RollbackCause)(implicit txn: InTxnEnd): Nothing
Txn.retry(implicit txn: InTxn): Nothing
With the Status either being completed
when the transaction has been rolled back or committed or decided
.
def transfer(from: STMAccount, to: STMAccount, amount: Double) {
atomic { implicit txn =>
to.deposit(amount)
from.withdraw(amount)
Txn.afterCommit { _ => sendMail(to.email, "You've got $" + amount) }
}
}
Behind the Scenes
Behind the scenes, the STM system keeps a global version counter for the last successfully committed transaction. Additionally, each Ref is marked with a so-called local version stamp which is the version of the last successfully committed transaction which modified the reference. When a new transaction is started the following is done:
- Transaction start: The new transaction stores the value of the global version counter locally, this is the so-called read version.
- Transaction body: Before a Ref is modified and read from, a local working copy is made of it and only this local copy is read from and written to. For every access of the Ref the local version stamp of the Ref is compared to the read version and if it is larger than the read version the transaction is aborted and retried.
- Transaction commit: All original Refs that were modified are locked (with a timeout to avoid deadlocks). Then the global version counter is incremented and copied locally for the transaction, this is the so-called write version. All Refs are then checked again and the transaction is aborted and retried if the version stamp > read version and the object is locked. Then finally the values and the write version are written to the original Refs and the locks are released.