Site icon JVM Advent

Actors and Virtual Threads, a match made in heaven?

Introduction

This blog builds on top of the great articles by Edoardo Vacchi leveraging all the goodies of a Typed Actor API and the working example of a Chat.

Now that Java 19 has been released this use-case is a perfect test bed to use the advanced capabilities of the Loom preview and evaluate what would be the gains and differences when using such a concurrency model for both library authors and final users.

In this blog, we assume some shallow knowledge of the previous posts and we invite you to look up references in case of doubt.

Loom

Let’s start with the excellent definition of this project from the official wiki page:

Project Loom is to intended to explore, incubate and deliver Java VM features and APIs built on top of them for the purpose of supporting easy-to-use, high-throughput lightweight concurrency and new programming models on the Java platform. 

For this article we won’t dig into structured concurrency and we will focus on the usage of Virtual Threads.

This is arguably one of the most notable improvements happening on the JVM nowadays, Virtual Threads are an implementation of Green Threads and a Virtual Thread is not binding to a System Thread, instead, they can be created almost for free and they are extremely lightweight.

Using the “ExecutorService API” to spawn a new Virtual Thread looks as follows:

ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();

executorService.execute(...);

Loom promise is that the runtime will take care of the blocking code for us, and we should not really “care” anymore about concurrency, but write plain old blocking code. The Java standard library has been changed to account for this new paradigm and all blocking operations (on IO etc.) are going to release the carrier thread automagically.

Loom based Actor System

Let’s start with the typed API surface we defined in the previous blog entry:

public interface TypedLoomActor {
    interface Effect<T> { Behavior<T> transition(Behavior<T> next); }
    interface Behavior<T> { Effect<T> receive(T o); }
    interface Address<T> { Address<T> tell(T msg); }

    static <T> Effect<T> Become(Behavior<T> next) { return current -> next; }
    static <T> Effect<T> Stay() { return current -> current; }
    static <T> Effect<T> Die() { return Become(msg -> { out.println("Dropping msg [" + msg + "] due to severe case of death."); return Stay(); }); }

    record System() {
        private static ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();

        public <T> Address<T> actorOf(Function<Address<T>, Behavior<T>> initial) {
            ???
        }
    }
}

and now is the time to fill in the blanks powered by Loom, first of all, let’s define a RunnableAddress which is going to be our execution unit and enables the communication to Actors:

    class RunnableAddress<T> implements Address<T>, Runnable {
        public Address<T> tell(T msg) {
          ???
        }

        public void run() {
          ???
        }
    }

since Loom is offering us the scheduling mechanism we are going to simply bound each actor to exactly one Virtual Thread, and the actorOf implementations look straight-forward:

  public <T> Address<T> actorOf(Function<Address<T>, Behavior<T>> initial) {
      var addr = new RunnableAddress<T>(initial);
      executorService.execute(addr);
      return addr;
  }

as we have seen before, an Actor is usually implemented on top of a Mailbox, and this case is not any different. Since we want to take advantage of the blocking semantics of the JVM, we base our mailbox on a blocking data structure such as a LinkedBlockingQueue, with no additional fear for concurrent access we can implement the tell method of RunnableAddress:

  final LinkedBlockingQueue<T> mailbox = new LinkedBlockingQueue<>();

  public Address<T> tell(T msg) {
      mailbox.offer(msg);
      return this;
  }

the only bit left is the actual Actor’s run method, as opposed to the async version of it, we don’t need anymore the async() duty cycle and we can provide a super dumb implementation leveraging the properties of the LinkedBlockingQueue:

  public void run() {
      Behavior<T> behavior = initial.apply(this);
      while (true) {
          try {
              T message = mailbox.take();
              Effect<T> effect = behavior.receive(message);
              behavior = effect.transition(behavior);
          } catch (InterruptedException e) {
              e.printStackTrace();
              break;
          }
      }
  }

more specifically, the initial behavior is computed right at the start and the rest of the code can be safely executed in a strict loop, knowing that the blocking operation will be handled by the Loom runtime.

The full implementation looks as follows:

public interface TypedLoomActor {
    interface Effect<T> { Behavior<T> transition(Behavior<T> next); }
    interface Behavior<T> { Effect<T> receive(T o); }
    interface Address<T> { Address<T> tell(T msg); }

    static <T> Effect<T> Become(Behavior<T> next) { return current -> next; }
    static <T> Effect<T> Stay() { return current -> current; }
    static <T> Effect<T> Die() { return Become(msg -> { out.println("Dropping msg [" + msg + "] due to severe case of death."); return Stay(); }); }

