Digital Garden
Computer Science
Distributed Systems
Actor Model

Actor Model

The actor model is not just good for concurrent programming but also for distributed systems as the actors can be on different systems and still communicate with each other.

Java Akka

We will be using the Akka framework again but instead of using the Scala version of Akka we will be using the java version which is very similar.

Creating an Actor

In Java, actors extend the AbstractClass and must implement the Receive createReceiver() method. The following actor discards all received messages because no matching is done.

public class PrintActor extends AbstractActor {
    @Override
    public Receive createReceiver() {
        return receiveBuilder().build();
    }
}

To react to messages we can use pattern matching like the example below:

public class PrintActor extends AbstractActor {
    private int cnt = 0;
    @Override
    public Receive createReceive() {
        return receiveBuilder().matchAny(t -> onReceive(t)).build();
    }
    private void onReceive(Object msg) {
        cnt++;
        if (msg instanceof String) {
            System.err.println(cnt + ": received message " + msg);
        } else {
            System.err.println(cnt + ": received unknown message");
        }
    }
}

The actual actor objects are then created and started asychnronsly by using the actor system. Just like in Scala when trying to create and actor object using new an ActorInitializationException is thrown.

public static void main(String[] args) throws Exception {
        ActorSystem as = ActorSystem.create();
        ActorRef actor = as.actorOf(
            Props.create(PrintActor.class),
            "Printer" // name is optional and must be unique
        ); // returns an immutable reference
        // ActorRef print = as.actorOf(Props.create(PrintActor.class, "Msg:")); For non default constructor actors
}

Sending Messages

In the scala version of Akka messages could be sent using the tell operator !. However, this is not possible in Java for syntax reasons so instead, messages can be sent to an actor by calling a member method on the receiving actor. Just like in Scala the message is guaranteed to be delivered at most once.

receivingActor.tell(msg, ActorRef.noSender()) // ActorRef.noSender() is same as null

Receiving Messages

In Java there are the following methods that can be used for pattern matching:

  • matchAny(UnitApply<Object> apply) Matches any argument. – match(Class<P> type, UnitApply<P> apply) Matches an argument of a particular type. – match(Class<P> type, TypedPredicate<P> p, UnitApply<P> app) Matches an argument of a particular type that matches a given predicate.

For example:

public Receive createReceive() {
    return receiveBuilder()
        .match(String.class,
            s -> s.startsWith("MSG:"),
            msg -> System.err.println(cnt++ + ": received message " + msg
        ));
}

The above functions also allow you to then do pattern matching very similarly to pattern matching in scala using case classes (in Java records).

public Receive createReceive() {
    return receiveBuilder()
        .match(LoginMessage.class, msg -> {
            String username = msg.username();
            sessions.put(username, getSender());
            broadcastMessage(username, "I just logged in");
        })
        .match(TextMessage.class, msg -> {
            broadcastMessage(msg.username(), msg.message());
        })
        .match(LogoutMessage.class, msg -> {
            String username = msg.username();
            sessions.remove(username);
            broadcastMessage(username, "I just logged out");
            getSender().tell(msg, getSelf());
        })
        .matchAny(msg -> unhandled(msg))
        .build();
}

Inside the Actor there are also the following methods that can be used just like in scala:

  • getSelf() Returns the actor reference to itself.
  • getSender() Returns the actor reference to the sender of the currently processed message.
  • getContext() Returns this actors context, may be used to create child actors.
  • forward(Object message, ActorContext context) Forwards the message and passes the original sender actor as the sender. Same as a.tell(msg, getSender()).

Distributed Actors

To use Akka in a distributed system actors need to be configured. Configurations can include log levels, message serializers, protocol specifics etc.

You can either define one big config file for all actors with the followin strucutre:

application.conf

And then use them like the following:

public static void main(String[] args) {
    Config config = ConfigFactory.load().getConfig("PrintConfig"); // without getConfig just gets base.
    System.out.println(c.getInt("akka.remote.artery.canonical.port")); // 25520
    ActorSystem sys = ActorSystem.create("PrintApplication", config);
    sys.actorOf(Props.create(PrintActor.class), "PrintServer");
    System.out.println("Started Print Application");
}

Or save each config in a separate file like foo.conf and then read the config with ConfigFactory.load("foo").

If you for some reason don't have an ActorRef to a distrubted actor or want to make use of wildcards you can do this with the actorSelection() function:

ActorSelection actor = as.actorSelection("akka://PrintApplication@127.0.0.1:25520/user/PrintServer"); // PrintApplication is the name of the system. user is always there for some reason.
ActorSelection actor = as.actorSelection("akka://PrintApplication@127.0.0.1:25520/user/*/PrintServer"); // all printServer no matter the parent Actor

Ask Pattern

In the Java version of Akka you can also use the ask pattern. This can be used when you expect an answer to a sent message. Instead of using the ask ? operator, there is the ask(ActorRef ref, Object msg, long timeout) method defined in the akka.pattern.Patterns package as a static method which returns an Akka Future not to mixed up with the normal Java Future. The Future is either a Success Object containing the response message or a Failure containing an AskTimeoutException.

Timeout timeout = new Timeout(5, TimeUnit.SECONDS);
Future<Object> res = Patterns.ask(actor, message, timeout);
return (String) Await.result(res, timeout.duration()); // blocking