The festive season is that period of the year when they tempt you to indulge in those dear sweet, sugary treats.
Personally, as an Italian, I do love me some panettone. And as much as I enjoy the bitter taste of Java coffee, I have been enjoying the sugar that has been introduced in the most recent versions of Java. Indeed, I believe that Java 17 really hits the sweet spot when it comes to treats. So what better time of the year to indulge in Java’s sweet, sweet sugar than this December?
In the last couple of months I published a blog series with my take on Viktor Klang’s original tiny Java and Scala actor system, updated for Java 17.
Untyped actors in the style of Akka Classic used to be clunky to write in Java, because Java used to lack some key goodies:
- a concise way to express messages; but now we have records
- a tidy syntax to match against the types of the incoming messages; but now we have switch expressions and pattern matching
Another key addition is sealed type hierarchies. If you are able to express the upper bound of your type hierarchy, and such a type hierarchy is “sealed”, then the compiler will tell you if you are missing a case in a switch (exhaustiveness check).
For instance:
sealed interface A {
    record X() implements A {}
    record Y() implements A {}

    static void f(A a) {
        switch (a) {
            case X x -> {}
        }
    }
}
If you put this in A.java and run it with java --enable-preview --source 17 A.java, you’ll read:
A.java:6: error: the switch statement does not cover all possible input values
        switch (a) {
        ^
In my previous blog posts I have detailed how to develop an actor runtime for untyped actors, that is, actors that can accept any kind of message. In this part we are going to rewrite that actor runtime from scratch and implement a typed actor runtime, and we will see how sealed type hierarchies can improve the code we write!
This is the full listing of our typed actor runtime. You can also find it at this repository:
package io.github.evacchi;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import static java.lang.System.out;

public interface TypedActor {
    interface Effect<T> extends Function<Behavior<T>, Behavior<T>> {}
    interface Behavior<T> extends Function<T, Effect<T>> {}
    interface Address<T> { Address<T> tell(T msg); }

    static <T> Effect<T> Become(Behavior<T> next) { return current -> next; }
    static <T> Effect<T> Stay() { return current -> current; }
    static <T> Effect<T> Die() {
        return Become(msg -> {
            out.println("Dropping msg [" + msg + "] due to severe case of death.");
            return Stay();
        });
    }

    record System(Executor executor) {
        public <T> Address<T> actorOf(Function<Address<T>, Behavior<T>> initial) {
            abstract class AtomicRunnableAddress<T> implements Address<T>, Runnable {
                AtomicInteger on = new AtomicInteger(0);
            }
            return new AtomicRunnableAddress<T>() {
                // Our awesome little mailbox, free of blocking and evil
                final ConcurrentLinkedQueue<T> mbox = new ConcurrentLinkedQueue<>();
                Behavior<T> behavior = initial.apply(this);

                public Address<T> tell(T msg) { mbox.offer(msg); async(); return this; }

                public void run() {
                    try {
                        if (on.get() == 1) {
                            T m = mbox.poll();
                            if (m != null) behavior = behavior.apply(m).apply(behavior);
                        }
                    } finally { on.set(0); async(); }
                }

                void async() {
                    if (!mbox.isEmpty() && on.compareAndSet(0, 1)) {
                        try { executor.execute(this); }
                        catch (Throwable t) { on.set(0); throw t; }
                    }
                }
            };
        }
    }
}
But before we get to that, let us learn more about what it does.
The Actor Model
The Actor Model is a concurrency model where the unit of execution is called an actor. An actor receives messages. In response to a message, an actor may (cf. Wikipedia):
- send a message to another actor
- create new actors
- transition to a new state, with a different behavior, to handle the next message
Behaviors and Effects
The behavior of an actor is just a function that, applied to a message, returns another behavior.
Actors usually encapsulate state; thus, as a side effect, the behavior function usually updates the state of the actor; it may also send messages to other actors, and create new actors to handle new state.
For instance, you will have noticed how most web platforms allow you to export the content you have created; but most of them will start a background process and will notify you later when the archive is ready; for instance, by sending a link to your e-mail address.
When the service receives your “export” request, an actor may be responsible for acknowledging your request immediately; but it may spawn another actor to process the request in the background.
At its core, an actor is just a routine paired with a message queue. But instead of evaluating the routine as soon as a message is sent, the system submits a message to the queue of the receiver. Then, at some point, the system “wakes up” that actor: it takes one message from the queue, and it applies the routine to that message.
The routine returns a description of the next state of the actor; i.e. the routine that should be executed when a new message is evaluated.
Such a routine is called a behavior; in code, a Behavior can be defined as a function that takes a message of some type and returns a transition between states, which we call an Effect:
Behavior<T> : T ⟶ Effect<T>
where T is some known type.
An Effect describes a transition between two states of the actor. It can be represented as a function that takes the current Behavior and returns the next Behavior:
Effect<T> : Behavior<T> ⟶ Behavior<T>
In code, we may write them as:
import java.util.function.Function;

interface Behavior<T> extends Function<T, Effect<T>> {}
interface Effect<T> extends Function<Behavior<T>, Behavior<T>> {}
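As an illustration only, the following self-contained sketch applies these two interfaces by hand, without any actor runtime: applying a behavior to a message yields an Effect, and applying that Effect to the current behavior yields the next behavior. The class EffectDemo, the behaviors normal() and shouting(), and the step() helper are hypothetical names chosen for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class EffectDemo {
    interface Behavior<T> extends Function<T, Effect<T>> {}
    interface Effect<T> extends Function<Behavior<T>, Behavior<T>> {}

    static final List<String> log = new ArrayList<>();

    // records each message; switches to `shouting` when it receives "louder"
    static Behavior<String> normal() {
        return msg -> {
            log.add(msg);
            if (msg.equals("louder")) {
                return current -> shouting(); // transition: ignore current, return a new behavior
            }
            return current -> current;        // transition: keep the current behavior
        };
    }

    // records each message in upper case, and never changes
    static Behavior<String> shouting() {
        return msg -> {
            log.add(msg.toUpperCase());
            return current -> current;
        };
    }

    // one actor "step": the behavior yields an Effect,
    // and the Effect maps the current behavior to the next one
    static <T> Behavior<T> step(Behavior<T> current, T msg) {
        Effect<T> effect = current.apply(msg);
        return effect.apply(current);
    }

    public static void main(String[] args) {
        Behavior<String> b = normal();
        for (String m : List.of("hello", "louder", "hello")) {
            b = step(b, m);
        }
        System.out.println(log); // [hello, louder, HELLO]
    }
}
```

The step() helper performs the same update that an actor runtime would perform for each message taken from its queue.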
The most basic Effects (state transitions) are Stay and Die:
- Stay means no behavioral change;
- Die will effectively turn off the actor, making it inactive.
For instance, this is a valid behavior for an actor that starts, then waits for one message, then it dies: i.e., it will drop and ignore all subsequent messages and/or the system may decide to collect it and throw it away.
Effect<String> receiveThenDie(String msg) {
    out.println("Got msg: '" + msg + "'; length: " + msg.length());
    return TypedActor.Die();
}
or, equivalently, as a lambda:
Behavior<String> receiveThenDie = msg -> {
    out.println("Got msg: '" + msg + "'; length: " + msg.length());
    return TypedActor.Die();
};
Example 1: A Hello World
You can run the following example with:
j! https://github.com/evacchi/min-java-actors/blob/main/src/main/java/io/github/evacchi/typed/examples/HelloWorld.java
In this example we will create an actor system, then spawn an actor that will process one message and then Die. You will recognize the behavior receiveThenDie that we defined above.
// create an actor runtime (an actor "system")
var actorSystem = new TypedActor.System(Executors.newCachedThreadPool());
// create an actor
Address<String> actor = actorSystem.actorOf(self -> msg -> {
    out.println("self: " + self + "; got msg: '" + msg + "'; length: " + msg.length());
    return TypedActor.Die();
});
The actorOf method returns an Address<T>, which is defined as follows:
interface Address<T> { Address<T> tell(T msg); }
allowing us to write:
actor.tell("foo");
actor.tell("bar");
or just:
actor.tell("foo").tell("bar");
which, when executed, prints the following:
self: io.github.evacchi.TypedActor$System$1@24a95c2e; got msg: 'foo'; length: 3
Dropping msg [bar] due to severe case of death.
because the "bar" message was sent to a dead actor.
If we change the lambda to return Stay() instead:
Address<String> actor = actorSystem.actorOf(self -> msg -> {
    out.println("self: " + self + "; got msg: '" + msg + "'; length: " + msg.length());
    return TypedActor.Stay();
});
then the output would read:
self: io.github.evacchi.TypedActor$System$1@7519a17c; got msg: 'foo'; length: 3
self: io.github.evacchi.TypedActor$System$1@7519a17c; got msg: 'bar'; length: 3
You may define Stay as:
static <T> Effect<T> Stay() { return current -> current; }
that is, a transition from the current behavior to the current behavior (i.e., it stays in the same state).
Die is defined as:
static <T> Effect<T> Die() {
    return Become(msg -> {
        out.println("Dropping msg [" + msg + "] due to severe case of death.");
        return Stay();
    });
}
where Become is:
static <T> Effect<T> Become(Behavior<T> next) { return current -> next; }
i.e., Become is a method that, given a Behavior, returns an Effect; that effect takes the current behavior and returns the next one.
Thus, Die is just an effect that takes the current behavior and replaces it with a behavior that drops all messages, and then Stays in that state.
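To check these definitions in isolation, here is a minimal sketch that copies Become and Stay as written above, plus a variant of Die that records dropped messages in a list instead of printing them (EffectsDemo and dropped are hypothetical names). It drives receiveThenDie by hand: the first message is handled, and every following one is dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class EffectsDemo {
    interface Behavior<T> extends Function<T, Effect<T>> {}
    interface Effect<T> extends Function<Behavior<T>, Behavior<T>> {}

    static final List<String> dropped = new ArrayList<>();

    static <T> Effect<T> Become(Behavior<T> next) { return current -> next; }
    static <T> Effect<T> Stay() { return current -> current; }

    // same shape as Die above, but records dropped messages instead of printing them
    static <T> Effect<T> Die() {
        return Become(msg -> {
            dropped.add(String.valueOf(msg));
            return Stay();
        });
    }

    public static void main(String[] args) {
        Behavior<String> receiveThenDie = msg -> Die();

        // Stay maps a behavior to itself
        Effect<String> stay = Stay();
        System.out.println(stay.apply(receiveThenDie) == receiveThenDie); // true

        // after the first message, Die swaps in the "drop everything" behavior
        Behavior<String> next = receiveThenDie.apply("foo").apply(receiveThenDie);
        next = next.apply("bar").apply(next);
        next = next.apply("baz").apply(next);
        System.out.println(dropped); // [bar, baz]
    }
}
```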
With the exception of Become, which takes a parameter (next), you may be wondering why Stay and Die are not constant fields:
static Effect<T> Stay = current -> current; // does not compile: a static field cannot use a type variable
You may be wondering what self is. self is a self-reference to the actor: it serves the same purpose as this in a class. Because the behavior is written as a function, we need to “seed” a reference to this into the function. But there is no this until the actor is actually created by the runtime, so we provide it through the closure, so that it may be filled in lazily.
If this is not entirely clear, don’t worry for now; we’ll get to it later.
Use of Types
If you are familiar with the untyped version, you’ll remember that’s how we did it at that time. However, the key here is that little <T> up there: a static field cannot be generic, so we have to use a generic method to let the compiler infer the T at each call site.
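A minimal sketch of that inference, under the same definitions (InferenceDemo and the two sample behaviors are hypothetical): one generic Stay() method serves both a Behavior<String> and a Behavior<Integer>, with T inferred from each lambda's expected return type.

```java
import java.util.function.Function;

class InferenceDemo {
    interface Behavior<T> extends Function<T, Effect<T>> {}
    interface Effect<T> extends Function<Behavior<T>, Behavior<T>> {}

    // A constant would need a type variable in a static context, which Java rejects:
    //   static Effect<T> Stay = current -> current;   // does not compile
    // A generic method, instead, gets a fresh T at every call site.
    static <T> Effect<T> Stay() { return current -> current; }

    static Behavior<String> strings = s -> Stay();   // T inferred as String
    static Behavior<Integer> numbers = n -> Stay();  // T inferred as Integer

    public static void main(String[] args) {
        Effect<String> e = strings.apply("foo");
        System.out.println(e.apply(strings) == strings); // true
        Effect<Integer> f = numbers.apply(42);
        System.out.println(f.apply(numbers) == numbers); // true
    }
}
```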
Notice how that little T in the definition of Behavior makes a huge difference: an untyped actor system would be defined as:
interface Behavior extends Function<Object, Effect> {}
interface Effect extends Function<Behavior, Behavior> {}
interface Address { Address tell(Object message); }
So the actor itself would be written:
Address actor = actorSystem.actorOf(self -> msg -> {
    out.println("self: " + self + "; got msg: '" + msg + "'; length: " + msg.length());
    return Actor.Die();
});
This would not compile, because msg now has type Object, and length() is no longer a known method:
error: cannot find symbol
        out.println("self: " + self + "; got msg: '" + msg + "'; length: " + msg.length());
                                                                                ^
  symbol:   method length()
  location: variable msg of type Object
unless you check its type:
Behavior receiveThenDie = msg -> {
    if (msg instanceof String) {
        var s = (String) msg;
        out.println("self: " + self + "; got msg: '" + s + "'; length: " + s.length());
    } else {
        // handle the non-String message
    }
    return Actor.Die();
};
or, more concisely:
if (msg instanceof String s) {
    out.println("self: " + self + "; got msg: '" + s + "'; length: " + s.length());
    ...
The concise version uses pattern matching for instanceof, first previewed in JDK 14 (JEP 305) and finalized in JDK 16 (JEP 394). It allows you to check against a type and, if the check passes, get a typed variable out of it, all in one line.
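Side by side, the two styles look like this in a small standalone sketch (InstanceofDemo, describeOld and describeNew are hypothetical names). Both methods return the same strings; the second one simply binds s as part of the test. It needs JDK 16 or later.

```java
class InstanceofDemo {
    // classic style: test the type, then cast
    static String describeOld(Object msg) {
        if (msg instanceof String) {
            var s = (String) msg;
            return "got '" + s + "'; length: " + s.length();
        }
        return "not a String: " + msg;
    }

    // pattern matching for instanceof: test and binding in one step
    static String describeNew(Object msg) {
        if (msg instanceof String s) {
            return "got '" + s + "'; length: " + s.length();
        }
        return "not a String: " + msg;
    }

    public static void main(String[] args) {
        System.out.println(describeNew("foo")); // got 'foo'; length: 3
        System.out.println(describeNew(42));    // not a String: 42
    }
}
```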
Example 2: Ping Pong
You can run the following example with:
j! https://github.com/evacchi/min-java-actors/blob/main/src/main/java/io/github/evacchi/typed/examples/PingPong.java
An actor routine usually accepts more than one type of message. It is therefore useful to match against all the accepted subtypes. This is where switch expressions, records and sealed types are useful.
In a classic actor example, one actor sends a “ping” to another; the second replies with a “pong”, and they go on back and forth.
In order to make this more interesting (and also not to loop indefinitely):
- one of the actors (the ponger) will receive Ping and reply with Pong;
  - it will also count 10 Pings, then Die;
  - upon reaching 10, and before it Dies, the ponger will send a message (DeadlyPong) to the pinger;
- the pinger receives Pong and replies with Ping;
  - when it receives a DeadlyPong it Dies.
In the untyped version of this program, the messages do not need to be defined in a hierarchy. But in the typed version, a tiny hierarchy of sealed records will make the code shorter.
There is only one type of Ping:
record Ping(Address<Pong> sender) {}
The sender of such messages is able to receive Pongs. Now, we said that there are two types of Pongs:
record SimplePong(Address<Ping> sender) {}
record DeadlyPong(Address<Ping> sender) {}
and we can group them under a common sealed interface Pong:
sealed interface Pong {}
record SimplePong(Address<Ping> sender) implements Pong {}
record DeadlyPong(Address<Ping> sender) implements Pong {}
Both messages are sent by the actor that is able to receive Pings.
public static void main(String... args) {
    var actorSystem = new TypedActor.System(Executors.newCachedThreadPool());
    Address<Ping> ponger = actorSystem.actorOf(self -> msg -> pongerBehavior(self, msg, 0));
    Address<Pong> pinger = actorSystem.actorOf(self -> msg -> pingerBehavior(self, msg));
    ponger.tell(new Ping(pinger));
}

static Effect<Ping> pongerBehavior(Address<Ping> self, Ping msg, int counter) {
    if (counter < 10) {
        out.println("ping! 👉");
        msg.sender().tell(new SimplePong(self));
        return Become(m -> pongerBehavior(self, m, counter + 1));
    } else {
        out.println("ping! 💀");
        msg.sender().tell(new DeadlyPong(self));
        return Die();
    }
}

static Effect<Pong> pingerBehavior(Address<Pong> self, Pong msg) {
    return switch (msg) {
        case SimplePong p -> {
            out.println("pong! 👈");
            p.sender().tell(new Ping(self));
            yield Stay();
        }
        case DeadlyPong p -> {
            out.println("pong! 😵");
            p.sender().tell(new Ping(self));
            yield Die();
        }
    };
}
This prints the following:
ping! 👉
pong! 👈
ping! 👉
pong! 👈
ping! 👉
pong! 👈
ping! 👉
pong! 👈
ping! 👉
pong! 👈
ping! 👉
pong! 👈
ping! 👉
pong! 👈
ping! 👉
pong! 👈
ping! 👉
pong! 👈
ping! 👉
pong! 👈
ping! 💀
pong! 😵
Dropping msg [Ping[sender=io.github.evacchi.TypedActor$System$1@21198648]] due to severe case of death.
Use of Types
If you are familiar with the untyped version, you’ll remember that the pingerBehavior needed a default clause: that’s because, as we learned previously, the signature for Behavior was Function<Object, Effect>, so we had to handle and ignore messages that were not Pongs!
Because the signature is now effectively Function<Pong, Effect<Pong>>, the compiler knows that only messages from the sealed Pong hierarchy may be received, and it can check that both SimplePong and DeadlyPong are covered; thus, we don’t need to add a default clause!
Likewise, pongerBehavior used a switch expression too. In the typed version, however, that switch becomes entirely redundant:
static Effect<Ping> pongerBehavior(Address<Ping> self, Ping msg, int counter) {
    return switch (msg) {
        case Ping p && counter < 10 -> {
            out.println("ping! 👉");
            p.sender().tell(new SimplePong(self));
            yield Become(m -> pongerBehavior(self, m, counter + 1));
        }
        case Ping p -> {
            out.println("ping! 💀");
            p.sender().tell(new DeadlyPong(self));
            yield Die();
        }
    };
}
Because the signature is Function<Ping, Effect<Ping>>, we don’t need a default clause, and both case clauses match against a Ping; thus, the entire switch is effectively equivalent to a simple if/else!
Closures vs Classes
Notice how the traditional way to increase a counter is to create a closure with the value:
public static void main(String... args) {
    ...
    Address<Ping> ponger = actorSystem.actorOf(self -> msg -> pongerBehavior(self, msg, 0));
    ...
}

static Effect<Ping> pongerBehavior(Address<Ping> self, Ping msg, int counter) {
    return switch (msg) {
        case Ping p && counter < 10 -> {
            ...
            yield Become(m -> pongerBehavior(self, m, counter + 1));
        }
        ...
    };
}
However, a similar effect could be achieved with mutable state; this is perfectly acceptable, because an actor processes one message at a time, so its state is never accessed by two threads concurrently. In this case we could have written:
void run() {
    ...
    Address<Ping> ponger = actorSystem.actorOf(StatefulPonger::new);
    ...
}

class StatefulPonger implements Behavior<Ping> {
    Address<Ping> self;
    int counter = 0;

    StatefulPonger(Address<Ping> self) { this.self = self; }

    public Effect<Ping> apply(Ping msg) {
        if (counter < 10) {
            out.println("ping! 👉");
            msg.sender().tell(new SimplePong(self));
            this.counter++;
            return Stay();
        } else {
            out.println("ping! 💀");
            msg.sender().tell(new DeadlyPong(self));
            return Die();
        }
    }
}
Example 3: A Vending Machine
You can run the following example with:
j! https://github.com/evacchi/min-java-actors/blob/main/src/main/java/io/github/evacchi/typed/examples/VendingMachine.java
In the previous example, we saw how to use actors and Become to maintain state (the counter). In this example we will show how to use Become to change the behavior of an actor, realizing a state machine.
A classic example of a state machine is the vending machine.
For instance, we may write a vending machine that requires you to insert an amount of 100 before you can choose an item.
We will define two actors, vendingMachine and itemPicker, to simulate that, once the amount of 100 has been reached and the customer has made their choice, some subroutine will take care of the mechanical arm that selects the item and dispenses it to them.
The messages:
interface VendMessage {}
record Coin(int amount) implements VendMessage {
    public Coin {
        if (amount < 1 || amount >= 100)
            throw new AssertionError("1 <= amount < 100");
    }
}
record Choice(String product) implements VendMessage {}
We use the compact record constructor to enforce the invariant 1 <= amount < 100.
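The invariant check can be exercised on its own. This is a minimal sketch assuming the bounds 1 <= amount < 100 stated above (CoinDemo and accepts are hypothetical helpers). Note that the two bounds must be combined with ||, since no amount can be below 1 and above the maximum at the same time.

```java
class CoinDemo {
    interface VendMessage {}

    // the compact canonical constructor runs before the field is assigned,
    // so a Coin outside the bounds can never be created
    record Coin(int amount) implements VendMessage {
        public Coin {
            if (amount < 1 || amount >= 100)
                throw new AssertionError("1 <= amount < 100");
        }
    }

    static boolean accepts(int amount) {
        try {
            new Coin(amount);
            return true;
        } catch (AssertionError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(accepts(50));  // true
        System.out.println(accepts(0));   // false
        System.out.println(accepts(100)); // false
    }
}
```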
There is also the message Vended, which is only meant for private communication between the itemPicker and the vendingMachine:
record Vended(String product) implements VendMessage {}
It is meant for the itemPicker to notify the vendingMachine when it is done releasing the item, so that the vendingMachine may return to its initial state.
TypedActor.System sys = new TypedActor.System(Executors.newCachedThreadPool());
Address<VendMessage> vendingMachine = sys.actorOf(self -> initial(self));
Address<Choice> itemPicker = sys.actorOf(self -> msg -> itemPicker(msg));
The behaviors may be defined as follows:
Behavior<VendMessage> initial(Address<VendMessage> self) {
    return message -> {
        if (message instanceof Coin c) {
            out.printf("Received first coin: %d\n", c.amount());
            return Become(waitCoin(self, c.amount()));
        } else return Stay(); // ignore message, stay in this state
    };
}

Behavior<VendMessage> waitCoin(Address<VendMessage> self, int accumulator) {
    out.printf("Budget updated: %d\n", accumulator);
    return m -> switch (m) {
        case Coin c && accumulator + c.amount() < 100 -> Become(waitCoin(self, accumulator + c.amount()));
        case Coin c -> Become(vend(self, accumulator + c.amount()));
        default -> Stay();
    };
}

Behavior<VendMessage> vend(Address<VendMessage> self, int total) {
    out.printf("Pick an Item! (Budget: %d)\n", total);
    return message -> switch (message) {
        case Choice c -> {
            itemPicker.tell(c);
            releaseChange(total - 100);
            yield Stay();
        }
        case Vended v -> Become(initial(self));
        default -> Stay(); // ignore message, stay in this state
    };
}
and the itemPicker
:
Effect<Choice> itemPicker(Choice message) {
    vendProduct(message.product());
    vendingMachine.tell(new Vended(message.product()));
    return Stay();
}
vendProduct and releaseChange are just printing a message, but we may imagine that they do something costly and complicated:
void vendProduct(String product) { out.printf("VENDING: %s\n", product); }
void releaseChange(int change) { out.printf("CHANGE: %d\n", change); }
Now, if we send a series of coins, and then our choice:
vendingMachine.tell(new Coin(50))
              .tell(new Coin(40))
              .tell(new Coin(30))
              .tell(new Choice("Chocolate"));
We will read the following output:
Received first coin: 50
Budget updated: 50
Budget updated: 90
Pick an Item! (Budget: 120)
VENDING: Chocolate
CHANGE: 20
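To observe the state transitions deterministically, the sketch below drives a stripped-down version of the same state machine by hand, assuming no actor runtime and no itemPicker: a Choice goes straight back to initial instead of waiting for Vended. Each step performs the update behavior = behavior.apply(msg).apply(behavior), and the log list (a hypothetical stand-in for the printf calls) records what each state would print. VendingSketch is an illustrative name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class VendingSketch {
    interface Behavior<T> extends Function<T, Effect<T>> {}
    interface Effect<T> extends Function<Behavior<T>, Behavior<T>> {}
    static <T> Effect<T> Become(Behavior<T> next) { return current -> next; }
    static <T> Effect<T> Stay() { return current -> current; }

    interface VendMessage {}
    record Coin(int amount) implements VendMessage {}
    record Choice(String product) implements VendMessage {}

    static final List<String> log = new ArrayList<>();

    static Behavior<VendMessage> initial() {
        return m -> {
            if (m instanceof Coin c) return Become(waitCoin(c.amount()));
            return Stay(); // ignore anything but coins
        };
    }

    static Behavior<VendMessage> waitCoin(int budget) {
        log.add("budget " + budget);
        return m -> {
            if (m instanceof Coin c) {
                int total = budget + c.amount();
                return Become(total < 100 ? waitCoin(total) : vend(total));
            }
            return Stay(); // a Choice before reaching 100 is ignored
        };
    }

    static Behavior<VendMessage> vend(int total) {
        log.add("pick an item (budget " + total + ")");
        return m -> {
            if (m instanceof Choice c) {
                log.add("vending " + c.product() + ", change " + (total - 100));
                return Become(initial());
            }
            return Stay(); // further coins are ignored in this state
        };
    }

    public static void main(String[] args) {
        Behavior<VendMessage> state = initial();
        List<VendMessage> inputs = List.of(new Coin(50), new Coin(40), new Coin(30), new Choice("Chocolate"));
        for (VendMessage msg : inputs) {
            state = state.apply(msg).apply(state); // same update as the actor's run()
        }
        log.forEach(System.out::println);
    }
}
```

Fed the same four messages, it goes through the same budgets (50, 90, 120) and computes the same change (20) as the actor version.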
Use of Types
Notice how we had to add a default clause in the waitCoin and vend states (the initial state had an else clause), because every behavior is of type Behavior<VendMessage>, which means we need to handle any message in the VendMessage hierarchy, even when that does not make sense in that state. For instance, a Coin message does not make sense in the vend state.
However, the itemPicker has type Address<Choice>, because that’s the only type of message it will ever be able to receive. This allows us to avoid ifs or switches!
Implementing The Actor System
We are now ready to implement the actor system and execution environment. We define the actorOf() method on a TypedActor.System class.
public interface TypedActor {
    class System {
        Executor executor;

        public System(Executor executor) { this.executor = executor; }

        public <T> Address<T> actorOf(Function<Address<T>, Behavior<T>> initial) {
            // ... references the executor ...
        }
    }
}
However, in order to keep the number of lines down, we can abuse the record construct so that we don’t have to write an explicit constructor:
record System(Executor executor) {
    public <T> Address<T> actorOf(Function<Address<T>, Behavior<T>> initial) {
        // ... references the executor ...
    }
}
We now need to define an anonymous class implementing both the Address<T> and the Runnable interfaces:
record System(Executor executor) {
    public <T> Address<T> actorOf(Function<Address<T>, Behavior<T>> initial) {
        return new Address<T>, Runnable() { ... };
    }
}
However… that is not valid Java syntax! What we can do instead is leverage another under-used feature of Java: local classes, i.e., classes declared inside the body of a method:
record System(Executor executor) {
    public <T> Address<T> actorOf(Function<Address<T>, Behavior<T>> initial) {
        abstract class AtomicRunnableAddress<T> implements Address<T>, Runnable {
            AtomicInteger on = new AtomicInteger(0);
        }
        return new AtomicRunnableAddress<>() {
            ...
which makes AtomicRunnableAddress private to that method (which is all we need). We will use the AtomicInteger to turn the actor on and off; we now create our object:
return new AtomicRunnableAddress<T>() {
    // the mailbox is just a concurrent queue
    final ConcurrentLinkedQueue<T> mbox = new ConcurrentLinkedQueue<>();
    // the current behavior is a mutable field;
    // the initial behavior applies the `initial` function to `this`,
    // seeding the `self` reference into the initial behavior
    Behavior<T> behavior = initial.apply(this);
    ...
};
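The local-class trick can be tried in isolation. In this minimal sketch (LocalClassDemo, RunnableAddress and create are hypothetical names, and run() only appends a marker), a generic method declares a local abstract class implementing both Address<T> and Runnable, then returns an anonymous subclass of it; the local class can use the method's own type variable T.

```java
class LocalClassDemo {
    interface Address<T> { Address<T> tell(T msg); }

    // `new Address<T>, Runnable() { ... }` is not valid Java, so we declare a
    // local abstract class that implements both interfaces, then subclass it anonymously
    static <T> Address<T> create(StringBuilder sink) {
        abstract class RunnableAddress implements Address<T>, Runnable {}
        return new RunnableAddress() {
            public Address<T> tell(T msg) { sink.append(msg).append(';'); return this; }
            public void run() { sink.append("run;"); }
        };
    }

    public static void main(String[] args) {
        StringBuilder sink = new StringBuilder();
        Address<String> address = create(sink);
        address.tell("a").tell("b");
        ((Runnable) address).run(); // the very same object is also a Runnable
        System.out.println(sink);   // a;b;run;
    }
}
```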
Here is the reason why our actors are created with this strange curried function:
var actor = system.actorOf(self -> msg -> ...);
The signature for the initial behavior is really Function<Address<T>, Behavior<T>>, which “expands” to
Function<Address<T>, Function<T, Effect<T>>>
or, to write it in a possibly more readable format:
Address<T> -> T -> Effect<T>   // self -> msg -> ...
The reason why we write it this way is so that the Function<T, Effect<T>> (i.e., the Behavior<T>) can reference self. As we saw in Example 2: Ping Pong, this is often equivalent to writing a class that takes an Address<T> in its constructor. And that is because “a closure is a poor man’s object; an object is a poor man’s closure”.
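The equivalence can be seen in a small sketch that builds the same kind of behavior both ways: once as a curried self -> msg -> ... closure, and once as a class that receives self in its constructor, like StatefulPonger. The Address passed in is a hypothetical stand-in that only records the messages it is told; no actor runtime is involved, and ClosureVsObject, Echo and told are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class ClosureVsObject {
    interface Effect<T> extends Function<Behavior<T>, Behavior<T>> {}
    interface Behavior<T> extends Function<T, Effect<T>> {}
    interface Address<T> { Address<T> tell(T msg); }

    static <T> Effect<T> Stay() { return current -> current; }

    static final List<String> told = new ArrayList<>();

    // closure: `self` is captured by the inner lambda
    static final Function<Address<String>, Behavior<String>> closure =
            self -> msg -> {
                self.tell("closure got " + msg);
                return Stay();
            };

    // object: `self` is a field, set once by the constructor
    static final class Echo implements Behavior<String> {
        private final Address<String> self;
        Echo(Address<String> self) { this.self = self; }
        public Effect<String> apply(String msg) {
            self.tell("object got " + msg);
            return Stay();
        }
    }

    // the constructor reference has the same shape as the curried closure
    static final Function<Address<String>, Behavior<String>> object = Echo::new;

    public static void main(String[] args) {
        // stand-in address: it only records what it is told
        Address<String> self = new Address<>() {
            public Address<String> tell(String msg) { told.add(msg); return this; }
        };
        closure.apply(self).apply("hi");
        object.apply(self).apply("hi");
        System.out.println(told); // [closure got hi, object got hi]
    }
}
```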
When the actor starts, we initialize the Behavior<T> by applying initial to a reference to this:
Behavior<T> behavior = initial.apply(this);
Let us now take a look at the tell()
method; at its core we may write it as:
public Address<T> tell(T msg) {
    // put the message in the mailbox
    mbox.offer(msg);
    async();
    return this;
}
The async method verifies that the mbox contains an element and schedules the actor for execution on the Executor.
void async() {
    // if the mbox is non-empty and the actor is not already scheduled, mark it as scheduled
    if (!mbox.isEmpty() && on.getAndSet(1) == 0) {
        // schedule to run on the Executor
        try {
            executor.execute(this);
        } catch (Throwable t) {
            // in case of error, deactivate the actor and rethrow the exception
            on.set(0);
            throw t;
        }
    }
}
In order to be schedulable, the actor must be a Runnable, so here is the run() method:
public void run() {
    try {
        // if it is active
        if (on.get() == 1)
            // apply the behavior to the head of the mailbox: an Effect is returned;
            // apply it to the current behavior: it returns the next behavior,
            // which overwrites the old one in the assignment
            behavior = behavior.apply(mbox.poll()).apply(behavior);
    } finally {
        // deactivate and resume if necessary
        on.set(0);
        async();
    }
}
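Putting tell(), async() and run() together, here is a condensed, standalone sketch of the same runtime, with actorOf as a static method taking the Executor rather than a method on the System record, plus a hypothetical Counter actor. Several client threads tell the actor concurrently; because only the caller that flips on from 0 to 1 submits the actor, at most one run() per actor is active at a time, so the plain, unsynchronized int field in Counter does not lose updates. The thread-pool sizes and the 10-second timeout are arbitrary choices for the example.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class MiniRuntime {
    interface Effect<T> extends Function<Behavior<T>, Behavior<T>> {}
    interface Behavior<T> extends Function<T, Effect<T>> {}
    interface Address<T> { Address<T> tell(T msg); }

    static <T> Effect<T> Stay() { return current -> current; }

    static <T> Address<T> actorOf(Executor executor, Function<Address<T>, Behavior<T>> initial) {
        abstract class RunnableAddress implements Address<T>, Runnable {
            final AtomicInteger on = new AtomicInteger(0);
        }
        return new RunnableAddress() {
            final ConcurrentLinkedQueue<T> mbox = new ConcurrentLinkedQueue<>();
            Behavior<T> behavior = initial.apply(this);

            public Address<T> tell(T msg) { mbox.offer(msg); async(); return this; }

            public void run() {
                try {
                    if (on.get() == 1) {
                        T m = mbox.poll();
                        if (m != null) behavior = behavior.apply(m).apply(behavior);
                    }
                } finally { on.set(0); async(); }
            }

            void async() {
                // only the caller that flips `on` from 0 to 1 submits the actor
                if (!mbox.isEmpty() && on.compareAndSet(0, 1)) {
                    try { executor.execute(this); }
                    catch (Throwable t) { on.set(0); throw t; }
                }
            }
        };
    }

    // hypothetical counter actor: a plain int, updated without any locking
    static final class Counter implements Behavior<Integer> {
        int total = 0;
        final CountDownLatch processed;
        Counter(CountDownLatch processed) { this.processed = processed; }
        public Effect<Integer> apply(Integer n) {
            total += n;
            processed.countDown();
            return Stay();
        }
    }

    // `senders` threads each tell the actor `perSender` times; returns the final total
    static int sumConcurrently(int senders, int perSender) {
        ExecutorService actorPool = Executors.newFixedThreadPool(4);
        ExecutorService clients = Executors.newFixedThreadPool(senders);
        CountDownLatch processed = new CountDownLatch(senders * perSender);
        Counter counter = new Counter(processed);
        Address<Integer> actor = actorOf(actorPool, self -> counter);
        for (int i = 0; i < senders; i++) {
            clients.execute(() -> {
                for (int j = 0; j < perSender; j++) actor.tell(1);
            });
        }
        try {
            processed.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            clients.shutdown();
            actorPool.shutdown();
        }
        return counter.total;
    }

    public static void main(String[] args) {
        System.out.println(sumConcurrently(4, 1000));
    }
}
```

With 4 senders of 1000 messages each, sumConcurrently(4, 1000) should return 4000; an actor whose messages could run concurrently would risk losing increments on the plain int.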
Use of Types
In the original version, we initialized the self address by telling the actor its own Address. This is doable in this version too, and it’s preferable, because it allows the initial behavior to be run asynchronously.
In this version, for simplicity, we are initializing the Behavior<T> immediately:
Behavior<T> behavior = initial.apply(this);
However, that also means that, if initial performs a costly operation, it will be executed at creation time; in the original version, instead, it would be evaluated when the first message (the Address) was received, making it asynchronous.
However, in its simplest implementation, this requires an untyped mailbox (i.e., ConcurrentLinkedQueue<Object>), which would then require nasty casts. Try developing your own version, limiting the amount of hacks!
Wrapping It Up
I hope you liked this long blog post! Together we implemented a tiny typed actor system, and we saw how to realize a few smaller use cases.
If you have read this far, congratulations! You deserve some panettone too!
If you would like to challenge yourself, try implementing a tiny chat system by following along the untyped version!
Author: Edoardo Vacchi
After my PhD at University of Milan on programming language design and implementation, I worked for three years at UniCredit Bank’s R&D department.
Later, I joined Red Hat, where I worked on the Drools rule engine, the jBPM workflow engine and the Kogito cloud-native business automation platform.
I joined Tetrate to work on the wazero WebAssembly runtime for Go.
Now, at Dylibso, I still contribute to wazero, and I am also working on the Chicory Wasm runtime for the JVM, and other runtimes!
I sometimes write on my own personal blog.