    record System() {
        private static ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();

        public <T> Address<T> actorOf(Function<Address<T>, Behavior<T>> initial) {
            var addr = new RunnableAddress<T>(initial);
            executorService.execute(addr);
            return addr;
        }
    }

    class RunnableAddress<T> implements Address<T>, Runnable {

        final Function<Address<T>, Behavior<T>> initial;
        final LinkedBlockingQueue<T> mailbox = new LinkedBlockingQueue<>();

        RunnableAddress(Function<Address<T>, Behavior<T>> initial) {
          this.initial = initial;
        }

        public Address<T> tell(T msg) {
            mailbox.offer(msg);
            return this;
        }

        public void run() {
            Behavior<T> behavior = initial.apply(this);
            while (true) {
                try {
                    T message = mailbox.take();
                    Effect<T> effect = behavior.receive(message);
                    behavior = effect.transition(behavior);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    break;
                }
            }
        }
    }
}

I’d argue that is shorter, cleaner and more beautiful than the previous one as we just got rid entirely of:

The Chat example

On top of this shiny new API + Runtime, let’s rebuild the chat example.

The overall architecture will be almost identical, but we will differ in modeling the ChannelActor.java, which is the in-code representation of a wire in between client-server handling all the socket communication. Let’s focus on this important detail.

Using only completely asynchronous operations each ChannelActor is able to trigger the execution of write operations at any time, as it will be notified with a message when something has been read from a socket. On Loom we are supposed to use the old blocking code, where a read is performed with a in.readLine(), which means that our actor is eventually going to “stay stuck” in that condition waiting for more messages to arrive; if we model, as we used to, the Channel with a single actor it will be able to send messages only before/after an actual read from the Socket which seems a pretty bad user experience.

We need to slightly change our mind and enforce a deeper level of separation of concerns, we can do it by actually dividing the two functions contained in the ChannelActor:

The writer behavior can be defined by mechanically connecting the dots:

  Effect<WriteLine> writer(WriteLine wl) {
      out.println(wl.payload());
      return Stay();
  }

where out is a PrintWriter out defined in a context accessible by the function.

The reader the behavior will instead leverage the old java.io blocking operation:

  Effect<PerformReadLine> read(Address<PerformReadLine> self) {
      try {
          return switch (in.readLine()) {
              case null -> { yield Die(); }
              case String line -> {
                  addr.tell(fn.apply(line));
                  self.tell(new PerformReadLine());
                  yield Stay();
              }
          };
      } catch (IOException e) {
          throw new UncheckedIOException(e);
      }
  }

with this encoding:

There are a few minor details on how to keep around references to the java.io objects involved in reading and writing to a socket, here you have a peek at solving the problem with actual plain old classes:

class ChannelActors {
    record WriteLine(String payload) {}

    record PerformReadLine() {}

    final BufferedReader in;
    final PrintWriter out;

