Composing Multiple Async Results via an Applicative Builder in Java 8

A few months ago, I published a post explaining in detail an abstraction I came up with named Outcome, which helped me A LOT to code without side effects by enforcing the use of semantics. By following this simple (and yet powerful) convention, I ended up turning any kind of failure (a.k.a. Exception) into an explicit result from a function, making everything much easier to reason about. I don’t know about you, but I was tired of dealing with exceptions that tore everything down, so I did something about it and, to be honest, it worked really well. So before I keep going with my tales from the trenches, I really recommend going over that post. Now let’s solve some asynchronous issues by using eccentric applicative ideas, shall we?

Something wicked this way comes

Life was real good: our coding was fast-paced, cleaner and more composable than ever. But, out of the blue, we stumbled upon a “missing” feature (evil laughs, please): we needed to combine several asynchronous Outcome instances in a non-blocking fashion…


Excited by the idea, I got down to work. I experimented for a fair amount of time, looking for a robust and yet simple way of expressing this kind of situation. While the new CompletableFuture API turned out to be much nicer than I expected (though I still don’t understand why they decided to use names like thenApplyAsync or thenComposeAsync instead of map or flatMap), I always ended up with implementations that were too verbose and repetitive compared to some stuff I did with Scala. But after some long “Mate” sessions, I had my “Hey!” moment: why not use something similar to an applicative?

The problem

Suppose that we have these two asynchronous results

and a silly entity called Message

I need something that, given textf and numberf, will give me back something like

//After combining textf and numberf
CompletableFuture<Outcome<Message>> message = ....

So I wrote a letter to Santa Claus:

  1. I want to asynchronously format the string returned by textf using the number returned by numberf only when both values are available, meaning that both futures completed successfully and neither outcome failed. Of course, we need to be non-blocking.
  2. In case of failures, I want to collect all failures that took place during the execution of textf and/or numberf and return them to the caller, again, without blocking at all.
  3. I don’t want to be constrained by the number of values to be combined; it must be capable of handling a fair amount of asynchronous results. Did I say without blocking? There you go…
  4. Not die in the attempt.
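
To make the first two wishes concrete, here is a minimal sketch for just two values. The Outcome class below is a hypothetical stand-in (a value or a list of failure messages), not the real Outcome from the earlier post, and combine is an illustrative helper name. It relies only on CompletableFuture.thenCombine, which runs its function once both futures have completed, so nothing blocks.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

// Hypothetical stand-in for Outcome: either a value or a list of failure messages.
final class Outcome<T> {
    final T value;                // null when the outcome failed
    final List<String> failures;  // empty when the outcome succeeded

    private Outcome(T value, List<String> failures) {
        this.value = value;
        this.failures = failures;
    }

    static <T> Outcome<T> ok(T value) {
        return new Outcome<>(value, Collections.<String>emptyList());
    }

    static <T> Outcome<T> failed(List<String> failures) {
        return new Outcome<>(null, failures);
    }

    boolean isOk() {
        return failures.isEmpty();
    }
}

final class CombineSketch {
    // Applies f only when both outcomes succeeded; otherwise returns the
    // failures reported by either side. No thread is blocked while waiting.
    static <A, B, R> CompletableFuture<Outcome<R>> combine(
            CompletableFuture<Outcome<A>> fa,
            CompletableFuture<Outcome<B>> fb,
            BiFunction<A, B, R> f) {
        return fa.thenCombine(fb, (a, b) -> {
            if (a.isOk() && b.isOk()) {
                return Outcome.ok(f.apply(a.value, b.value));
            }
            List<String> all = new ArrayList<>(a.failures);
            all.addAll(b.failures);
            return Outcome.<R>failed(all);
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Outcome<String>> textf =
                CompletableFuture.supplyAsync(() -> Outcome.ok("Hi %d"));
        CompletableFuture<Outcome<Integer>> numberf =
                CompletableFuture.supplyAsync(() -> Outcome.ok(42));

        combine(textf, numberf, (text, number) -> String.format(text, number))
                .thenAccept(o -> System.out.println(o.isOk() ? o.value : o.failures))
                .join(); // join only so the demo JVM waits before exiting
    }
}
```

This covers two values only; the third wish (an arbitrary number of values) is what the builder-based approach in the rest of the post is after.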


Applicative builder to the rescue

If you think about it, one simple way to put what we’re trying to achieve is as follows:

// Given a String -> Given a number -> Format the message
f: String -> Integer -> Message

Checking the definition of f, it is saying something like: “Given a String, I will return a function that takes an Integer as a parameter and that, when applied, will return an instance of type Message”. This way, instead of waiting for all values to be available at once, we can partially apply one value at a time, getting an actual description of the construction process of a Message instance. That sounded great.
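
For illustration, the curried shape of f can be spelled out by hand with nested java.util.function.Function types. The Message class here is a hypothetical two-field stand-in, since the original definition is not shown above.

```java
import java.util.function.Function;

// Hypothetical stand-in for the Message entity: just a text and a number.
final class Message {
    final String text;
    final Integer number;

    Message(String text, Integer number) {
        this.text = text;
        this.number = number;
    }
}

final class CurrySketch {
    // f: String -> Integer -> Message, written as nested one-argument functions.
    static final Function<String, Function<Integer, Message>> F =
            text -> number -> new Message(text, number);

    public static void main(String[] args) {
        // Partially apply the first value; the result still waits for the number.
        Function<Integer, Message> awaitingNumber = F.apply("Hello");
        Message message = awaitingNumber.apply(7);
        System.out.println(message.text + " " + message.number);
    }
}
```

Writing this by hand works for two arguments, but every extra argument adds another level of nesting to the type.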

To achieve that, it would be really awesome if we could take the construction lambda Message::new and curry it, boom!, done!, but in Java that’s impossible (to do in a generic, beautiful and concise way). So, for the sake of our example, I decided to go with our beloved Builder pattern, which kinda does the job:

And here’s the WannabeApplicative<V> definition:

public interface WannabeApplicative<V> {
    V apply();
}

Disclaimer: For those functional freaks out there, this is not an applicative per se. I’m aware of that, but I took some ideas from it and adapted them according to the tools that the language offered me out of the box. So, if you’re feeling curious, go check this post for a more formal example.

If you’re still with me, we can agree that we’ve done nothing too complicated so far, but now we need to express a building step which, remember, needs to be non-blocking and capable of combining any previous failures that might have taken place in other executions with potentially new ones. So, in order to do that, I came up with something as follows:

First of all, we’ve got two functional interfaces: one is Partial<B>, which represents a lazy application of a value to a builder, and the second one, MergingStage<B,V>, represents the “how” to combine both the builder and the value. Then, we’ve got a method called value that, given an instance of type CompletableFuture<Outcome<V>>, will return an instance of type MergingStage<B,V> and, believe it or not, here’s where the magic takes place.

If you remember the MergingStage definition, you’ll see it’s a BiFunction, where the first parameter is of type Outcome<B> and the second one is of type Outcome<V>. Now, if you follow the types, you can tell that we’ve got two things: the partial state of the building process on one side (type parameter B) and a new value that needs to be applied to the current state of the builder (type parameter V), so that, when applied, it will generate a new builder instance with the “next state in the building sequence”, which is represented by Partial<B>.

Last but not least, we’ve got the stickedTo method, which is basically an (awful Java) hack to stick to a specific applicative type (builder) while defining a building step. For instance, having:

I can define partial value applications to any Builder instance as follows:

See that we haven’t built anything yet; we just described what we want to do with each value when the time comes. We might want to perform some validations before using the new value (here’s where Outcome plays an important role) or just use it as it is; it’s really up to us. The main point is that we haven’t applied anything yet. In order to do so, and to finally tie up all loose ends, I came up with another definition, which looks as follows:

Hope it’s not too overwhelming, but I’ll try to break it down as clearly as possible. To start specifying how you’re going to combine the whole thing together, you call begin with an instance of type WannabeApplicative<V>, where, in our case, the type parameter V is Builder.

FutureCompositions<Message, Builder> ab = begin(Message.applicative());

See that, after you invoke begin, you get a new instance of FutureCompositions with a lazily evaluated partial state inside of it, making it the one and only owner of the whole building process state. That was the ultimate goal of everything we’ve done so far: to fully gain control over when and how things will be combined. Next, we must specify the values that we want to combine, and that’s what the binding method is for:


This is how we supply our builder instance with all the values that need to be merged together, along with the specification of what’s supposed to happen with each one of them, by using our previously defined Partial instances. Also see that everything is still lazily evaluated: nothing has happened yet, but we have stacked all the “steps” until we finally decide to materialize the result, which will happen when you call perform.

CompletableFuture<Outcome<Message>> message = ab.perform();

From that very moment everything will unfold: each building stage gets evaluated, and either failures are returned and collected within an Outcome instance or the newly available values are supplied to the target builder instance. One way or the other, all steps will be executed until there’s nothing left to do. I will try to depict what just happened as follows:


If you pay attention to the left side of the picture, you can easily see how each step gets “defined” as I showed before, following the “declaration” arrow direction, meaning, how you actually described the building process. Now, from the moment you call perform, each applicative instance (remember, Builder in our case) will be lazily evaluated in the opposite direction: it will start by evaluating the last specified stage in the stack, which will then proceed to evaluate the next one and so forth, up to the point where we reach the “beginning” of the building definition. From there it will start to unfold, or roll out, the evaluation of each step up to the top, collecting everything it can by using the MergingStage specification.

And this is just the beginning….

I’m sure a lot could be done to improve this idea, for example:

  • The two consecutive calls to dependingOn at CompositionSources.values() are too verbose for my taste; I must do something about that.
  • I’m not quite sure about continuing to pass Outcome instances to a MergingStage. It would look cleaner and easier if we unwrapped the values to be merged before invoking it and just returned Either<Failure,V> instead; this would reduce complexity and increase flexibility on what’s supposed to happen behind the scenes.
  • Though using the Builder pattern did the job, it feels old-school. I would love to easily curry constructors, so on my to-do list is to check if jOOλ or Javaslang have something to offer on that matter.
  • Better type inference, so that any unnecessary noise gets removed from the code. For example, the stickedTo method really is a code smell, something that I hated from the start. I definitely need more time to figure out an alternative way to infer the applicative type from the definition itself.

You’re more than welcome to send me any suggestions and comments you might have. Cheers!





Using Java 8 Lambdas, Streams, and Aggregates


In this post, we’ll take a look at filtering and manipulating objects in a Collection using Java 8 lambdas, streams, and aggregates.  All code in this post is available in BitBucket here.

For this example we’ll create a number of objects that represent servers in our IT infrastructure.  We’ll add these objects to a List and then we’ll use lambdas, streams, and aggregates to retrieve servers from the List based on certain criteria.


  1. Introduce the concepts of lambdas, streams, and aggregate operations.
  2. Explain the relationship between streams and pipelines.
  3. Compare and contrast aggregate operations and iterators.
  4. Demonstrate the filter, collect, forEach, mapToLong, average, and getAsDouble aggregate operations.


Lambdas are a new Java language feature that allows us to pass functionality or behavior into methods as parameters. One example that illustrates the usefulness of lambdas comes from UI coding. When a user clicks a button on a user interface, it usually causes some action to occur in the application. In this case, we really want to pass a behavior into the onClick(…) method so that the application will execute the given behavior when the button is clicked. In previous versions of Java, we accomplished this by passing an anonymous inner class (that implemented a known interface) into the method. Interfaces used in this kind of scenario usually contain only one method, which defines the behavior we wish to pass into the onClick(…) method. Although this works, the syntax is unwieldy. Anonymous inner classes still work for this purpose, but the new lambda syntax is much cleaner.
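
The difference is easiest to see side by side. The sketch below assumes a made-up Button class and a single-method ClickListener interface (neither belongs to a real UI toolkit) and registers the same behavior twice, once with an anonymous inner class and once with a lambda.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-method interface, standing in for a UI toolkit listener.
interface ClickListener {
    void clicked(String buttonName);
}

// Hypothetical button that remembers its listeners and notifies them on click().
final class Button {
    private final String name;
    private final List<ClickListener> listeners = new ArrayList<>();

    Button(String name) {
        this.name = name;
    }

    void onClick(ClickListener listener) {
        listeners.add(listener);
    }

    void click() {
        for (ClickListener listener : listeners) {
            listener.clicked(name);
        }
    }
}

final class LambdaVsAnonymousSketch {
    public static void main(String[] args) {
        Button save = new Button("Save");

        // Before Java 8: an anonymous inner class implementing the interface.
        save.onClick(new ClickListener() {
            @Override
            public void clicked(String buttonName) {
                System.out.println(buttonName + " clicked (anonymous class)");
            }
        });

        // Java 8: the same behavior passed as a lambda.
        save.onClick(buttonName -> System.out.println(buttonName + " clicked (lambda)"));

        save.click();
    }
}
```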

Aggregate Operations

When we use Collections to store objects in our programs, we generally need to do more than simply put the objects in the collection — we need to store, retrieve, remove, and update these objects. Aggregate operations use lambdas to perform actions on the objects in a Collection. For example, you can use aggregate operations to:

  • Print the names of all the servers in inventory from a particular manufacturer
  • Return all of the servers in inventory older than a particular age
  • Calculate and return the average age of Servers in your inventory (provided the Server object has a purchase date field)

All of these tasks can be accomplished by using aggregate operations along with pipelines and streams.  We will see examples of these operations below.

Pipelines and Streams

A pipeline is simply a sequence of aggregate operations. A stream is a sequence of items, not a data structure, that carries items from the source through the pipeline. Pipelines are composed of the following:

  1. A data source. Most commonly, this is a Collection, but it could be an array, the return from a method call, or some sort of I/O channel.
  2. Zero or more intermediate operations, for example a filter operation. Intermediate operations produce a new stream. A filter operation takes in a stream and then produces another stream that contains only the items matching the criteria of the filter.
  3. A terminal operation. Terminal operations return a non-stream result. This result could be a primitive type (for example, an integer), a Collection, or no result at all (for example, the operation might just print the name of each item in the stream).
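
The three parts of a pipeline can be seen in a small, self-contained sketch. The names list and the length threshold below are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class PipelineSketch {
    static List<String> longNamesUpperCased(List<String> names) {
        return names.stream()                    // 1. data source: a Collection
                .filter(n -> n.length() > 3)     // 2. intermediate operation: returns a new stream
                .map(String::toUpperCase)        // 2. another intermediate operation
                .collect(Collectors.toList());   // 3. terminal operation: non-stream result
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("web01", "db", "mail");
        System.out.println(longNamesUpperCased(names));
    }
}
```

Nothing is processed until the terminal operation is invoked; the intermediate operations only describe the work to be done.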

Some aggregate operations (e.g. forEach) look like iterators, but they have fundamental differences:

  1. Aggregate operations use internal iteration. Your application has no control over how or when the elements are processed (there is no next() method).
  2. Aggregate operations process items from a stream, not directly from a Collection.
  3. Aggregate operations support Lambda expressions as parameters.

Lambda Syntax

Now that we have discussed the concepts related to Lambda expressions, it is time to look at their syntax. You can think of Lambda expressions as anonymous methods because they have no name. Lambda syntax consists of the following:

  • A comma-separated list of formal parameters enclosed in parentheses. Data types of parameters can be omitted in Lambda expressions. The parentheses can be omitted if there is only one formal parameter.
  • The arrow token: ->
  • A body consisting of a single expression or code block.
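
The variations described above can be summarized in a few lines. The constant names are arbitrary; the functional interfaces come from java.util.function.

```java
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;

final class LambdaSyntaxSketch {
    // Parameter types spelled out; parentheses are required.
    static final BinaryOperator<Integer> ADD = (Integer a, Integer b) -> a + b;

    // Parameter types omitted; the compiler infers them from the target type.
    static final BinaryOperator<Integer> MULTIPLY = (a, b) -> a * b;

    // A single parameter: the parentheses can be dropped.
    static final Function<String, Integer> LENGTH = s -> s.length();

    // No parameters and a code block as the body (note the explicit return).
    static final Supplier<String> GREETING = () -> {
        String who = "world";
        return "hello, " + who;
    };

    public static void main(String[] args) {
        System.out.println(ADD.apply(2, 3));
        System.out.println(MULTIPLY.apply(2, 3));
        System.out.println(LENGTH.apply("lambda"));
        System.out.println(GREETING.get());
    }
}
```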

Using Lambdas, Streams, and Aggregate Operations

As mentioned in the overview, we’ll demonstrate the use of lambdas, streams, and aggregates by filtering and retrieving Server objects from a List.  We’ll look at four examples:

  1. Finding and printing the names of all the servers from a particular manufacturer.
  2. Finding and printing the names of all of the servers older than a certain number of years.
  3. Finding and extracting into a new List all of the servers older than a certain number of years and then printing the names of the servers in the new list.
  4. Calculating and displaying the average age of the servers in the List.

Let’s get started…

The Server Class

First, we’ll look at our Server class.  The Server class will keep track of the following:

  1. Server name
  2. Server IP address
  3. Manufacturer
  4. Amount of RAM (GB)
  5. Number of processors
  6. Purchase date (LocalDate)

Notice (at line 65) that we’ve added the method getServerAge() that calculates the age of the server (in years) based on the purchase date – we’ll use this method when we calculate the average age of the Servers in our inventory.

Screen Shot 2015-12-09 at 9.57.17 AM
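
As a rough sketch of what such a class could look like: only getName(), getManufacturer() and getServerAge() are named in this post, so the field names, the constructor, and the getServerAge(LocalDate) overload (added to make the calculation testable) are assumptions, and the remaining getters are omitted for brevity.

```java
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;

final class Server {
    private final String name;
    private final String ipAddress;
    private final String manufacturer;
    private final int ramGb;
    private final int processors;
    private final LocalDate purchaseDate;

    Server(String name, String ipAddress, String manufacturer,
           int ramGb, int processors, LocalDate purchaseDate) {
        this.name = name;
        this.ipAddress = ipAddress;
        this.manufacturer = manufacturer;
        this.ramGb = ramGb;
        this.processors = processors;
        this.purchaseDate = purchaseDate;
    }

    String getName() {
        return name;
    }

    String getManufacturer() {
        return manufacturer;
    }

    // Age in whole years as of today.
    long getServerAge() {
        return getServerAge(LocalDate.now());
    }

    // Age in whole years as of the given date (assumed overload, handy for tests).
    long getServerAge(LocalDate today) {
        return ChronoUnit.YEARS.between(purchaseDate, today);
    }

    public static void main(String[] args) {
        Server web = new Server("web01", "10.0.0.1", "Dell", 64, 2, LocalDate.of(2010, 6, 1));
        System.out.println(web.getName() + " is " + web.getServerAge() + " years old");
    }
}
```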

Creating and Loading the Servers

Now that we have a Server class, we’ll create a List and load several servers:

Screen Shot 2015-12-10 at 10.17.47 AM

Example 1: Print the Names of All the Dell Servers

For our first example, we’ll write some code to find all of the servers made by Dell and then print the server names to the console:

Screen Shot 2015-12-10 at 10.29.48 AM

Our first step is on line 76 – we have to get the stream from our list of servers.  Once we have the stream, we add the filter intermediate operation on line 77.  The filter operation takes a stream of servers as input and then produces another stream of servers containing only the servers that match the criteria specified in the filter’s lambda.  We select only the servers that are made by Dell using the following lambda:
s -> s.getManufacturer().equalsIgnoreCase(manufacturer)

The variable s represents each server that is processed from the stream (remember that we don’t have to declare the type).  The right hand side of the arrow operator represents the statement we want to evaluate for each server processed.  In this case, we’ll return true if the current server’s manufacturer is Dell and false otherwise.  The resulting output stream from the filter contains only those servers made by Dell.

Finally, we add the forEach terminal operation on line 78.  The forEach operation takes a stream of servers as input and then runs the given lambda on each server in the stream.   We print the names of the Dell servers to the console using the following lambda:
server -> System.out.println(server.getName())

Note that we used s as the variable name for each server in the stream in the first lambda and server as the variable name in the second – they don’t have to match from one lambda to the next.

The output of the above code is what we expect:

Screen Shot 2015-12-10 at 11.08.38 AM
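
For readers without access to the screenshots, the pipeline described in this example can be approximated as follows. The trimmed-down Server class, the sample inventory and the forEachNameBy helper are assumptions made for this sketch; the two lambdas are the ones quoted above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, trimmed-down Server with only the fields this example needs.
final class Server {
    private final String name;
    private final String manufacturer;

    Server(String name, String manufacturer) {
        this.name = name;
        this.manufacturer = manufacturer;
    }

    String getName() {
        return name;
    }

    String getManufacturer() {
        return manufacturer;
    }
}

final class FilterForEachSketch {
    // Passes the name of every server made by the given manufacturer to the action.
    static void forEachNameBy(List<Server> servers, String manufacturer, Consumer<String> action) {
        servers.stream()
                .filter(s -> s.getManufacturer().equalsIgnoreCase(manufacturer))
                .forEach(server -> action.accept(server.getName()));
    }

    public static void main(String[] args) {
        List<Server> servers = Arrays.asList(
                new Server("web01", "Dell"),
                new Server("db01", "HP"),
                new Server("mail01", "dell"));
        forEachNameBy(servers, "Dell", System.out::println);
    }
}
```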

Example 2: Print the Names of All the Servers Older Than 3 Years

Our second example is similar to the first except that we want to find the servers that are older than 3 years:

Screen Shot 2015-12-10 at 10.45.33 AM

The only difference between this example and the first is that we changed the lambda expression in our filter operation (line 89) to this:
s -> s.getServerAge() > age

The output stream from this filter contains only servers that are older than 3 years.

The output of the above code is:

Screen Shot 2015-12-10 at 11.25.27 AM

Example 3: Extract All Servers Older Than 3 Years Into a New List

Our third example is similar to the second in that we are looking for the servers that are older than three years.  The difference in this example is that we will create a new List containing only the servers that meet our criteria:

Screen Shot 2015-12-10 at 10.47.36 AM

As in the previous example, we get the stream from the List and add the filter intermediate operation to create a stream containing only those servers older than 3 years (lines 102 and 103).  Now, on line 104, we use the collect terminal operation rather than the forEach terminal operation.  The collect terminal operation takes a stream of servers as input and then puts them in the data structure specified in the parameter.  In our case, we convert the stream into a list of servers.  The resulting list is referenced by the oldServers variable declared on Line 100.

Finally, to demonstrate that we get the same set of servers in this example as the last, we print the names of all the servers in the oldServers list.  Note that, because we want all of the servers in the list, there is no intermediate filter operation.  We simply get the stream from oldServers and feed it to the forEach terminal operation.

The output is what we expect:

Screen Shot 2015-12-10 at 12.13.44 PM
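
A minimal sketch of the collect step, assuming a simplified Server that stores its age directly instead of deriving it from a purchase date. The olderThan helper name is made up; Collectors.toList() is one common collector for turning a stream into a List.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical Server that stores its age in years directly, to keep the sketch short.
final class Server {
    private final String name;
    private final long ageYears;

    Server(String name, long ageYears) {
        this.name = name;
        this.ageYears = ageYears;
    }

    String getName() {
        return name;
    }

    long getServerAge() {
        return ageYears;
    }
}

final class CollectSketch {
    // Filters the stream and collects the survivors into a new List.
    static List<Server> olderThan(List<Server> servers, long age) {
        return servers.stream()
                .filter(s -> s.getServerAge() > age)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Server> servers = Arrays.asList(
                new Server("web01", 5), new Server("db01", 2), new Server("mail01", 4));
        List<Server> oldServers = olderThan(servers, 3);
        // No filter this time: every server in oldServers already matches.
        oldServers.stream().forEach(server -> System.out.println(server.getName()));
    }
}
```

The original list is left untouched; collect builds a separate list from the filtered stream.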

Example 4: Calculate and Print the Average Age of the Servers

In our final example, we’ll calculate the average age of our servers:

Screen Shot 2015-12-10 at 10.50.02 AM

The first step is the same as our previous examples: we get the stream from our list of servers.  Next we add the mapToLong intermediate operation.  This aggregate operation takes a stream of servers as input and produces a stream of long values (a LongStream) as output.  The servers are mapped to longs according to the specified lambda on Line 119 (you can also use the equivalent syntax on Line 120).  In this case, we are grabbing the age of each incoming server and putting it into the resulting stream of longs.

Next we add the average terminal operation.  Average does exactly what you would expect: it calculates the average of all of the values in the stream.  Terminal operations like average that return one value by combining or operating on the contents of a stream are known as reduction operations.  Other examples of reduction operations include sum, min, max, and count.

Finally, we add the operation getAsDouble.  This is required because average returns the type OptionalDouble.  If the incoming stream is empty, average returns an empty instance of OptionalDouble.  If this happens, calling getAsDouble will throw a NoSuchElementException; otherwise it just returns the double value held by the OptionalDouble instance.

The output of this example is:

Screen Shot 2015-12-10 at 10.35.24 PM
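
The steps of this example can be sketched as below, again with a simplified Server that stores its age directly (an assumption made for brevity). The sketch also shows orElse, which supplies a fallback value and so avoids the NoSuchElementException on an empty list.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.OptionalDouble;

// Hypothetical Server that stores its age in years directly.
final class Server {
    private final long ageYears;

    Server(long ageYears) {
        this.ageYears = ageYears;
    }

    long getServerAge() {
        return ageYears;
    }
}

final class AverageSketch {
    // mapToLong turns the Stream<Server> into a LongStream; average reduces it.
    static OptionalDouble averageAge(List<Server> servers) {
        return servers.stream()
                .mapToLong(Server::getServerAge) // same as s -> s.getServerAge()
                .average();
    }

    public static void main(String[] args) {
        List<Server> servers = Arrays.asList(new Server(2), new Server(4), new Server(6));
        System.out.println(averageAge(servers).getAsDouble());

        // With no servers, getAsDouble() would throw; orElse provides a default.
        List<Server> none = Collections.emptyList();
        System.out.println(averageAge(none).orElse(0.0));
    }
}
```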


We’ve only scratched the surface as to what you can do with lambdas, streams, and aggregates.  I encourage you to grab the source code, play with it, and start to explore all the possibilities of these new Java 8 features.


Reactive file system monitoring using Akka actors

In this article, we will discuss:

  1. File system monitoring using Java NIO.2
  2. Common pitfalls of the default Java library
  3. Design a simple thread-based file system monitor
  4. Use the above to design a reactive file system monitor using the actor model

Note: Although all the code samples here are in Scala, they can be rewritten in plain Java too. To quickly familiarize yourself with Scala syntax, here is a very short and nice Scala cheatsheet. For a more comprehensive guide to Scala for Java programmers, consult this (not needed to follow this article).

For the absolute shortest cheatsheet, the following Java code:

public void foo(int x, int y) {
  int z = x + y;
  if (z == 1) {
    System.out.println(x);
  } else {
    System.out.println(y);
  }
}

is equivalent to the following Scala code:

def foo(x: Int, y: Int): Unit = {
  val z: Int = x + y
  z match {
   case 1 => println(x)
   case _ => println(y)
  }
}

All the code presented here is available under MIT license as part of the better-files library on GitHub.

Let’s say you are tasked to build a cross-platform desktop file-search engine. You quickly realize that after the initial indexing of all the files, you need to also quickly reindex any new files (or directories) that got created or updated. A naive way would be to simply rescan the entire file system every few minutes, but that would be incredibly inefficient since most operating systems expose file system notification APIs that allow the application programmer to register callbacks for changes, e.g. inotify in Linux, FSEvents in Mac and FindFirstChangeNotification in Windows.

But now you are stuck dealing with OS-specific APIs! Thankfully, beginning with Java SE 7, we have a platform-independent abstraction for watching file system changes via the WatchService API. The WatchService API was developed as part of Java NIO.2, under JSR 203, and here is a “hello world” example of using it to watch a given Path:

import java.nio.file._
import java.nio.file.StandardWatchEventKinds._
import scala.collection.JavaConversions._

def watch(directory: Path): Unit = {
  // First create the service
  val service: WatchService = directory.getFileSystem.newWatchService()

  // Register the service to the path and also specify which events we want to be notified about
  directory.register(service, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY)

  while (true) {
    val key: WatchKey = service.take()  // Wait for this key to be signalled
    for {event <- key.pollEvents()} {
      // event.context() is the path to the file that got changed
      event.kind() match {
        case ENTRY_CREATE => println(s"${event.context()} got created")
        case ENTRY_MODIFY => println(s"${event.context()} got modified")
        case ENTRY_DELETE => println(s"${event.context()} got deleted")
        case _ =>
          // This can happen when OS discards or loses an event.
          // See:
          println(s"Unknown event $event happened at ${event.context()}")
      }
    }
    key.reset()  // Do not forget to do this!! See:
  }
}

Although the above is a good first attempt, it lacks in several aspects:

  1. Bad Design: The above code looks unnatural and you probably had to look it up on StackOverflow to get it right. Can we do better?
  2. Bad Design: The code does not do a very good job of handling errors. What happens when we encounter a file we could not open?
  3. Gotcha: The Java API only allows us to watch the directory for changes to its direct children; it does not recursively watch a directory for you.
  4. Gotcha: The Java API does not allow us to watch a single file, only a directory.
  5. Gotcha: Even if we resolve the aforementioned issues, the Java API does not automatically start watching a new child file or directory created under the root.
  6. Bad Design: The code as implemented above, exposes a blocking/polling, thread-based model. Can we use a better concurrency abstraction?

Let’s address each of the above concerns in turn.

  • A better interface: Here is what my ideal interface would look like:

abstract class FileMonitor(root: Path) {
  def start(): Unit
  def onCreate(path: Path): Unit
  def onModify(path: Path): Unit
  def onDelete(path: Path): Unit
  def stop(): Unit
}

That way, I can simply write the example code as:

val watcher = new FileMonitor(myFile) {
  override def onCreate(path: Path) = println(s"$path got created")
  override def onModify(path: Path) = println(s"$path got modified")
  override def onDelete(path: Path) = println(s"$path got deleted")
}

Ok, let’s try to adapt the first example using a Java Thread so that we can expose “my ideal interface”:

trait FileMonitor {                               // My ideal interface
  val root: Path                                  // starting file
  def start(): Unit                               // start the monitor
  def onCreate(path: Path) = {}                   // on-create callback
  def onModify(path: Path) = {}                   // on-modify callback
  def onDelete(path: Path) = {}                   // on-delete callback
  def onUnknownEvent(event: WatchEvent[_]) = {}   // handle lost/discarded events
  def onException(e: Throwable) = {}              // handle errors e.g. a read error
  def stop(): Unit                                // stop the monitor
}

And here is a very basic thread-based implementation:

class ThreadFileMonitor(val root: Path) extends Thread with FileMonitor {
  setDaemon(true)        // daemonize this thread
  setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
    override def uncaughtException(thread: Thread, exception: Throwable) = onException(exception)
  })

  val service = root.getFileSystem.newWatchService()

  // Block on the watch service forever and hand each signalled key to process
  override def run() = Iterator.continually(service.take()).foreach(process)

  override def interrupt() = {
    service.close()      // release the watch service before stopping the thread
    super.interrupt()
  }

  override def start() = {
    watch(root)          // register the root before the thread starts polling
    super.start()
  }

  protected[this] def watch(file: Path): Unit = {
    file.register(service, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY)
  }

  protected[this] def process(key: WatchKey) = {
    key.pollEvents() foreach {
      case event: WatchEvent[Path] => dispatch(event.kind(), event.context())
      case event => onUnknownEvent(event)
    }
    key.reset()          // re-arm the key so that further events are delivered
  }

  def dispatch(eventType: WatchEvent.Kind[Path], file: Path): Unit = {
    eventType match {
      case ENTRY_CREATE => onCreate(file)
      case ENTRY_MODIFY => onModify(file)
      case ENTRY_DELETE => onDelete(file)
    }
  }
}

The above looks much cleaner! Now we can watch files to our heart’s content without poring over the details of JavaDocs by simply implementing the onCreate(path), onModify(path), onDelete(path) etc.

  • Exception handling: This is already done above. onException gets called whenever we encounter an exception and the invoker can decide what to do next by implementing it.

  • Recursive watching: The Java API does not allow recursive watching of directories. We need to modify the watch(file) to recursively attach the watcher:

def watch(file: Path, recursive: Boolean = true): Unit = {
  if (Files.isDirectory(file)) {
    file.register(service, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY)
    // recursively call watch on children of this file
    if (recursive) {
      Files.list(file).iterator() foreach {f => watch(f, recursive)}
    }
  }
}
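
For readers following along in plain Java, the same idea can be sketched with Files.walkFileTree, registering every directory as it is visited. This is an alternative to the recursive Files.list call above, not a translation of it; the class and method names are made up for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.List;

import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;

final class RecursiveWatchSketch {
    // Registers root and every directory below it; returns one key per directory.
    static List<WatchKey> registerTree(Path root, WatchService service) throws IOException {
        List<WatchKey> keys = new ArrayList<>();
        Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
                    throws IOException {
                keys.add(dir.register(service, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY));
                return FileVisitResult.CONTINUE;
            }
        });
        return keys;
    }

    // Builds a throwaway tree (root/a/b) in the temp directory and registers it.
    static int countRegisteredInSampleTree() {
        try {
            Path root = Files.createTempDirectory("watch-demo");
            Files.createDirectories(root.resolve("a").resolve("b"));
            try (WatchService service = root.getFileSystem().newWatchService()) {
                return registerTree(root, service).size();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(countRegisteredInSampleTree() + " directories registered");
    }
}
```

Either way, directories created after the initial walk are not covered; that gap is the auto-watching concern from the list of gotchas.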

  • Watching regular files: As mentioned before, the Java API can only watch directories. One hack we can do to watch single files is to set a watcher on its parent directory and only react if the event triggered on the file itself.

override def start() = {
  if (Files.isDirectory(root)) {
    watch(root, recursive = true)
  } else {
    watch(root.getParent, recursive = false)
  }
  super.start()
}

And, now in process(key), we make sure we react to either a directory or that file only:

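// event.context() is relative to the watched directory, so compare it with root's file name
def reactTo(target: Path) = Files.isDirectory(root) || (root.getFileName == target)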

And, we check before dispatch now:

case event: WatchEvent[Path] =>
  val target = event.context()
  if (reactTo(target)) {
    dispatch(event.kind(), target)
  }
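
One detail worth spelling out: for these event kinds, the context of a WatchEvent is a path relative to the registered directory, so it has to be resolved against that directory before being compared with the file we care about. Below is a small Java sketch of such a check. The shouldReact name and the rootIsDirectory flag (passed in so the sketch does not touch the file system) are assumptions made for this example, and root is assumed to have a parent.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

final class SingleFileFilterSketch {
    // root is what the caller asked to watch. When root is a regular file, the
    // watcher is registered on root.getParent(), and eventContext is relative to it.
    static boolean shouldReact(Path root, boolean rootIsDirectory, Path eventContext) {
        if (rootIsDirectory) {
            return true; // every event under a watched directory is of interest
        }
        Path changed = root.getParent().resolve(eventContext);
        return changed.equals(root);
    }

    public static void main(String[] args) {
        Path log = Paths.get("/data/app.log");
        System.out.println(shouldReact(log, false, Paths.get("app.log")));   // the watched file
        System.out.println(shouldReact(log, false, Paths.get("other.log"))); // a sibling file
    }
}
```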

  • Auto-watching new items: The Java API does not auto-watch any new sub-files. We can address this by attaching the watcher ourselves in process(key) when an ENTRY_CREATE event is fired:

if (reactTo(target)) {
  if (Files.isDirectory(root) && event.kind() == ENTRY_CREATE) {
    watch(root.resolve(target))   // start watching the newly created item
  }
  dispatch(event.kind(), target)
}

Putting it all together, we have our final FileMonitor.scala:

class ThreadFileMonitor(val root: Path) extends Thread with FileMonitor {
  setDaemon(true) // daemonize this thread
  setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
    override def uncaughtException(thread: Thread, exception: Throwable) = onException(exception)
  })

  val service = root.getFileSystem.newWatchService()

  override def run() = Iterator.continually(service.take()).foreach(process)

  override def interrupt() = {
    service.close()      // release the watch service before stopping the thread
    super.interrupt()
  }

  override def start() = {
    if (Files.isDirectory(root)) {
      watch(root, recursive = true)
    } else {
      watch(root.getParent, recursive = false)
    }
    super.start()
  }

  protected[this] def watch(file: Path, recursive: Boolean = true): Unit = {
    if (Files.isDirectory(file)) {
      file.register(service, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY)
      if (recursive) {
        Files.list(file).iterator() foreach {f => watch(f, recursive)}
      }
    }
  }

  // event.context() is relative to the watched directory, so compare it with root's file name
  private[this] def reactTo(target: Path) = Files.isDirectory(root) || (root.getFileName == target)

  protected[this] def process(key: WatchKey) = {
    key.pollEvents() foreach {
      case event: WatchEvent[Path] =>
        val target = event.context()
        if (reactTo(target)) {
          if (Files.isDirectory(root) && event.kind() == ENTRY_CREATE) {
            watch(root.resolve(target))   // start watching the newly created item
          }
          dispatch(event.kind(), target)
        }
      case event => onUnknownEvent(event)
    }
    key.reset()          // re-arm the key so that further events are delivered
  }

  def dispatch(eventType: WatchEvent.Kind[Path], file: Path): Unit = {
    eventType match {
      case ENTRY_CREATE => onCreate(file)
      case ENTRY_MODIFY => onModify(file)
      case ENTRY_DELETE => onDelete(file)
    }
  }
}

Now that we have addressed all the gotchas and distanced ourselves from the intricacies of the WatchService API, we are still tightly coupled to the thread-based API.
We will use the above class to expose a different concurrency model, namely the actor model, to design a reactive, dynamic and resilient file-system watcher using Akka. Although the construction of Akka actors is beyond the scope of this article, we will present a very simple actor that uses the ThreadFileMonitor:

import java.nio.file.{Path, WatchEvent}


class FileWatcher(file: Path) extends ThreadFileMonitor(file) with Actor {
  import FileWatcher._

  // MultiMap from Events to registered callbacks
  protected[this] val callbacks = newMultiMap[Event, Callback]  

  // Override the dispatcher from ThreadFileMonitor to inform the actor of a new event
  override def dispatch(event: Event, file: Path) = self ! Message.NewEvent(event, file)  

  // Override the onException from the ThreadFileMonitor
  override def onException(exception: Throwable) = self ! Status.Failure(exception)

  // when actor starts, start the ThreadFileMonitor
  override def preStart() = super.start()   
  // before actor stops, stop the ThreadFileMonitor
  override def postStop() = super.interrupt()

  override def receive = {
    case Message.NewEvent(event, target) if callbacks contains event => 
       callbacks(event) foreach {f => f(event -> target)}

    case Message.RegisterCallback(events, callback) => 
       events foreach {event => callbacks.addBinding(event, callback)}

    case Message.RemoveCallback(event, callback) => 
       callbacks.removeBinding(event, callback)
  }
}

object FileWatcher {
  type Event = WatchEvent.Kind[Path]
  type Callback = PartialFunction[(Event, Path), Unit]

  sealed trait Message
  object Message {
    case class NewEvent(event: Event, file: Path) extends Message
    case class RegisterCallback(events: Seq[Event], callback: Callback) extends Message
    case class RemoveCallback(event: Event, callback: Callback) extends Message
  }
}

This allows us to dynamically register and remove callbacks to react to file system events:

// initialize the actor instance
val system = ActorSystem("mySystem") 
val watcher: ActorRef = system.actorOf(Props(new FileWatcher(Paths.get("/home/pathikrit"))))

// util to create a RegisterCallback message for the actor
def when(events: Event*)(callback: Callback): Message = {
  Message.RegisterCallback(events.distinct, callback)
}

// send the register callback message for create/modify events
watcher ! when(events = ENTRY_CREATE, ENTRY_MODIFY) {   
  case (ENTRY_CREATE, file) => println(s"$file got created")
  case (ENTRY_MODIFY, file) => println(s"$file got modified")
}

Full source: FileWatcher.scala

Migrating Spring App to MicroServices App on AWS



The company I work for recently refactored its code base from a monolithic application (a Java Spring WAR) into a microservices application hosted on Amazon's PaaS offerings (specifically Beanstalk and CloudFront). As part of this blog post I have provided a small and simple Sales Demo application, and I will discuss the steps required to refactor the application so that it can run within the Beanstalk/S3/CloudFront environments.

For the purposes of this blog, I will be using a SalesTax demo application, and the code can be found here. This site provides users with a list of products and gives them the ability to create an order and apply sales tax. I have created a more detailed guide, which includes steps for creating the different services in AWS. The guide can be found at this location. The following is a diagram of the Spring architecture:



The above architecture is a pretty standard Spring architecture for most monolithic web applications. In our migration, we broke up our code and separated the backend services from the front-end content: JSPs (now HTML), CSS and JS. The following is a diagram illustrating our model of how we controlled access:


Amazon Web Services

I am going to start by explaining at a high-level what these different components in AWS are and how we integrate them together.


Route 53

Route 53 is a Domain Name System (DNS) service which allows you to route traffic to different internal AWS services. In our model we used Route 53 to host our DNS servers.



Amazon S3 is a simple storage service which allows you to store content (HTML, CSS, and JS files) in buckets in the cloud. In this demo we will be using Amazon S3 to host the static content (HTML, CSS, and JS).



Beanstalk is an application stack which will be used to host our individual services. Beanstalk supports multiple stacks (Tomcat, PHP, Node, Ruby, Go, .NET). In this demo we will be using Beanstalk to host our different web services (as Spring WARs running on Tomcat).



Amazon Relational Database Service (RDS) will be used to host our database. We will create an RDS database, and our web services will connect to it.



Amazon CloudFront is the glue that will tie all your different services together under one common URL. We will define an origin which will correspond to our URL, defined in Route 53. When the user hits this URL, Route 53 will route the traffic to CloudFront. CloudFront will host the content and push it to edge locations around the world. In CloudFront you are able to route traffic based on URL patterns. For example, anyone coming to the default pattern (/*) can be sent to a bucket in S3 which hosts your static content (i.e. HTML, CSS, images). If they come to, say, an API URL (/api/products), you can route them to a Beanstalk service in the backend.
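To make the pattern-based routing a little more concrete, here is a small plain-Java sketch of the idea. It is not CloudFront's implementation or API: the class name BehaviorTable, the origin names, and the simplified matching rule (a trailing * matches any suffix) are all assumptions I made for illustration. Behaviors are checked in order, the first match wins, and the default pattern goes last.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BehaviorTable {
    // Insertion order matters: the first matching pattern wins.
    private final Map<String, String> behaviors = new LinkedHashMap<>();

    public BehaviorTable add(String pathPattern, String origin) {
        behaviors.put(pathPattern, origin);
        return this;
    }

    // Simplified matching (an assumption): a trailing '*' matches any suffix,
    // anything else must match exactly.
    private static boolean matches(String pattern, String path) {
        if (pattern.endsWith("*")) {
            return path.startsWith(pattern.substring(0, pattern.length() - 1));
        }
        return pattern.equals(path);
    }

    public String originFor(String path) {
        for (Map.Entry<String, String> behavior : behaviors.entrySet()) {
            if (matches(behavior.getKey(), path)) {
                return behavior.getValue();
            }
        }
        throw new IllegalArgumentException("No behavior matches " + path);
    }

    public static void main(String[] args) {
        BehaviorTable table = new BehaviorTable()
                .add("/api/*", "beanstalk-origin")  // hypothetical origin names
                .add("/*", "s3-static-origin");     // default pattern goes last
        System.out.println(table.originFor("/api/products"));
        System.out.println(table.originFor("/index.html"));
    }
}
```

If the two patterns were registered in the opposite order, the default pattern would swallow the API requests too, which is why the more specific pattern is listed first.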

Infrastructure Security

In our production systems we have all our web services hidden behind different VPCs and have implemented network rules to restrict access to our backend services. I do not think I will have time to address this in this blog, but I will try to talk about it in my next post.


Application Security

One major component I have not included in the Sales Demo is Spring Security. In our application, we removed Spring Security and moved access control to an API Gateway. I will discuss this concept briefly at the end of this blog.


NOTE: AWS is a very sophisticated and complex ecosystem that provides multiple ways to integrate these different services. The model I will be discussing is similar to the model which we implemented at our company.


SalesTax Application Overview


The SalesTax Demo application will look like a traditional Spring application with one exception. The JSP pages do not follow the traditional Spring MVC model, with data being passed from the controller and the JSP pages rendering the view. Instead we are using Angular, which makes REST calls to the backend controllers and renders the content in the browser. The reason we are doing this is so that we can migrate our static content (HTML, CSS, JS files) to S3 buckets and have our backend services run in Beanstalk.


I have created a guide which provides step-by-step instructions, with pictures, on how to set up your environment in AWS. You can find a link to the document on GitHub at this location. The rest of this post provides a summary of the process with references to the guide. If you would like to try this on your own AWS setup, I recommend you look at the detailed guide.


Migration Process


The following section will provide a high-level overview of the migration process. Again if you would like to try this out for yourself, I would recommend using the detailed guide.


Deploy Application to Beanstalk


The first step will be to build the application and deploy it into a Beanstalk instance. To check out the code, please run the following command:

Git clone step0


You can import the project into your IDE (Eclipse, NetBeans, STS, etc) or you can just build this from the command line. To build the project run the following commands:


mvn clean install


Once the WAR has been built, log into the AWS Administration console and deploy your WAR in a new Beanstalk instance. For detailed instructions, see the install guide.


Configure CloudFront to point to your Beanstalk Instance


Log in to the Amazon Console and click on the CloudFront link. At this point you have two options:

- Use your own domain name

- Use the default domain provided by CloudFront

If you already have your own domain name you can add it to Route 53. The following link provides detailed instructions on how to do this. If you do not have your own, you can just create a CloudFront Origin and it will give you a URL.


The goal of this step is to use CloudFront to map your URL (either your own or the generated one) to your hosted application in Beanstalk. In CloudFront you will define a Web Distribution, and then for that distribution you will define an Origin. Origins in CloudFront represent backend services (i.e. S3 buckets that host static content or Beanstalk applications that host your Spring apps). Finally, you will create a Behavior that instructs CloudFront to map all requests matching a certain URL pattern to a specific Beanstalk instance. For the first step we will map all requests (/*) to the Beanstalk instance. In later steps we will map all requests of the format (/api/*) to your Beanstalk instance, and the rest (/*) will go to your S3 bucket. Below is an image of what the screen for creating a Behavior would look like.


Create RDS Postgres instance and connect to Beanstalk


In this step we create a publicly accessible RDS instance and then connect to it from our pgAdmin tool to create the database. The sql script and updated code can be found by pulling down the step1 branch as follows:


Git clone step1


The sql create script can be found in the following location

src/resources/sql/createSalesTax-DB-Postgres.sql


Once your database is created you can rebuild your project with maven using the following command:

mvn clean install


Log back into your Amazon console and redeploy your latest WAR file. You will also need to add environment properties to your Beanstalk instance so it knows where to find your database. This can be done by clicking on Configuration, then Software Configuration, and adding them to Environment Properties.
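As a rough sketch of how the application side might pick these values up: on Beanstalk's Tomcat platform the environment properties are, as far as I know, exposed to the JVM as system properties, so a small helper can check system properties first and fall back to OS environment variables. The class name DbSettings and the property names (JDBC_URL, JDBC_USER) are hypothetical; use whatever names you configured in the console.

```java
import java.util.Optional;

public class DbSettings {
    // Looks a setting up as a JVM system property first,
    // then falls back to an OS environment variable.
    public static Optional<String> lookup(String name) {
        String value = System.getProperty(name);
        if (value == null) {
            value = System.getenv(name);
        }
        return Optional.ofNullable(value);
    }

    // Fails fast with a readable message when a required setting is missing.
    public static String required(String name) {
        return lookup(name).orElseThrow(
                () -> new IllegalStateException("Missing environment property: " + name));
    }

    public static void main(String[] args) {
        // Simulate the property for a local run (hypothetical name and value).
        System.setProperty("JDBC_URL", "jdbc:postgresql://localhost:5432/salestax");
        System.out.println(required("JDBC_URL"));
        System.out.println(lookup("JDBC_USER").orElse("postgres"));
    }
}
```

Failing fast on a missing property makes a misconfigured Beanstalk environment show up at startup rather than on the first database call.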


If you reload your application you will see that it is now pulling the products from the database instance in AWS.


Create an S3 Bucket and deploy Static Content to it


In this step we are going to create an S3 bucket and move our static content (HTML, CSS, images, etc.) to it. To get the latest code for this we will need to pull down the latest changes from Git. Run the following command:



Git clone step2


Log back into the Amazon Console and click on S3. Click on Create Bucket and create a new bucket.



Once your bucket is created, click on Properties (upper right corner) and click on Static Website Hosting to enable hosting of content. Once your S3 bucket is ready you can transfer the static content of the project to S3. The code to transfer is in the following directory:


Update CloudFront to reflect new origins

We will need to update CloudFront to route requests to their appropriate origins. The first step is to log into CloudFront and create an Origin for your newly created bucket. Once your Origin has been created, you will need to modify the Behaviors so that your default Behavior (*) now points to your static content in S3 and your API requests (/api/*) are sent to your Elastic Beanstalk instance. The following is a diagram of the proposed changes to CloudFront.


Redeploy Application

Once CloudFront has been updated and the status has changed to Deployed, your static content, which is hosted in S3, will be accessible through your CloudFront URL. The only thing left to do is rebuild the sales demo application and redeploy it into Beanstalk. At this stage, all the front-end code (HTML, JS, CSS) has been moved to the web directory and the backend functionality is in the services directory. To rebuild your application, run the Maven command in the services directory:


mvn clean install


Log back into the Amazon Console and redeploy your Beanstalk application with the new WAR.

The above architecture is a good starting point for anyone who is looking at migrating their Spring application to cloud-based microservices. As part of your migration I would suggest looking at incorporating an API Gateway. There is a range of open source and commercially available API Gateways (Amazon, for one, released its API Gateway in July 2015). The API Gateway sits between CloudFront and your backend services; it handles authentication and access control, and it routes your requests to the appropriate Beanstalk instance. I have included a picture of the API Gateway below.





Grails Application with MySQL container

With the container ecosystem evolving over the past few years, developers are becoming aware of a solution that can improve their productivity and deployment time. I will keep the instructions as simple and detailed as possible. For people completely new to Docker, I recommend this video:

For a more in-depth understanding and the commands available, you can take a look at the official documentation. I also recommend this short read to understand the differences between containers and VMs.

Let’s say you’re developing feature X, which requires data from a relational database to be kept up to date for you to perform your development and manual testing. This means you probably need some kind of mechanism to refresh the data, e.g. a sync job that gives you the most up-to-date data.

Depending on the size of your data, it can range from a few seconds to several minutes. What if you add new columns or you want to change column types?

For this tutorial you will need:

Java 8

Grails 3

Docker (follow the installation steps for your OS)

MySQL client

Tutorial Project: DockerTutorial

I am performing this tutorial on a Macbook with OS X Yosemite. When the steps differ from Linux installations, I will mention steps for both.

Start the Docker Quickstart Terminal and make sure Docker works properly:

docker --version

You should see something along the lines of

Docker version

An IDE is not necessary but if you want to use one, you can just import it as a Gradle project.

Now, we need to download the MySQL docker image for MacOS. Due to a current bug in the official image, we have to use a different image. The issue is only for MacOS and can be followed here.

docker pull dgraziotin/mysql – For MacOS

docker pull mysql:5.6 – For Linux

Before we can do anything with our image, we need to shut down any MySQL server instance that is running locally.

Now, we can start our mysql image.


The above command starts our container with the name docker-db in the terminal window, where we can monitor the container logs. This creates an admin user with a randomly generated password, which is printed in the logs. You should see a message similar to the following:

db admin user

To verify that our container is working properly, let’s try connecting to it via our mysql client:

mysql login

Where host should be for Linux.

For MacOS, it is slightly different. Docker on MacOS makes use of a virtual machine. When you start Docker with the Quickstart Terminal, it actually creates a virtual machine with the Docker daemon running inside it. To get the IP we need, run docker-machine ls. This results in an output similar to the following:

Let’s create our database:

 create db

Let’s keep the mysql window open as we’ll come back to it later on.

For our Grails application, we will need to change the database driver for it to connect properly to our MySQL container. Navigate to the grails-app/conf folder and open application.yml. Change host, port and password to the values you used above.

app config

Ok, now that our app is configured properly and our container is running, let’s take it for a spin. Navigate to the root folder where you extracted the project files and run grails run-app.

When the application starts, let’s open it via http://localhost:8080 and we will be presented with the Grails scaffolding with a couple of controllers.

When refreshing your local environment, you need to have a mechanism that will populate your database with the new data. This can be a set of scripts and/or database dumps. Insert and update statements can take quite a bit of time depending on the database size and the amount of data required. One alternative could be to simply get the dump we need and then sync that dump with a database container.

One could argue: “What would be the difference from simply using a local MySQL instance?”

What if you’d need to keep your current database state? Let’s say you found a bug or some other kind of issue that can be replicated with that specific set of data. With Docker, you can just spin another container and mount another folder while keeping the one you need.

Or what if you have non-developers on the team, e.g. designers? Wouldn’t it make their life easier if they just had to run a simple script that starts the container, so they can do their work on an up-to-date database?

Another bonus point is that you do not need to have anything at all related to MySQL server installed on your machine.

How do we achieve this behaviour? Through a data volume. The approach I am going to show involves mapping a folder on our machine to one inside the container. First, let’s create our data folder, e.g. /tmp/docker-db-data

Next, let’s add our data files into that folder. I’ve populated a small sample database for the purpose of the tutorial:

Now, we will execute the container again, but this time with a data volume.

docker run -t -i -p 3306:3306 -v /tmp/docker-db-data:/mysql --name docker-db dgraziotin/mysql

The -v flag specifies the mapping from a folder on our host to one inside the container. By running the container with those flags, we are saying: I want to run a MySQL container named docker-db that stores the database data in my /tmp/docker-db-data folder.

Let’s jump back to our application folder and execute grails run-app. If you navigate to one of the two scaffolding pages, you will see some Accounts and Transactions.

This concludes the introduction to this series, where we demonstrated how to connect a sample application to a container running a relational database.

Reactive Development Using Vert.x

Lately, it seems like we’re always hearing about the latest and greatest frameworks for Java: tools like Ninja, SparkJava, and Play. But each one is opinionated and makes you feel like you need to redesign your entire application to make use of its wonderful features. That’s why I was so relieved when I discovered Vert.x. Vert.x isn’t a framework, it’s a toolkit; it’s un-opinionated and it’s liberating. Vert.x doesn’t want you to redesign your entire application to make use of it, it just wants to make your life easier. Can you write your entire application in Vert.x? Sure! Can you add Vert.x capabilities to your existing Spring/Guice/CDI applications? Yep! Can you use Vert.x inside of your existing JavaEE applications? Absolutely! And that’s what makes it amazing.


Vert.x was born when Tim Fox decided that he liked a lot of what was being developed in the NodeJS ecosystem, but he didn’t like some of the trade-offs of working in V8: Single-threadedness, limited library support, and JavaScript itself. Tim set out to write a toolkit which was unopinionated about how and where it is used, and he decided that the best place to implement it was on the JVM. So, Tim and the community set out to create an event-driven, non-blocking, reactive toolkit which in many ways mirrored what could be done in NodeJS, but also took advantage of the power available inside of the JVM. Node.x was born and it later progressed to become Vert.x.


Vert.x is designed around an event bus, which is how different parts of the application can communicate in a non-blocking, thread-safe manner. Parts of it were modeled after the Actor methodology exhibited by Erlang and Akka. It is also designed to take full advantage of today’s multi-core processors and highly concurrent programming demands. As such, all Vert.x VERTICLES are single-threaded by default. Unlike NodeJS though, Vert.x can run MANY verticles in MANY threads. Additionally, you can specify that some verticles are “worker” verticles and CAN be multi-threaded. And to really add some icing on the cake, Vert.x has low-level support for multi-node clustering of the event bus via the use of Hazelcast. It has gone on to include many other amazing features which are too numerous to list here, but you can read more in the official Vert.x docs.

The first thing you need to know about Vert.x is, similar to NodeJS, never block the current thread. Everything in Vert.x is set up, by default, to use callbacks/futures/promises. Instead of doing synchronous operations, Vert.x provides async methods for doing most I/O and processor-intensive operations which might block the current thread. Now, callbacks can be ugly and painful to work with, so Vert.x optionally provides an API based on RxJava which implements the same functionality using the Observer pattern. Finally, Vert.x makes it easy to use your existing classes and methods by providing the executeBlocking(Function f) method on many of its asynchronous APIs. This means you can choose how you prefer to work with Vert.x instead of the toolkit dictating to you how it must be used.
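The idea behind offloading blocking work can be sketched without Vert.x at all, using only the JDK. The example below is an analogy, not the Vert.x executeBlocking API: a single "event loop" thread hands a slow call to a worker pool and picks the result up again on the event loop thread. The names (slowLookup, lookupAsync), the pool size and the sleep time are all illustrative assumptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class OffloadBlockingWork {
    // Stand-in for a blocking call such as a JDBC query (hypothetical).
    public static String slowLookup(long id) {
        try {
            TimeUnit.MILLISECONDS.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "customer-" + id;
    }

    // Runs the blocking call on the worker pool, then continues on the event loop thread.
    public static CompletableFuture<String> lookupAsync(
            long id, ExecutorService workers, ExecutorService eventLoop) {
        return CompletableFuture
                .supplyAsync(() -> slowLookup(id), workers)
                .thenApplyAsync(
                        name -> name + " handled on " + Thread.currentThread().getName(),
                        eventLoop);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService eventLoop =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "event-loop"));
        ExecutorService workers = Executors.newFixedThreadPool(4);
        try {
            CompletableFuture<String> result = lookupAsync(42L, workers, eventLoop);
            // The event loop thread stays free while the lookup sleeps on a worker.
            System.out.println(result.get(5, TimeUnit.SECONDS));
        } finally {
            workers.shutdown();
            eventLoop.shutdown();
        }
    }
}
```

The point of the split is that the single event loop thread never sleeps; only the worker pool pays for the slow call.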

The second thing to know about Vert.x is that it is composed of verticles, modules, and nodes. Verticles are the smallest unit of logic in Vert.x, and are usually represented by a single class. Verticles should be simple and single-purpose, following the UNIX Philosophy. A group of verticles can be put together into a module, which is usually packaged as a single JAR file. A module represents a group of related functionality which, taken together, could represent an entire application or just a portion of a larger distributed application. Lastly, nodes are single instances of the JVM which are running one or more modules/verticles. Because Vert.x has clustering built in from the ground up, Vert.x applications can span nodes either on a single machine or across multiple machines in multiple geographic locations (though latency can hinder performance).

Example Project

Now, I’ve been to a number of Meetups and conferences lately where the first thing they show you when talking about reactive programming is to build a chat room application. That’s all well and good, but it doesn’t really help you to completely understand the power of reactive development. Chat room apps are simple and simplistic. We can do better. In this tutorial, we’re going to take a legacy Spring application and convert it to take advantage of Vert.x. This has multiple purposes: It shows that the toolkit is easy to integrate with existing Java projects, it allows us to take advantage of existing tools which may be entrenched parts of our ecosystem, and it also lets us follow the DRY principle in that we don’t have to rewrite large swathes of code to get the benefits of Vert.x.

Our legacy Spring application is a contrived simple example of a REST API using Spring Boot, Spring Data JPA, and Spring REST. The source code can be found in the “master” branch HERE. There are other branches which we will use to demonstrate the progression as we go, so it should be simple for anyone with a little experience with git and Java 8 to follow along. Let’s start by examining the Spring Configuration class for the stock Spring application.

// ...
@Slf4j
public class Application {
    public static void main(String[] args) {
        ApplicationContext ctx = SpringApplication.run(Application.class, args);

        System.out.println("Let's inspect the beans provided by Spring Boot:");

        String[] beanNames = ctx.getBeanDefinitionNames();
        for (String beanName : beanNames) {
            // ...
        }
    }

    @Bean
    public DataSource dataSource() {
        EmbeddedDatabaseBuilder builder = new EmbeddedDatabaseBuilder();
        return builder.setType(EmbeddedDatabaseType.HSQL).build();
    }

    @Bean
    public EntityManagerFactory entityManagerFactory() {
        HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();

        LocalContainerEntityManagerFactoryBean factory = new LocalContainerEntityManagerFactoryBean();
        // ...

        return factory.getObject();
    }

    @Bean
    public PlatformTransactionManager transactionManager(final EntityManagerFactory emf) {
        final JpaTransactionManager txManager = new JpaTransactionManager();
        // ...
        return txManager;
    }
}

As you can see at the top of the class, we have some pretty standard Spring Boot annotations. You’ll also see an @Slf4j annotation, which is part of the Lombok library and is designed to help reduce boilerplate code. We also have @Bean annotated methods for providing access to the JPA EntityManager, the TransactionManager, and the DataSource. Each of these provides injectable objects for the other classes to use. The remaining classes in the project are similarly simple. There is a Customer POJO, which is the Entity type used in the service. There is a CustomerDAO, which is created via Spring Data. Finally, there is a CustomerEndpoints class, which is the JAX-RS annotated REST controller.

As explained earlier, this is all standard fare in a Spring Boot application. The problem with this application is that, for the most part, it has limited scalability. You would either run this application inside of a Servlet container, or with an embedded server like Jetty or Undertow. Either way, each request ties up a thread and thus wastes resources while it waits for I/O operations.

Switching over to the Convert-To-Vert.x-Web branch, we can see that the Application class has changed a little. We now have some new @Bean annotated methods for injecting the Vertx instance itself, as well as an instance of ObjectMapper (part of the Jackson JSON library). We have also replaced the CustomerEndpoints class with a new CustomerVerticle. Pretty much everything else is the same.

The CustomerVerticle class is annotated with @Component, which means that Spring will instantiate that class on startup. It also has its start method annotated with @PostConstruct so that the Verticle is launched on startup. Looking at the actual content of the code, we see our first bits of Vert.x code: Router.

The Router class is part of the vertx-web library and allows us to use a fluent API to define HTTP URLs, methods, and header filters for our request handling. Adding the BodyHandler instance to the default route allows a POST/PUT body to be processed and converted to a JSON object which Vert.x can then process as part of the RoutingContext. The order of routes in Vert.x CAN be significant. If you define a route which has some sort of glob matching (* or regex), it can swallow requests for routes defined after it unless you implement chaining. Our example shows 3 routes initially.

    public void start() throws Exception {
        Router router = Router.router(vertx);
        // ...
    }

Notice that the HTTP method is defined, the “Accept” header is defined (via consumes), and the “Content-Type” header is defined (via produces). We also see that we are passing the handling of the request off via a call to the blockingHandler method. A blocking handler for a Vert.x route accepts a RoutingContext object as its only parameter. The RoutingContext holds the Vert.x Request object, Response object, and any parameters (like “:id”) or POST body data. You’ll also see that I used method references rather than lambdas to insert the logic into the blockingHandler (I find it more readable). Each handler for the 3 request routes is defined in a separate method further down in the class. These methods basically just call the methods on the DAO, serialize or deserialize as needed, set some response headers, and end() the request by sending a response. Overall, pretty simple and straightforward.

    private void addCustomer(RoutingContext rc) {
        try {
            String body = rc.getBodyAsString();
            Customer customer = mapper.readValue(body, Customer.class);
            Customer saved = dao.save(customer);
            if (saved!=null) {
                // ... serialize the saved customer and end() the response
            } else {
                rc.response().setStatusMessage("Bad Request").setStatusCode(400).end("Bad Request");
            }
        } catch (IOException e) {
            rc.response().setStatusMessage("Server Error").setStatusCode(500).end("Server Error");
            log.error("Server error", e);
        }
    }

    private void getCustomerById(RoutingContext rc) {
        log.info("Request for single customer");
        Long id = Long.parseLong(rc.request().getParam("id"));
        try {
            Customer customer = dao.findOne(id);
            if (customer==null) {
                rc.response().setStatusMessage("Not Found").setStatusCode(404).end("Not Found");
            } else {
                // ... serialize the customer and end() the response
            }
        } catch (JsonProcessingException jpe) {
            rc.response().setStatusMessage("Server Error").setStatusCode(500).end("Server Error");
            log.error("Server error", jpe);
        }
    }

    private void getAllCustomers(RoutingContext rc) {
        log.info("Request for all customers");
        List customers = StreamSupport.stream(dao.findAll().spliterator(), false).collect(Collectors.toList());
        try {
            // ... serialize the list and end() the response
        } catch (JsonProcessingException jpe) {
            rc.response().setStatusMessage("Server Error").setStatusCode(500).end("Server Error");
            log.error("Server error", jpe);
        }
    }

“But this is more code and messier than my Spring annotations and classes”, you might say. That CAN be true, but it really depends on how you implement the code. This is meant to be an introductory example, so I left the code very simple and easy to follow. I COULD use an annotation library for Vert.x to implement the endpoints in a manner similar to JAX-RS. In addition, we have gained a massive scalability improvement. Under the hood, Vert.x Web uses Netty for low-level asynchronous I/O operations, thus providing us the ability to handle MANY more concurrent requests (limited by the size of the database connection pool).

We’ve already made some improvement to the scalability and concurrency of this application by using the Vert.x Web library, but we can improve things a little more by using the Vert.x EventBus. By separating the database operations into Worker Verticles instead of using blockingHandler, we can handle request processing more efficiently. This is shown in the Convert-To-Worker-Verticles branch. The Application class has remained the same, but we have changed the CustomerEndpoints class and added a new class called CustomerWorker. In addition, we added a new library called Spring Vert.x Extension, which provides Spring Dependency Injection support to Vert.x Verticles. Start off by looking at the new CustomerEndpoints class.

    public void start() throws Exception {
        log.info("Successfully create CustomerVerticle");
        DeploymentOptions deployOpts = new DeploymentOptions().setWorker(true).setMultiThreaded(true).setInstances(4);
        vertx.deployVerticle("java-spring:com.zanclus.verticles.CustomerWorker", deployOpts, res -> {
            if (res.succeeded()) {
                Router router = Router.router(vertx);
                final DeliveryOptions opts = new DeliveryOptions()
                        .handler(rc -> {
                            opts.addHeader("method", "getCustomer")
                                    .addHeader("id", rc.request().getParam("id"));
                            vertx.eventBus().send("com.zanclus.customer", null, opts, reply -> handleReply(reply, rc));
                        .handler(rc -> {
                            opts.addHeader("method", "addCustomer");
                            vertx.eventBus().send("com.zanclus.customer", rc.getBodyAsJson(), opts, reply -> handleReply(reply, rc));
                        .handler(rc -> {
                            opts.addHeader("method", "getAllCustomers");
                            vertx.eventBus().send("com.zanclus.customer", null, opts, reply -> handleReply(reply, rc));
            } else {
                log.error("Failed to deploy worker verticles.", res.cause());
            }
        });
    }

The routes are the same, but the implementation code is not. Instead of using calls to blockingHandler, we have now implemented proper async handlers which send out events on the event bus. None of the database processing is happening in this Verticle anymore. We have moved the database processing to a Worker Verticle which has multiple instances to handle multiple requests in parallel in a thread-safe manner. We are also registering a callback for when those events are replied to so that we can send the appropriate response to the client making the request. Now, in the CustomerWorker Verticle we have implemented the database logic and error handling.

public void start() throws Exception {
    // Register this worker as a consumer on the address used by the endpoints
    vertx.eventBus().consumer("com.zanclus.customer", this::handleDatabaseRequest);
}

public void handleDatabaseRequest(Message<Object> msg) {
    String method = msg.headers().get("method");

    DeliveryOptions opts = new DeliveryOptions();
    try {
        String retVal;
        switch (method) {
            case "getAllCustomers":
                retVal = mapper.writeValueAsString(dao.findAll());
                msg.reply(retVal, opts);
                break;
            case "getCustomer":
                Long id = Long.parseLong(msg.headers().get("id"));
                retVal = mapper.writeValueAsString(dao.findOne(id));
                msg.reply(retVal, opts);
                break;
            case "addCustomer":
                retVal = mapper.writeValueAsString(
                        dao.save(mapper.readValue(
                                ((JsonObject)msg.body()).encode(), Customer.class)));
                msg.reply(retVal, opts);
                break;
            default:
                log.error("Invalid method '" + method + "'");
                opts.addHeader("error", "Invalid method '" + method + "'");
                msg.fail(1, "Invalid method"); // failure code value is an assumption
        }
    } catch (IOException | NullPointerException e) {
        log.error("Problem parsing JSON data.", e);
        msg.fail(2, e.getLocalizedMessage()); // failure code value is an assumption
    }
}

The CustomerWorker worker verticles register a consumer for messages on the event bus. The string which represents the address on the event bus is arbitrary, but it is recommended to use a reverse-tld style naming structure so that it is simple to ensure that the addresses are unique (“com.zanclus.customer”). Whenever a new message is sent to that address, it will be delivered to one, and only one, of the worker verticles. The worker verticle then calls handleDatabaseRequest to do the database work, JSON serialization, and error handling.

There you have it. You’ve seen that Vert.x can be integrated into your legacy applications to improve concurrency and efficiency without having to rewrite the entire application. We could have done something similar with an existing Google Guice or JavaEE CDI application. All of the business logic could remain relatively untouched while we tied in Vert.x to add reactive capabilities. The next steps are up to you. Some ideas for where to go next include Clustering, WebSockets, and VertxRx for ReactiveX sugar.

Writing BDD tests with Cucumber JVM

Cucumber JVM is an excellent tool for writing your BDD tests. In this article I would like to give an introduction to BDD with Cucumber JVM.

Let’s get started…

What is BDD?


In a nutshell, BDD tries to solve the problem of “understanding requirements with examples”

BDD tools
There are a lot of tools available for BDD and, interestingly, you can find quite a few vegetable names in the list 😉 Cucumber, Spinach, Lettuce, JBehave, Twist, etc. Of these, Cucumber is simple and easy to use.

Cucumber is written in Ruby, and Cucumber JVM is an implementation of Cucumber for popular JVM languages such as Java, Scala, Groovy, and Clojure.

Cucumber Stack
We write features and scenarios in a “Ubiquitous” Language and then implement them with the step definitions and support code.

Feature file and Gherkin
You begin by writing a .feature file. A feature file conventionally starts with the Feature keyword followed by Scenario. Each scenario consists of multiple steps. Cucumber uses Gherkin for this. Gherkin is a Business Readable, Domain Specific Language that lets you describe software’s behavior without detailing how that behavior is implemented.

Feature: Placing bets
  Scenario: Place a bet with cash balance
    Given I have an account with cash balance of 100
    When I place a bet of 10 on "SB_PRE_MATCH"
    Then the bet should be placed successfully
    And the remaining balance in my account should be 90

As you can see, the feature file reads much like spoken language, with Gherkin keywords like Feature, Scenario, Given, When, Then, And, But, and # (for comments).

Step Definitions
Once you have finalized the feature file with different scenarios, the next stage is to give life to the scenarios by writing your step definitions. Cucumber uses regular expressions to map the steps to the actual step definitions. Step definitions can be written in the JVM language of your choice. The keywords are ignored while mapping the step definitions.
So, for the example feature above, we have to write a step definition for each of the four steps. Use the IDE plugin to generate the stubs for you.

public class PlaceBetStepDefs {

    // No space before the closing "$": the step text ends right after the number
    @Given("^I have an account with cash balance of (\\d+)$")
    public void accountWithBalance(int balance) throws Throwable {
        // Write code here that turns the phrase above into concrete actions
        // throw new PendingException();
    }

    @When("^I place a bet of (\\d+) on \"(.*?)\"$")
    public void placeBet(int stake, String product) throws Throwable {
        // Write code here that turns the phrase above into concrete actions
        // throw new PendingException();
    }

    @Then("^the bet should be placed successfully$")
    public void theBetShouldBePlacedSuccessfully() throws Throwable {
        // Write code here that turns the phrase above into concrete actions
        // throw new PendingException();
    }

    @And("^the remaining balance in my account should be (\\d+)$")
    public void assertRemainingBalance(int remaining) throws Throwable {
        // Write code here that turns the phrase above into concrete actions
        // throw new PendingException();
    }
}
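To make the mapping between step text and step definition more tangible, here is a small sketch that uses only java.util.regex. It is not how Cucumber is implemented internally; the class name StepMatchingDemo and the helper match are made up for illustration. It only shows the idea that the keyword is dropped, the remaining text is matched against the pattern, and the capture groups become the method arguments.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Plain-JDK illustration of matching step text against a step-definition
// pattern. Hypothetical helper, NOT Cucumber's actual implementation.
class StepMatchingDemo {

    // Same pattern as the @When step definition in the article.
    static final Pattern PLACE_BET =
            Pattern.compile("^I place a bet of (\\d+) on \"(.*?)\"$");

    // Returns the captured arguments, or null when the step text does not match.
    static String[] match(Pattern pattern, String stepText) {
        Matcher m = pattern.matcher(stepText);
        if (!m.matches()) {
            return null;
        }
        String[] args = new String[m.groupCount()];
        for (int i = 0; i < args.length; i++) {
            args[i] = m.group(i + 1);
        }
        return args;
    }

    public static void main(String[] args) {
        // The keyword ("When") is not part of the text that gets matched.
        String[] captured = match(PLACE_BET, "I place a bet of 10 on \"SB_PRE_MATCH\"");
        System.out.println(captured[0] + " / " + captured[1]); // 10 / SB_PRE_MATCH
    }
}
```

Because the pattern is anchored with ^ and $, any stray character in it (an extra space, for instance) is enough to leave a step undefined.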

Support Code
The next step is to back your step definitions with support code. You can, for example, make a REST call to execute the step, make a database call, or use a web driver like Selenium. It is entirely up to the implementation. Once you get the response you can assert it against the results you are expecting or map it to your domain objects.
For example, you can use the Selenium web driver to simulate logging into a site.

protected WebDriver driver;

public void setup() {
    // Point Selenium at the local chromedriver binary
    System.setProperty("webdriver.chrome.driver", "C:\\devel\\projects\\cucumberworkshop\\chromedriver.exe");
    driver = new ChromeDriver();
}

@Given("^I open google$")
public void I_open_google() throws Throwable {
    driver.manage().timeouts().implicitlyWait(5, TimeUnit.SECONDS);
}

Expressive Scenarios
Cucumber provides more options to organize your scenarios better.

  • Background: use this to define steps that are common to all scenarios
  • Data Tables: you can write the input data in table format
  • Scenario Outline: a placeholder for your scenario that can be executed for a set of data called Examples
  • Tags and Sub Folders: use these to organize your features; tags are more like sticky notes for documentation

Dependency Injection
More often than not you will have to pass information created in one step to another. For example, you create a domain object in your first step and then need to use it in your second step. The clean way to achieve this is through Dependency Injection. Cucumber provides modules for the main DI containers like Spring, Guice, Pico, etc.

Executing Cucumber
It is very easy to run Cucumber from the IntelliJ IDE. It can also be integrated with your build system. You can also control which tests you want to run with different options.

Reporting Options
There are a lot of plugins available for reporting. For example, you could use the Master Thought plugin for the reports.

The Cucumber for Java Book: this is an excellent book and all you need to get started
GitHub link
That’s all folks. Hope you liked it. Have a good Christmas! Enjoy.

How to write a java agent

For vmlens, a lightweight java race condition catcher, we are using a java agent to trace field accesses. Here are the lessons we learned implementing such an agent.

The Start

Create an agent class with a “public static void premain(String args, Instrumentation inst)” method. Put this class into a jar file with a manifest pointing to the agent class. The premain method will be called before the main method of the application.

Manifest-Version: 1.0
Ant-Version: Apache Ant 1.9.2
Created-By: 1.8.0_05-b13 (Oracle Corporation)
Built-By: Thomas Krieger
Implementation-Vendor: Anarsoft
Implementation-Title: VMLens Agent
Can-Retransform-Classes: true
Premain-Class: com.anarsoft.trace.agent.Agent
Boot-Class-Path: agent_bootstrap.jar

The MANIFEST.MF file from vmlens.
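For orientation, here is roughly what the smallest possible agent class could look like. This is a sketch and not the vmlens agent: the names MinimalAgent and LoggingTransformer are hypothetical, and the transformer only logs class names instead of instrumenting anything. In a real agent jar the class would be declared public in its own file and named in Premain-Class.

```java
import java.lang.instrument.ClassFileTransformer;
import java.lang.instrument.Instrumentation;
import java.security.ProtectionDomain;

// Hypothetical minimal agent; class names and logging are illustrative only.
class MinimalAgent {

    // A transformer that only observes. Returning null tells the JVM
    // to keep the class bytes unchanged.
    static final class LoggingTransformer implements ClassFileTransformer {
        @Override
        public byte[] transform(ClassLoader loader, String className,
                                Class<?> classBeingRedefined,
                                ProtectionDomain protectionDomain,
                                byte[] classfileBuffer) {
            System.out.println("loading " + className);
            return null;
        }
    }

    // Invoked by the JVM before the application's main method when the jar
    // is passed with -javaagent and this class is named in Premain-Class.
    public static void premain(String args, Instrumentation inst) {
        inst.addTransformer(new LoggingTransformer());
    }
}
```

An agent packaged this way is started with something like java -javaagent:agent.jar -jar application.jar, where both jar names are placeholders.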

Class loader magic part 1

The agent class will be loaded by the system class loader. But we have to avoid version conflicts between the classes used by the agent and the application. Especially the frameworks used in the agent should not be visible to the application classes. So we use a dedicated URLClassLoader to load all other agent classes:

// remember the currently used classloader
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
// Create and set a special URLClassLoader
URLClassLoader classloader = new URLClassLoader(urlList.toArray(new URL[]{}), null);
Thread.currentThread().setContextClassLoader(classloader);
// Load and execute the agent
String agentName = "com.anarsoft.trace.agent.runtime.AgentRuntimeImpl";
AgentRuntime agentRuntime = (AgentRuntime) classloader.loadClass(agentName).newInstance();
// reset the classloader
Thread.currentThread().setContextClassLoader(contextClassLoader);
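The important detail in the snippet above is the null parent passed to the URLClassLoader. A small JDK-only experiment shows what that buys us; the class IsolationDemo and its helper canLoad are invented for this illustration. With a null parent, the loader delegates only to the bootstrap class loader, so core classes stay visible while everything on the application class path is hidden.

```java
import java.net.URL;
import java.net.URLClassLoader;

// Illustrative experiment, not part of vmlens.
class IsolationDemo {

    // Returns true if the given loader can resolve the class name.
    static boolean canLoad(ClassLoader loader, String className) {
        try {
            loader.loadClass(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        // A null parent means: delegate only to the bootstrap class loader.
        try (URLClassLoader isolated = new URLClassLoader(new URL[0], null)) {
            // Core classes are still reachable through the bootstrap loader.
            System.out.println(canLoad(isolated, "java.lang.String"));
            // Application classes, such as this one, are not visible.
            System.out.println(canLoad(isolated, IsolationDemo.class.getName()));
        }
    }
}
```

The same isolation works in the other direction: classes loaded through such a loader are not handed out to application code by the system class loader.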

Class loader magic part 2

Now we use ASM to add calls to our static callback methods whenever a field is accessed. To make sure that the callback classes are visible from every other class, they have to be loaded by the bootstrap classloader. To achieve this, they have to live in the java package namespace, and the jar containing them has to be on the boot class path.

package java.anarsoft.trace.agent.bootstrap.callback;

public class FieldAccessCallback {

    public static void getStaticField(int field, int methodId) {
        // ...
    }
}

A callback class from vmlens. It has to be in the java package namespace to be visible in all classes.

Boot-Class-Path: agent_bootstrap.jar

The boot class path entry in the MANIFEST.MF file from vmlens.

VMLens, a lightweight java race condition catcher, is built as a java agent. We know, writing java agents can be a tricky business. So, if you have any questions, just ask them in a comment below.

Functional Data Structures in Java 8 with Javaslang

Java 8’s lambdas (λ) empower us to create wonderful APIs. They greatly increase the expressiveness of the language.

Javaslang leveraged lambdas to create various new features based on functional patterns. One of them is a functional collection library that is intended to be a replacement for Java’s standard collections.

Javaslang Collections

(This is just a bird’s-eye view; you will find a human-readable version below.)

Functional Programming

Before we deep-dive into the details about the data structures I want to talk about some basics. This will make it clear why I created Javaslang and specifically new Java collections.


Java applications are typically full of side effects. They mutate some sort of state, maybe the outer world. Common side effects are changing objects or variables in place, printing to the console, writing to a log file or to a database. Side effects are considered harmful if they affect the semantics of our program in an undesirable way.

For example, if a function throws an exception and this exception is interpreted, it is considered a side effect that affects our program. Furthermore, exceptions are like non-local goto statements. They break the normal control flow. However, real-world applications do perform side effects.

int divide(int dividend, int divisor) {
    // throws if divisor is zero
    return dividend / divisor;
}

In a functional setting we are in the favorable situation to encapsulate the side-effect in a Try:

// = Success(result) or Failure(exception)
Try<Integer> divide(Integer dividend, Integer divisor) {
    return Try.of(() -> dividend / divisor);
}

This version of divide does not throw any more. We made the possible failure explicit by using the type Try.
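If you wonder what happens behind such a type, the following hand-rolled stand-in captures the essence using only the JDK. It is deliberately simplistic and is not Javaslang's Try: the name Attempt and its methods are assumptions made for this sketch, and it only catches runtime exceptions.

```java
import java.util.function.Supplier;

// Hand-rolled stand-in for the idea behind Try; NOT Javaslang's implementation.
final class Attempt<T> {
    private final T value;
    private final Throwable error;

    private Attempt(T value, Throwable error) {
        this.value = value;
        this.error = error;
    }

    // Runs the supplier and captures either its result or the exception it throws.
    static <T> Attempt<T> of(Supplier<T> supplier) {
        try {
            return new Attempt<>(supplier.get(), null);
        } catch (RuntimeException e) {
            return new Attempt<>(null, e);
        }
    }

    boolean isSuccess() {
        return error == null;
    }

    T getOrElse(T fallback) {
        return isSuccess() ? value : fallback;
    }

    // The failure is now part of the return type instead of the control flow.
    static Attempt<Integer> divide(int dividend, int divisor) {
        return of(() -> dividend / divisor);
    }

    public static void main(String[] args) {
        System.out.println(divide(6, 3).getOrElse(-1)); // 2
        System.out.println(divide(1, 0).isSuccess());   // false
    }
}
```

The caller can no longer forget about the failure case, because the only way to get at the value goes through the wrapper.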

Mario Fusco on Functional Programming

Referential Transparency

A function, or more generally an expression, is called referentially transparent if a call can be replaced by its value without affecting the behavior of the program. Simply put, given the same input the output is always the same.

// not referentially transparent
Math.random();

// referentially transparent
Math.max(1, 2);

A function is called pure if all expressions involved are referentially transparent. An application composed of pure functions will most probably just work if it compiles. We are able to reason about it. Unit tests are easy to write and debugging becomes a relic of the past.

Thinking in Values

Rich Hickey, the creator of Clojure, gave a great talk about The Value of Values. The most interesting values are immutable values. The main reason is that immutable values

  • are inherently thread-safe and hence do not need to be synchronized
  • are stable regarding equals and hashCode and thus are reliable hash keys
  • do not need to be cloned
  • behave type-safe when used in unchecked covariant casts (Java-specific)

The key to a better Java is to use immutable values paired with referentially transparent functions.

Javaslang provides the necessary controls and collections to accomplish this goal in every-day Java programming.

Data Structures in a Nutshell

Javaslang’s collection library comprises a rich set of functional data structures built on top of lambdas. The only interface they share with Java’s original collections is Iterable. The main reason is that the mutator methods of Java’s collection interfaces do not return an object of the underlying collection type.

We will see why this is so essential by taking a look at the different types of data structures.

Mutable Data Structures

Java is an object-oriented programming language. We encapsulate state in objects to achieve data hiding and provide mutator methods to control the state. The Java collections framework (JCF) is built upon this idea.

interface Collection<E> {
    // removes all elements from this collection
    void clear();
}

Today I regard a void return type as a smell. It is evidence that side effects take place and state is mutated. Shared mutable state is an important source of failure, not only in a concurrent setting.

Immutable Data Structures

Immutable data structures cannot be modified after their creation. In the context of Java they are widely used in the form of collection wrappers.

List<String> list = Collections.unmodifiableList(otherList);

// Boom!
list.add("why not?");

There are various libraries that provide us with similar utility methods. The result is always an unmodifiable view of the specific collection. Typically it will throw at runtime when we call a mutator method.
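The word "view" matters here. The short JDK-only experiment below (the class UnmodifiableViewDemo and its helper are made up for illustration) shows both halves of the story: the wrapper rejects mutator calls at runtime, yet whoever still holds the backing list can change what the wrapper shows.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative experiment with the JDK's unmodifiable wrappers.
class UnmodifiableViewDemo {

    // Returns true if the mutator call was rejected at runtime.
    static boolean addIsRejected(List<String> list) {
        try {
            list.add("why not?");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> backing = new ArrayList<>();
        backing.add("a");
        List<String> view = Collections.unmodifiableList(backing);

        System.out.println(addIsRejected(view)); // true: the view refuses mutation
        backing.add("b");                        // the backing list is still mutable
        System.out.println(view.size());         // 2: the view reflects the change
    }
}
```

So an unmodifiable view protects against accidental writes through the wrapper, but it does not give us an immutable value.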

Persistent Data Structures

A persistent data structure preserves the previous version of itself when being modified and is therefore effectively immutable. Fully persistent data structures allow both updates and queries on any version.

Many operations perform only small changes. Just copying the previous version wouldn’t be efficient. To save time and memory, it is crucial to identify similarities between two versions and share as much data as possible.

This model does not impose any implementation details. This is where functional data structures come into play.

Functional Data Structures

Also known as purely functional data structures, these are immutable and persistent. The methods of functional data structures are referentially transparent.

Javaslang features a wide range of the most-commonly used functional data structures. The following examples are explained in-depth.

Linked List

One of the most popular and also simplest functional data structures is the (singly) linked List. It has a head element and a tail List. A linked List behaves like a Stack which follows the last in, first out (LIFO) method.

In Javaslang we instantiate a List like this:

// = List(1, 2, 3)
List<Integer> list1 = List.of(1, 2, 3);

Each of the List elements forms a separate List node. The tail of the last element is Nil, the empty List.

List 1

This enables us to share elements across different versions of the List.

// = List(0, 2, 3)
List<Integer> list2 = list1.tail().prepend(0);

The new head element 0 is linked to the tail of the original List. The original List remains unmodified.

List 2

These operations take place in constant time, in other words they are independent of the List size. Most of the other operations take linear time. In Javaslang this is expressed by the interface LinearSeq, which we may already know from Scala.
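To see why prepend and tail are constant-time and why versions can share structure, here is a minimal persistent list written against the plain JDK. It is a teaching sketch under my own naming (ConsList), not Javaslang's List source code, and it leaves out almost everything a real collection needs.

```java
import java.util.NoSuchElementException;

// Minimal persistent singly linked list; illustrative, not Javaslang's source.
final class ConsList<T> {
    private static final ConsList<?> NIL = new ConsList<>(null, null);

    private final T head;
    private final ConsList<T> tail;

    private ConsList(T head, ConsList<T> tail) {
        this.head = head;
        this.tail = tail;
    }

    @SuppressWarnings("unchecked")
    static <T> ConsList<T> empty() {
        return (ConsList<T>) NIL;
    }

    @SafeVarargs
    static <T> ConsList<T> of(T... values) {
        ConsList<T> result = empty();
        for (int i = values.length - 1; i >= 0; i--) {
            result = result.prepend(values[i]);
        }
        return result;
    }

    boolean isEmpty() {
        return this == NIL;
    }

    // O(1): allocates one node that points at the existing list.
    ConsList<T> prepend(T value) {
        return new ConsList<>(value, this);
    }

    // O(1): returns the already existing tail; nothing is copied.
    ConsList<T> tail() {
        if (isEmpty()) {
            throw new NoSuchElementException("tail of empty list");
        }
        return tail;
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("List(");
        for (ConsList<T> cur = this; !cur.isEmpty(); cur = cur.tail) {
            if (cur != this) {
                sb.append(", ");
            }
            sb.append(cur.head);
        }
        return sb.append(")").toString();
    }

    public static void main(String[] args) {
        ConsList<Integer> list1 = ConsList.of(1, 2, 3);
        ConsList<Integer> list2 = list1.tail().prepend(0);
        System.out.println(list1);                        // List(1, 2, 3)
        System.out.println(list2);                        // List(0, 2, 3)
        System.out.println(list2.tail() == list1.tail()); // true: nodes are shared
    }
}
```

Sharing is safe only because no node can ever change after construction; with mutable nodes, list1 and list2 would interfere with each other.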

If we need data structures that are queryable in constant time, Javaslang offers Array and Vector. Both have random access capabilities.

The Array type is backed by a Java array of objects. Insert and remove operations take linear time. Vector is in-between Array and List. It performs well in both areas, random access and modification.

In fact the linked List can also be used to implement a Queue data structure.


A very efficient functional Queue can be implemented based on two linked Lists. The front List holds the elements that are dequeued, the rear List holds the elements that are enqueued. Both operations, enqueue and dequeue, run in amortized O(1).

Queue<Integer> queue = Queue.of(1, 2, 3)
                            .enqueue(4)
                            .enqueue(5);

The initial Queue is created of three elements. Two elements are enqueued on the rear List.

Queue 1

If the front List runs out of elements when dequeueing, the rear List is reversed and becomes the new front List.

Queue 2

When dequeueing an element we get a pair of the first element and the remaining Queue. It is necessary to return the new version of the Queue because functional data structures are immutable and persistent. The original Queue is not affected.

Queue<Integer> queue = Queue.of(1, 2, 3);

// = (1, Queue(2, 3))
Tuple2<Integer, Queue<Integer>> dequeued =
        queue.dequeue();
What happens when the Queue is empty? Then dequeue() will throw a NoSuchElementException. To do it the functional way we would rather expect an optional result.

// = Some((1, Queue()))
Queue.of(1).dequeueOption();

// = None
Queue.empty().dequeueOption();

An optional result may be further processed, regardless of whether it is empty or not.

// = Queue(1)
Queue<Integer> queue = Queue.of(1);

// = Some((1, Queue()))
Option<Tuple2<Integer, Queue<Integer>>>
        dequeued = queue.dequeueOption();

// = Some(1)
Option<Integer> element = dequeued.map(t -> t._1);

// = Some(Queue())
Option<Queue<Integer>> remaining = dequeued.map(t -> t._2);
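The two-list idea described in this section fits into a few lines of plain Java. The sketch below is my own simplified rendering (TwoListQueue, peek and tail are illustrative names), not Javaslang's Queue: it splits dequeue into peek and tail to avoid needing a tuple type, and it does not cache the reversed list.

```java
import java.util.NoSuchElementException;

// Sketch of a persistent queue built from two immutable linked lists.
// Illustrative only; this is not Javaslang's Queue source code.
final class TwoListQueue<T> {

    // Immutable singly linked node; null stands for the empty list.
    private static final class Node<T> {
        final T value;
        final Node<T> next;

        Node(T value, Node<T> next) {
            this.value = value;
            this.next = next;
        }
    }

    private final Node<T> front; // next elements to dequeue, in order
    private final Node<T> rear;  // enqueued elements, newest first

    private TwoListQueue(Node<T> front, Node<T> rear) {
        this.front = front;
        this.rear = rear;
    }

    static <T> TwoListQueue<T> empty() {
        return new TwoListQueue<>(null, null);
    }

    boolean isEmpty() {
        return front == null && rear == null;
    }

    // O(1): prepend to the rear list, leaving this queue untouched.
    TwoListQueue<T> enqueue(T value) {
        return new TwoListQueue<>(front, new Node<>(value, rear));
    }

    // Returns the first element without removing it.
    T peek() {
        return normalized().front.value;
    }

    // Returns a new queue without the first element.
    TwoListQueue<T> tail() {
        TwoListQueue<T> q = normalized();
        return new TwoListQueue<>(q.front.next, q.rear);
    }

    // Makes sure the front list is non-empty, reversing the rear list if needed.
    private TwoListQueue<T> normalized() {
        if (isEmpty()) {
            throw new NoSuchElementException("dequeue of empty queue");
        }
        if (front != null) {
            return this;
        }
        Node<T> reversed = null;
        for (Node<T> n = rear; n != null; n = n.next) {
            reversed = new Node<>(n.value, reversed);
        }
        return new TwoListQueue<>(reversed, null);
    }

    public static void main(String[] args) {
        TwoListQueue<Integer> q = TwoListQueue.<Integer>empty().enqueue(1).enqueue(2).enqueue(3);
        System.out.println(q.peek());        // 1
        System.out.println(q.tail().peek()); // 2
        System.out.println(q.peek());        // 1: the original queue is unaffected
    }
}
```

The reversal is the only linear-time step, and each element takes part in at most one reversal on its way through the queue, which is where the constant amortized cost comes from.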

Sorted Set

Sorted Sets are data structures that are more frequently used than Queues. We use binary search trees to model them in a functional way. These trees consist of nodes with up to two children and values at each node.

We build binary search trees in the presence of an ordering, represented by an element Comparator. All values of the left subtree of any given node are strictly less than the value of the given node. All values of the right subtree are strictly greater.

// = TreeSet(1, 2, 3, 4, 6, 7, 8)
SortedSet<Integer> xs =
        TreeSet.of(6, 1, 3, 2, 4, 7, 8);

Binary Tree 1

Searches on such trees run in O(log n) time. We start the search at the root and decide if we found the element. Because of the total ordering of the values we know where to search next, in the left or in the right branch of the current tree.

// = TreeSet(1, 2, 3);
SortedSet<Integer> set = TreeSet.of(2, 3, 1, 2);

// = TreeSet(3, 2, 1);
Comparator<Integer> c = (a, b) -> b - a;
SortedSet<Integer> reversed =
        TreeSet.of(c, 2, 3, 1, 2);

Most tree operations are inherently recursive. The insert function behaves similarly to the search function. When the end of a search path is reached, a new node is created and the whole path is reconstructed up to the root. Existing child nodes are referenced whenever possible. Hence the insert operation takes O(log n) time and space.

// = TreeSet(1, 2, 3, 4, 5, 6, 7, 8)
SortedSet<Integer> ys = xs.add(5);

Binary Tree 2
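The path reconstruction described above can be sketched in plain Java as follows. Note the assumptions: this PersistentTree is an unbalanced tree of ints that I wrote for illustration, whereas the real implementation also has to rebalance. The point is only to show that an insert copies the nodes on the search path and shares everything else.

```java
// Unbalanced persistent binary search tree of ints; illustrative only.
// A production implementation would additionally keep the tree balanced.
final class PersistentTree {
    final int value;
    final PersistentTree left;  // null denotes an empty subtree
    final PersistentTree right; // null denotes an empty subtree

    PersistentTree(int value, PersistentTree left, PersistentTree right) {
        this.value = value;
        this.left = left;
        this.right = right;
    }

    // Returns a new root. Only nodes on the search path are copied;
    // every subtree off the path is shared with the old version.
    static PersistentTree insert(PersistentTree node, int v) {
        if (node == null) {
            return new PersistentTree(v, null, null);
        }
        if (v < node.value) {
            return new PersistentTree(node.value, insert(node.left, v), node.right);
        }
        if (v > node.value) {
            return new PersistentTree(node.value, node.left, insert(node.right, v));
        }
        return node; // already present: reuse the whole subtree
    }

    static boolean contains(PersistentTree node, int v) {
        while (node != null) {
            if (v == node.value) {
                return true;
            }
            node = v < node.value ? node.left : node.right;
        }
        return false;
    }

    public static void main(String[] args) {
        PersistentTree xs = null;
        for (int v : new int[] {6, 1, 3, 2, 4, 7, 8}) {
            xs = insert(xs, v);
        }
        PersistentTree ys = insert(xs, 5);
        System.out.println(contains(xs, 5));      // false: the old version is intact
        System.out.println(contains(ys, 5));      // true
        System.out.println(xs.right == ys.right); // true: untouched subtree is shared
    }
}
```

Since 5 is smaller than the root value 6, the insert walks into the left subtree, so the entire right subtree is reused by reference in the new version.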

In order to maintain the performance characteristics of a binary search tree it needs to be kept balanced. All paths from the root to a leaf need to have roughly the same length.

In Javaslang we implemented a binary search tree based on a Red/Black Tree. It uses a specific coloring strategy to keep the tree balanced on inserts and deletes. To read more about this topic please refer to the book Purely Functional Data Structures by Chris Okasaki.

State of the Collections

Generally we are observing a convergence of programming languages. Good features make it, others disappear. But Java is different: it is bound forever to be backward compatible. That is a strength but also slows down evolution.

Lambda brought Java and Scala closer together, yet they are still so different. Martin Odersky, the creator of Scala, recently mentioned in his BDSBTB 2015 keynote the state of the Java 8 collections.

He described Java’s Stream as a fancy form of an Iterator. The Java 8 Stream API is an example of a lifted collection. What it does is define a computation and link it to a specific collection in another explicit step.

// i + 1

This is how the new Java 8 Stream API works. It is a computational layer above the well known Java collections.

// = ["1", "2", "3"] in Java 8
Arrays.asList(1, 2, 3)
      .stream()
      .map(Object::toString)
      .collect(Collectors.toList())

Javaslang is greatly inspired by Scala. This is how the above example should have been in Java 8.

// = Stream("1", "2", "3") in Javaslang
Stream.of(1, 2, 3).map(Object::toString)

Within the last year we put much effort into implementing the Javaslang collection library. It comprises the most widely used collection types.


We started our journey by implementing sequential types. We already described the linked List above. Stream, a lazy linked List, followed. It allows us to process possibly infinitely long sequences of elements.


All collections are Iterable and hence could be used in enhanced for-statements.

for (String s : List.of("Java", "Advent")) {
    // side effects and mutation
}

We could accomplish the same by internalizing the loop and injecting the behavior using a lambda.

List.of("Java", "Advent").forEach(s -> {
    // side effects and mutation
});

Anyway, as we previously saw we prefer expressions that return a value over statements that return nothing. By looking at a simple example, soon we will recognize that statements add noise and divide what belongs together.

String join(String... words) {
    StringBuilder builder = new StringBuilder();
    for (String s : words) {
        if (builder.length() > 0) {
            builder.append(", ");
        }
        builder.append(s);
    }
    return builder.toString();
}

The Javaslang collections provide us with many functions to operate on the underlying elements. This allows us to express things in a very concise way.

String join(String... words) {
    return List.of(words)
               .intersperse(", ")
               .fold("", String::concat);
}

Most goals can be accomplished in various ways using Javaslang. Here we reduced the whole method body to fluent function calls on a List instance. We could even remove the whole method and directly use our List to obtain the computation result.

List.of(words).mkString(", ");

In a real world application we are now able to drastically reduce the number of lines of code and hence lower the risk of bugs.

Set and Map

Sequences are great. But to be complete, a collection library also needs different types of Sets and Maps.

Set and Map

We described how to model sorted Sets with binary tree structures. A sorted Map is nothing else than a sorted Set containing key-value pairs and having an ordering for the keys.

The HashMap implementation is backed by a Hash Array Mapped Trie (HAMT). Accordingly the HashSet is backed by a HAMT containing key-key pairs.

Our Map does not have a special Entry type to represent key-value pairs. Instead we use Tuple2 which is already part of Javaslang. The fields of a Tuple are enumerated.

// = (1, "A")
Tuple2<Integer, String> entry = Tuple.of(1, "A");

Integer key = entry._1;
String value = entry._2;

Maps and Tuples are used throughout Javaslang. Tuples are indispensable for handling multi-valued return types in a general way.

// = HashMap((0, List(2, 4)), (1, List(1, 3)))
List.of(1, 2, 3, 4).groupBy(i -> i % 2);

// = List((a, 0), (b, 1), (c, 2))
List.of('a', 'b', 'c').zipWithIndex();

At Javaslang, we explore and test our library by implementing the 99 Euler Problems. It is a great proof of concept. Please don’t hesitate to send pull requests.

Hands On!

I really hope this article has sparked your interest in Javaslang. Even if you do use Java 7 (or below) at work, as I do, it is possible to follow the idea of functional programming. It will be of great good!

Please make sure Javaslang is part of your toolbelt in 2016.

Happy hacking!

PS: question ? @_Javaslang or Gitter chat