Distributed Algorithms
Distributed Euclidean Algorithm
The euclidean algorithm is used to find the GCD of two numbers. This can be done locally but also using a distributed algorithm:
int gcd(int a, int b) {
assert a >= 0 && b >= 0;
while(b != 0) {
int tmp = b;
b = a % b;
a = tmp;
}
return a;
}
In the distributed algorithm each node has a reference to its left and right neighbor. The node first informs its neighbors of its own value. If a received number is smaller than its value then a node adjusts its value and shares its new value with its neighbors.
public class GcdActor extends AbstractActor {
private int n;
private final Set<ActorRef> neighbours = new HashSet<>();
public GcdActor(int n) {
this.n = n;
System.out.printf("%s Initial Value: %d%n", getSelf(), n);
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(ActorRef.class, actor -> {
neighbours.add(actor);
if(neighbours.size() == 2) {
neighbours.forEach(a -> a.tell(n, getSelf()));
}})
.match(Integer.class, value -> {
if(value < n) {
n = ((n-1) % value) + 1;
neighbours.forEach(a -> a.tell(n, getSelf()));
System.out.printf("%s Current Value: %d%n",
getSelf(), n);
}})
.matchAny(msg -> unhandled(msg))
.build();
}
}
public static void main(String[] args) throws Exception {
ActorSystem as = ActorSystem.create();
List<Integer> values = List.of(108, 76, 12, 60, 36);
List<ActorRef> actors = IntStream.range(0, values.size())
.mapToObj(n -> as.actorOf(Props.create(GcdActor.class, values.get(n)), "GCD"+n))
.collect(Collectors.toList());
final int size = actors.size();
for(int i = 0; i < size; i++) {
actors.get(i).tell(actors.get((i+1) % size), null);
actors.get(i).tell(actors.get((size+i-1) % size), null);
}
}
Distributed Echo Algorithm
The idea of the echo algorithm is to traverse an arbitrary graph by implicitly building a spanning tree. The algorithm is defined as followed:
There are two types of messages: Explorer messages, which color the nodes red, and Echo messages, which color the nodes green. Before the algorithm is executed, all nodes are white.
- An initiator turns red and sends an explorer to all of his neighbors.
- A white node that receives an explorer turns red
- A node that has received an explorer or an echo over all of its edges turns green
- A non-initiator node that has received an explorer or an echo over all of its edges sends an echo over the edge over which it received the first explorer
- The algorithm terminates when the initiator turns green
The edges over which the echo messages have run result in a spanning tree. For a Graph with edges this algorithm uses 2 * E messages.
public class EchoNode extends AbstractActor {
private final Set < ActorRef > neighbours = new HashSet < > ();
private ActorRef parent;
private int counter = 0; // number of received tokens
@Override
public Receive createReceive() {
return receiveBuilder()
.match(ActorRef.class, actor -> neighbours.add(actor))
.match(Start.class, value -> {
parent = getSender(); // initiator
neighbours.forEach(a -> a.tell(new Token(), getSelf()));
})
.match(Token.class, msg -> {
counter++;
if (parent == null) { // variant: if(counter == 1)
parent = getSender();
System.out.printf("Actor %s got informed by %s%n",
getSelf(), getSender());
neighbours.stream()
.filter(a -> a != parent)
.forEach(a -> a.tell(msg, getSelf()));
}
if (counter == neighbours.size()) {
parent.tell(msg, getSelf());
}
})
.matchAny(msg -> unhandled(msg))
.build();
}
public static void main(String[] args) throws Exception {
ActorSystem as = ActorSystem.create();
List < ActorRef > actors = IntStream.range(0, 8)
.mapToObj(n -> as.actorOf(Props.create(EchoNode.class), "Node" + n))
.collect(Collectors.toList());
addEdge(actors, 0, 1);
...
addEdge(actors, 7, 5);
Timeout timeout = new Timeout(5, TimeUnit.SECONDS);
Future < Object > f = Patterns.ask(actors.get(0), new Start(), timeout);
Object result = Await.result(f, timeout.duration());
System.out.println(result);
as.terminate();
}
}
Distributed Election Algorithm
The idea of the election algorithm is to elect a leader among equal nodes for example to coordiante concurrency etc. For the alogrithm to work we need to assume that each node has a unique identifier that can be ordered. The node with the highest order is then the leader. At the end each node should know who the leader is.
Initially the value in each node is negative infinity. Every node can start the election as long as it is not yet involved in an election, i.e. value is neg inf. Upon start, a node stores its id number in the value field and sends this value to the next node. If a message arrives, its value is compared with the stored one. If it is greater than the stored value, the value is updated and the message is forwarded. If it is smaller, then the message is discarded. A node is leader if it receives its own message. The leader then may inform the other nodes about the election / termination of the algorithm.
public class ElectionNode extends AbstractActor {
private ActorRef next; // ring references
private ActorRef initiator; // initiator of the election
private final int id; // id of this actor
private int master = Integer.MIN_VALUE; // id of elected node
public ElectionNode(int id) {
this.id = id;
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(ActorRef.class, actor -> next = actor)
.match(Start.class, value -> {
if (master == Integer.MIN_VALUE) {
initiator = getSender();
master = id;
next.tell(new Token(master), getSelf());
}
})
.match(Token.class, token -> {
if (token.value > master) {
master = token.value;
next.tell(token, getSelf());
} else if (token.value == id) {
System.out.println("hurray, I got elected " + getSelf());
next.tell(new Reset(id), getSelf());
}
})
.match(Reset.class, token -> {
master = Integer.MIN_VALUE;
if (token.value == id) {
initiator.tell("" + id, getSelf());
} else {
next.tell(token, getSelf());
}
})
.matchAny(msg -> unhandled(msg))
.build();
}
}
Distributed Hash Tables
Distributed system that provides a lookup service like a hash table. Key Value pairs are stored in the nodes of a DHT Any participating node can efficiently retrieve the value associated with a given key. The tricky part is figuring out which node is responsible for which key? How do we handle changes to the network topology? Nodes can join or leave the network at any time.
Consistent Hashing
The goal of consistent hashing is to minimize the number of remaps if nodes are added or removed i.e. if the table is resized we only want keys need to be remapped on average with = number of keys and = number of nodes. Keys and Nodes are mapped to the same ID space (Integers) Nodes: hash(IP), Keys: hash(key). Hash Functions:
- SHA 1 => 160bit possible nodes
- Java => 32bit possible nodes
Each object (keys and nodes) is mapped to a point on a circle for example: if we use 6bit objects then we have the ID space: 0 .. = 63). Each key is stored at its successor, i.e. in the node with the next higher or equal ID. This has the following advantages:
- All nodes store roughly the same number of keys if the hash function is uniform.
- If a node joins or leaves, only a fraction of the keys need to be moved to a different node, i.e. only the successor of a node is involved.
This technique can be implemented in different ways. Either we have a complete graph so each node knows the location of every other node which leads to a lookup complexity of but storage of the routing table takes up with being the number of nodes.
record Put(Object key, Object value) {} // used to initate a put
record Put2(Object key, Object value) {} // used to store at dest node
record Get(Object key) {} // used to initate a get
record Get2(Object key) {} // used to get at dest node
record Result(int id, Object value) {} // used to return result
record AddNode(int id, ActorRef actor) {} // initiates an add node
record Partition(int id) {} // used to partition a node
record PartitionAnswer(Map < Object, Object > map) {}
// answer to a partition request
record Print() {} // debugging, i.e. print node info on console
public class HashNode extends AbstractActor {
private final int id; // id of this node
// references to all actors
private final TreeMap < Integer, ActorRef > actors = new TreeMap < > ();
// data stored in this node
private final Map < Object, Object > values = new HashMap < > ();
public HashNode(int id) {
this.id = id;
}
public Receive createReceive() {
return receiveBuilder()
.match(Map.class, actors -> {
this.actors.putAll(actors);
})
.match(Get.class, msg -> {
var keys = actors.navigableKeySet();
var key = keys.ceiling(msg.key().hashCode());
// ceiling returns the least element in this set
// greater than or equal to the given element,
// or null if there is no such element.
if (key == null) key = keys.first();
actors.get(key).tell(new Get2(msg.key()), getSender());
})
.match(Get2.class, msg -> {
getSender().tell(new Result(id, values.get(msg.key())),
getSelf());
})
.matchAny(msg -> {
unhandled(msg);
})
.build();
}
}
Or we can have a cyclic graph so each node only knows the location of its successor, this leads to a lookup complexity of but storage only uses .
record Put(Object key, Object value) {} // used to initate a put
record Put2(Object key, Object value, int previousId) {}
// used to distrbibute put in the ring
record Get(Object key) {} // used to initate a get
record Get2(Object key, int previousId, int counter) {}
// used to distribute get in the ring
record Result(int id, Object value, int counter) {}
record SetNext(int nextId, ActorRef next) {}
record AddNode(int newId, ActorRef newActor) {}
record Partition(int id) {} // used to partition a node, i.e. return
// all elements <= id
record PartitionAnswer(Map < Object, Object > map) {}
record Print(ActorRef start) {} // print node info on console
public class HashNode extends AbstractActor {
private final int id; // id of this node
private ActorRef next; // next node in the ring
private int nextId; // id of next node in ring
private Map < Object, Object > values = new HashMap < > (); // data
public HashNode(int id) {
this.id = id;
}
public Receive createReceive() {
return receiveBuilder()
.match(SetNext.class, msg -> {
next = msg.next();nextId = msg.nextId();
})
.match(Put.class, msg -> {
next.tell(new Put2(msg.key(), msg.value(), this.id), null);
})
.match(Get.class, msg -> {
next.tell(new Get2(msg.key(), this.id, 1), getSender());
})
.match(Put2.class, msg -> {
int hash = msg.key().hashCode();
if (between(hash, msg.previousId(), this.id)) {
values.put(msg.key(), msg.value());
} else {
next.tell(new Put2(msg.key(), msg.value(), this.id), null);
}
})
.match(Get2.class, msg -> {
int hash = msg.key().hashCode();
if (between(hash, msg.previousId(), this.id)) {
getSender().tell(new Result(id, values.get(msg.key()),
msg.counter()), getSelf());
} else {
next.tell(new Get2(msg.key(), id, msg.counter() + 1),
getSender());
}
})
}
}
Chord Algorithm
The chord algorithm and protocol implements a distributed hash table with a lookup time of log(N) and is based on Consistent Hashing. It uses so called finger tables. In these tables every node knows up to other nodes, and the distance of the nodes it knows increases exponentially (m is the bit length of the hash function). Meaning the The i-th entry (0..m-1) in the table of node n contains a reference to the successor the first entry of the finger table is the immediate successor. Example: 16 node Chord network (m = 4).
Lookup
The finger table is used to find the predecessor of the node which stores a given key.
- Node 10 is asked to look up key 5 => Finger table refers to node 43.
- Node 43 is asked to look up key 5 => Finger table refers to node 1
- Node 1 is asked to look up 5 => Key is between 2 and 10 (1 < 5 <= 10), so Node 10 contains the searched key and its associated value
n.find_successor(id)
if id in (n, successor] then // n < id && id <= successor
return successor // this is the node which contains key id
else
// forward the query around the circle
n0 = closest_preceding_node(id)
return n0.find_successor(id)
// search the local table for the highest predecessor of id
n.closest_preceding_node(id)
for (int i = m - 1; i >= 0; i--)
do
if (finger[i] in (n, id)) then
return finger[i]
return n
Join
If a new node joins, the following invariants must be maintained:
- Each node refers to its immediate successor => ensures correctness
- Each ( key,value ) pair is stored in successor(hash(key)) => ensures correctness
- The finger table of each node should be correct => keeps query operation fast
record Put(Object key, Object value) {} // used to distribute in ring
record Put2(Object key, Object value) {} // put in destination node
record Get(Object key, int counter) {} // used to distribute in ring
record Get2(Object key, int counter) {} // get in destination node
record Result(int id, Object value, int counter) {}
record Partition(int id) {} // used to partition a node, i.e. return
// all elements <= id
record PartitionAnswer(Map < Object, Object > map) {}
record Print() {} // debugging, i.e. print node info on console
public class HashNode extends AbstractActor {
private final int id; // id of this node
private int next; // id of next node
private TreeMap < Integer, ActorRef > fingerTable;
private Map < Object, Object > values = new HashMap < > ();
public HashNode(int id) {
this.id = id;
}
public Receive createReceive() {
return receiveBuilder()
.match(TreeMap.class, fingerTable -> {
this.fingerTable = fingerTable;
})
.match(Integer.class, next -> this.next = next)
.match(Get.class, msg -> {
int hash = msg.key().hashCode();
if (between(hash, id, next)) {
fingerTable.get(next).tell(
new Get2(msg.key(), msg.counter() + 1), getSender());
} else {
var set = fingerTable.navigableKeySet();
var prev = set.lower(hash);
if (prev == null) prev = set.last();
fingerTable.get(prev).tell(
new Get(msg.key(), msg.counter() + 1), getSender());
}
})
.match(Get2.class, msg -> {
getSender().tell(new Result(this.id,
values.get(msg.key()), msg.counter()), getSelf());
})
}
}