    ChannelActors(Socket socket) {
        try {
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            out = new PrintWriter(socket.getOutputStream(), true);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    final class Reader<T> {
        final Function<String, T> fn;
        final Address<T> addr;

        public Reader(Address<T> addr, Function<String, T> fn) {
            this.fn = fn;
            this.addr = addr;
        }

        void start(Address<PerformReadLine> readActor) {
            readActor.tell(new PerformReadLine());
        }

        Effect<PerformReadLine> read(Address<PerformReadLine> self) {
            try {
                return switch (in.readLine()) {
                    case null -> { yield Die(); }
                    case String line -> {
                        addr.tell(fn.apply(line));
                        self.tell(new PerformReadLine());
                        yield Stay();
                    }
                };
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    <T> Reader<T> reader(Address<T> addr, Function<String, T> fn) {
        return new Reader<>(addr, fn);
    }

    Effect<WriteLine> writer(WriteLine wl) {
        out.println(wl.payload());
        return Stay();
    }
}

Having this basic building block available we can build, on top of it, a slightly modified version of the ChatServer:

public interface ChatServer {

    sealed interface ClientManagerProtocol { }
    record ClientConnected(Address<ChannelActors.WriteLine> addr) implements ClientManagerProtocol { }
    record LineRead(String payload) implements ClientManagerProtocol {}

    TypedLoomActor.System system = new TypedLoomActor.System();
    int PORT = 4444;

    static void main(String... args) throws IOException, InterruptedException {
        var serverSocket = new ServerSocket(PORT);
        out.printf("Server started at %s.\n", serverSocket.getLocalSocketAddress());

        Address<ClientManagerProtocol> clientManager =
                system.actorOf(self -> msg -> clientManager(msg));

        while (true) {
            var socket = serverSocket.accept();
            var channel = new ChannelActors(socket);
            ChannelActors.Reader<ClientManagerProtocol> reader =
                    channel.reader(clientManager, (line) -> new LineRead(line));
            reader.start(system.actorOf(self -> msg -> reader.read(self)));
            Address<ChannelActors.WriteLine> writer = system.actorOf(self -> msg -> channel.writer(msg));
            clientManager.tell(new ClientConnected(writer));
        }
    }

    static Effect<ClientManagerProtocol> clientManager(ClientManagerProtocol msg) {
        return clientManager(msg, new ArrayList<>());
    }

    static Effect<ClientManagerProtocol> clientManager(ClientManagerProtocol msg, List<Address<ChannelActors.WriteLine>> clients) {
        return switch (msg) {
            case ClientConnected(var address) -> {
                clients.add(address);
                yield Become(m -> clientManager(m, clients));
            }
            case LineRead(var payload) -> {
                clients.forEach(client -> client.tell(new ChannelActors.WriteLine(payload)));
                yield Stay();
            }
        };
    }
}

as before, we are waiting for incoming connections, and, as soon as one arrives it gets registered by the clientManager actor, which will take care of forwarding any incoming message to all the connected clients using ChannelActors.

The ChatClient is the dual implementation which is going to read lines from the standard input and simply forward them to the server through a ChannelActor:

public interface ChatClient {

    String host = "localhost";
    int portNumber = 4444;
    TypedLoomActor.System system = new TypedLoomActor.System();

    sealed interface ClientProtocol { }
    record Message(String user, String text) implements ClientProtocol {}
    record LineRead(String payload) implements ClientProtocol {}

    static void main(String[] args) throws IOException {
        var userName = args[0];

        var socket = new Socket(host, portNumber);
        var channel = new ChannelActors(socket);
        Address<ChannelActors.WriteLine> writer = system.actorOf(self -> msg -> channel.writer(msg));
        Address<ClientProtocol> client = system.actorOf(self -> msg -> client(writer, msg));
        ChannelActors.Reader<ClientProtocol> reader =
                channel.reader(client, (line) -> new LineRead(line));
        reader.start(system.actorOf(self -> msg -> reader.read(self)));

        out.printf("Login............... %s\n", userName);

        var scann = new Scanner(in);
        while (true) {
            switch (scann.nextLine()) {
                case String line when (line != null && !line.isBlank()) ->
                    client.tell(new Message(userName, line));
                default -> {}
            }
        }
    }

    static Effect<ClientProtocol> client(Address<ChannelActors.WriteLine> writer, ClientProtocol msg) {
        var mapper = new ObjectMapper();

        try {
            switch (msg) {
                case Message m -> {
                    var jsonMsg = mapper.writeValueAsString(m);
                    writer.tell(new ChannelActors.WriteLine(jsonMsg));
                }
                case LineRead(var payload) -> {
                    switch (mapper.readValue(payload.trim(), Message.class)) {
                        case Message(var user, var text) -> out.printf("%s > %s\n", user, text);
                    }
                }
            }
            return Stay();
        } catch(JsonProcessingException e) { throw new UncheckedIOException(e); }
    }
}

with jbang installed you can directly run and play with this example code with a few quick commands:

jbang code/ChatServer.java

and in separate terminals:

jbang code/ChatClient.java <name>

Conclusions

In this blog, we presented how Loom can make life much easier for library developers and enable legacy code to be easily converted/ported into high-performing codebases without having to be re-written from scratch.

The experience has been pretty much positive overall and the ergonomics of Loom Virtual Threads nicely fit into the Actor System example, showing evidence of the potential but also practically exploiting the fact that is not going to be a straight “drop-in” replacement for other frameworks and libraries and changes will be required in the process.

Here we have just been scratching the surface of the problem and additional explorations and additional follow up and considerations will be required:

Bye and see you next time!

Author: Andrea Peruffo

With nearly two decades of coding experience, I’m fueled by passion as I continue to type away daily. As a Principal Software Engineer at Red Hat, I actively contribute to diverse Open Source projects, driven by both personal fulfillment and professional advancement. My not-so-secret passion lies in programming languages, developer tools, compilers, and beyond. Come and spot me on a project near you!
Exit mobile version