Composing Multiple Async Results via an Applicative Builder in Java 8

A few months ago, I published a post explaining in detail an abstraction I came up with named Outcome, which helped me A LOT to code without side effects by enforcing the use of semantics. By following this simple (and yet powerful) convention, I ended up turning any kind of failure (a.k.a. Exception) into an explicit result from a function, making everything much easier to reason about. I don’t know about you, but I was tired of dealing with exceptions that tore everything down, so I did something about it, and to be honest, it worked really well. So before I keep going with my tales from the trenches, I really recommend going over that post. Now let’s solve some asynchronous issues by using eccentric applicative ideas, shall we?

Something wicked this way comes

Life was real good: our code was fast-paced, cleaner and more composable than ever. But, out of the blue, we stumbled upon a “missing” feature (evil laughs please): we needed to combine several asynchronous Outcome instances in a non-blocking fashion…

Excited by the idea, I got down to work. I experimented for a fair amount of time, looking for a robust and yet simple way of expressing this kind of situation. The new CompletableFuture API turned out to be much nicer than I expected (though I still don’t understand why they decided to use names like thenApplyAsync or thenComposeAsync instead of map or flatMap), but I always ended up with implementations that were too verbose and repetitive compared to some stuff I did with Scala. After some long “Mate” sessions, I had my “Hey! moment”: why not use something similar to an applicative?

The problem

Suppose that we have these two asynchronous results

and a silly entity called Message

I need something that given textf and numberf it will give me back something like

//After combining textf and numberf
CompletableFuture<Outcome<Message>> message = ....
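For orientation, here is a minimal sketch of what the two asynchronous results and the Message entity could look like. The Outcome class is a heavily simplified, hypothetical stand-in for the type described in the earlier post (a value or a list of failure descriptions), and the Sources holder class and the sample values are assumptions made only for illustration.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-in for Outcome: a value or a list of failures.
final class Outcome<T> {
    final T value;                // null when this outcome failed
    final List<String> failures;  // empty when this outcome succeeded

    private Outcome(T value, List<String> failures) {
        this.value = value;
        this.failures = failures;
    }

    static <T> Outcome<T> success(T value) {
        return new Outcome<>(value, Collections.<String>emptyList());
    }

    static <T> Outcome<T> failure(String reason) {
        return new Outcome<>(null, Collections.singletonList(reason));
    }

    boolean isSuccess() { return failures.isEmpty(); }
}

// The "silly entity": a text template formatted with a number.
final class Message {
    final String text;

    Message(String template, Integer number) {
        this.text = String.format(template, number);
    }
}

// Hypothetical holder for the two asynchronous results.
final class Sources {
    static CompletableFuture<Outcome<String>> textf() {
        return CompletableFuture.supplyAsync(() -> Outcome.success("You have %d new messages"));
    }

    static CompletableFuture<Outcome<Integer>> numberf() {
        return CompletableFuture.supplyAsync(() -> Outcome.success(3));
    }
}
```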

So I wrote a letter to Santa Claus:

  1. I want to asynchronously format the string returned by textf using the number returned by numberf only when both values are available, meaning that both futures completed successfully and none of the outcomes failed. Of course, we need to be non-blocking.
  2. In case of failures, I want to collect all failures that took place during the execution of textf and/or numberf and return them to the caller, again, without blocking at all.
  3. I don’t want to be constrained by the number of values to be combined: it must be capable of handling a fair amount of asynchronous results. Did I say without blocking? There you go…
  4. Not die during the attempt.
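The first two wishes can be sketched for exactly two futures with the standard thenCombine method, which runs its function only once both futures have completed and never blocks the caller. This is an illustration under assumptions, not the solution developed in this post: Outcome is again a hypothetical simplified stand-in, Combine is an invented helper, and futures that complete exceptionally are not handled.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

// Hypothetical, simplified stand-in for Outcome: a value or a list of failures.
final class Outcome<T> {
    final T value;
    final List<String> failures;

    private Outcome(T value, List<String> failures) {
        this.value = value;
        this.failures = failures;
    }

    static <T> Outcome<T> success(T value) {
        return new Outcome<>(value, Collections.<String>emptyList());
    }

    static <T> Outcome<T> failure(String reason) {
        return new Outcome<>(null, Collections.singletonList(reason));
    }

    static <T> Outcome<T> failed(List<String> reasons) {
        return new Outcome<>(null, reasons);
    }

    boolean isSuccess() { return failures.isEmpty(); }
}

final class Combine {
    // Applies f when both outcomes succeeded; otherwise keeps every failure from both sides.
    static <A, B, C> Outcome<C> merge(Outcome<A> a, Outcome<B> b, BiFunction<A, B, C> f) {
        if (a.isSuccess() && b.isSuccess()) {
            return Outcome.success(f.apply(a.value, b.value));
        }
        List<String> all = new ArrayList<>(a.failures);
        all.addAll(b.failures);
        return Outcome.failed(all);
    }

    // Non-blocking: the merge runs as a callback once both futures are done.
    static <A, B, C> CompletableFuture<Outcome<C>> combine(
            CompletableFuture<Outcome<A>> fa,
            CompletableFuture<Outcome<B>> fb,
            BiFunction<A, B, C> f) {
        return fa.thenCombine(fb, (a, b) -> merge(a, b, f));
    }
}
```

This covers wishes 1 and 2 for two values, but it does not scale nicely to an arbitrary number of them (wish 3), which is what motivates the builder-based approach.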

Applicative builder to the rescue

If you think about it, one simple way to put what we’re trying to achieve is as follows:

// Given a String -> Given a number -> Format the message
f: String -> Integer -> Message

The definition of f is saying something like: “Given a String, I will return a function that takes an Integer as a parameter and that, when applied, will return an instance of type Message”. This way, instead of waiting for all values to be available at once, we can partially apply one value at a time, getting an actual description of the construction process of a Message instance. That sounded great.
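In plain Java 8, such a curried f can be written by hand as a function that returns a function. The sketch below assumes a Message with a (String, Integer) constructor that formats the text; that constructor and the Currying class are illustrative assumptions.

```java
import java.util.function.Function;

// Assumed shape of Message: a text template formatted with a number.
final class Message {
    final String text;

    Message(String template, Integer number) {
        this.text = String.format(template, number);
    }
}

final class Currying {
    // f: String -> Integer -> Message, written as a function returning a function.
    static final Function<String, Function<Integer, Message>> F =
            template -> number -> new Message(template, number);

    public static void main(String[] args) {
        // Apply one value at a time.
        Function<Integer, Message> partial = F.apply("You have %d new messages");
        Message message = partial.apply(3);
        System.out.println(message.text);
    }
}
```

Writing this by hand for every arity is exactly the kind of noise that makes a generic, concise version hard to get in Java.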

To achieve that, it would be really awesome if we could take the constructor reference Message::new and curry it, boom!, done!, but in Java that’s impossible (to do in a generic, beautiful and concise way), so for the sake of our example, I decided to go with our beloved Builder pattern, which kinda does the job:

And here’s the WannabeApplicative<V> definition

public interface WannabeApplicative<V>
{
    V apply();
}
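One plausible shape for the builder, given that interface, is sketched below. It assumes that the builder itself implements WannabeApplicative<Message>, that apply() produces the final Message, and that Message.applicative() simply returns a fresh builder; the method names text and number are hypothetical.

```java
// Repeated from the definition above so that the sketch compiles on its own.
interface WannabeApplicative<V> {
    V apply();
}

final class Message {
    final String text;

    private Message(String text) {
        this.text = text;
    }

    // Assumed entry point: every call starts from a fresh, empty builder.
    static Builder applicative() {
        return new Builder();
    }

    // Hypothetical builder: takes one value at a time, then builds the Message.
    static final class Builder implements WannabeApplicative<Message> {
        private String template;
        private Integer number;

        Builder text(String template) {
            this.template = template;
            return this;
        }

        Builder number(Integer number) {
            this.number = number;
            return this;
        }

        @Override
        public Message apply() {
            return new Message(String.format(template, number));
        }
    }
}
```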

Disclaimer: For those functional freaks out there, this is not an applicative per se, I’m aware of that, but I took some ideas from it and adapted them according to the tools that the language offered me out of the box. So, if you’re feeling curious, go check this post for a more formal example.

If you’re still with me, we could agree that we’ve done nothing too complicated so far, but now we need to express a building step, which, remember, needs to be non-blocking and capable of combining any previous failure that might have taken place in other executions with potentially new ones. So, in order to do that, I came up with something as follows:

First of all, we’ve got two functional interfaces: one is Partial<B>, which represents a lazy application of a value to a builder, and the second one, MergingStage<B,V>, represents the “how” to combine both the builder and the value. Then, we’ve got a method called value that, given an instance of type CompletableFuture<Outcome<V>>, will return an instance of type MergingStage<B,V>, and believe it or not, here’s where the magic takes place.

If you remember the MergingStage definition, you’ll see it’s a BiFunction, where the first parameter is of type Outcome<B> and the second one is of type Outcome<V>. Now, if you follow the types, you can tell that we’ve got two things: the partial state of the building process on one side (type parameter B) and a new value that needs to be applied to the current state of the builder (type parameter V), so that, when applied, it will generate a new builder instance with the “next state in the building sequence”, which is represented by Partial<B>.

Last but not least, we’ve got the stickedTo method, which basically is an (awful Java) hack to stick to a specific applicative type (builder) while defining a building step. For instance, having:

I can define partial value applications to any Builder instance as follows:
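As a rough illustration of the idea only (these are not the actual definitions), a merging stage can be sketched as a BiFunction over two outcomes. The return type Outcome<B>, the applying helper, the bare-bones Builder and the simplified Outcome are all assumptions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical, simplified stand-in for Outcome: a value or a list of failures.
final class Outcome<T> {
    final T value;
    final List<String> failures;

    private Outcome(T value, List<String> failures) {
        this.value = value;
        this.failures = failures;
    }

    static <T> Outcome<T> success(T value) {
        return new Outcome<>(value, Collections.<String>emptyList());
    }

    static <T> Outcome<T> failure(String reason) {
        return new Outcome<>(null, Collections.singletonList(reason));
    }

    static <T> Outcome<T> failed(List<String> reasons) {
        return new Outcome<>(null, reasons);
    }

    boolean isSuccess() { return failures.isEmpty(); }
}

// Assumed shape: current builder state + new value -> next builder state.
interface MergingStage<B, V> extends BiFunction<Outcome<B>, Outcome<V>, Outcome<B>> {}

// Bare-bones builder used only for this illustration.
final class Builder {
    String template;
    Integer number;
}

final class Stages {
    // Lifts a plain "put value into builder" function into a stage that
    // also accumulates the failures coming from either side.
    static <B, V> MergingStage<B, V> applying(BiFunction<B, V, B> apply) {
        return (builder, value) -> {
            if (builder.isSuccess() && value.isSuccess()) {
                return Outcome.success(apply.apply(builder.value, value.value));
            }
            List<String> all = new ArrayList<>(builder.failures);
            all.addAll(value.failures);
            return Outcome.failed(all);
        };
    }

    static final MergingStage<Builder, String> TEXT =
            applying((b, t) -> { b.template = t; return b; });

    static final MergingStage<Builder, Integer> NUMBER =
            applying((b, n) -> { b.number = n; return b; });
}
```

Note that nothing is applied when TEXT or NUMBER are defined; they only describe what to do with a value once it is available.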

See that we haven’t built anything yet; we just described what we want to do with each value when the time comes. We might want to perform some validations before using the new value (here’s where Outcome plays an important role) or just use it as it is. It’s really up to us, but the main point is that we haven’t applied anything yet. In order to do so, and to finally tie up all loose ends, I came up with another definition, which looks as follows:

Hope it’s not that overwhelming, but I’ll try to break it down as clearly as possible. In order to start specifying how you’re going to combine the whole thing together, you will start by calling begin with an instance of type WannabeApplicative<V>, where, in our case, type parameter V is equal to Builder.

FutureCompositions<Message, Builder> ab = begin(Message.applicative());

See that, after you invoke begin, you will get a new instance of FutureCompositions with a lazily evaluated partial state inside of it, making it the one and only owner of the whole building process state. That was the ultimate goal of everything we’ve done so far: to fully gain control over when and how things will be combined. Next, we must specify the values that we want to combine, and that’s what the binding method is for:

ab.binding(textToApply)
  .binding(numberToApply);

This is how we supply our builder instance with all the values that need to be merged together, along with the specification of what’s supposed to happen with each one of them, by using our previously defined Partial instances. Also see that everything is still lazily evaluated: nothing has happened yet, we have only stacked all the “steps” until we finally decide to materialize the result, which will happen when you call perform.

CompletableFuture<Outcome<Message>> message = ab.perform();

From that very moment everything will unfold: each building stage will get evaluated, and either failures will be returned and collected within an Outcome instance or the newly available values will be supplied to the target builder instance. One way or the other, all steps will be executed until there is nothing left to be done. I will try to depict what just happened as follows:

[Figure: applicative]

If you pay attention to the left side of the picture, you can easily see how each step gets “defined” as I showed before, following the “declaration” arrow direction, meaning, how you actually described the building process. Now, from the moment that you call perform, each applicative instance (remember, Builder in our case) will be lazily evaluated in the opposite direction: it will start by evaluating the last specified stage in the stack, which will then proceed to evaluate the next one and so forth, up to the point where we reach the “beginning” of the building definition. From there it will start to unfold, or roll out, evaluating each step up to the top and collecting everything it can by using the MergingStage specification.
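To make the begin / binding / perform flow more tangible, here is a deliberately simplified sketch. It is not the FutureCompositions implementation: the class Compositions, its signatures (a Supplier for the builder, a BiFunction per binding, a build function passed to perform), the Draft builder and the simplified Outcome are all assumptions. It also folds the recorded steps in declaration order, which is simpler than the reverse unfolding described above; what it does share with the original idea is that binding only records steps and nothing runs until perform.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

// Hypothetical, simplified stand-in for Outcome: a value or a list of failures.
final class Outcome<T> {
    final T value;
    final List<String> failures;

    private Outcome(T value, List<String> failures) {
        this.value = value;
        this.failures = failures;
    }

    static <T> Outcome<T> success(T value) {
        return new Outcome<>(value, Collections.<String>emptyList());
    }

    static <T> Outcome<T> failure(String reason) {
        return new Outcome<>(null, Collections.singletonList(reason));
    }

    static <T> Outcome<T> failed(List<String> reasons) {
        return new Outcome<>(null, reasons);
    }

    boolean isSuccess() { return failures.isEmpty(); }
}

// Simplified stand-in for FutureCompositions; B is the builder type.
final class Compositions<B> {
    private final Supplier<B> seed;
    // Each step is only a description; nothing runs until perform().
    private final List<UnaryOperator<CompletableFuture<Outcome<B>>>> steps = new ArrayList<>();

    private Compositions(Supplier<B> seed) {
        this.seed = seed;
    }

    static <B> Compositions<B> begin(Supplier<B> seed) {
        return new Compositions<>(seed);
    }

    // Records how a future value will be merged into the builder state.
    <V> Compositions<B> binding(CompletableFuture<Outcome<V>> value, BiFunction<B, V, B> apply) {
        steps.add(state -> state.thenCombine(value, (b, v) -> merge(b, v, apply)));
        return this;
    }

    // Chains every recorded step without blocking, then builds the result.
    <R> CompletableFuture<Outcome<R>> perform(Function<B, R> build) {
        CompletableFuture<Outcome<B>> state =
                CompletableFuture.completedFuture(Outcome.success(seed.get()));
        for (UnaryOperator<CompletableFuture<Outcome<B>>> step : steps) {
            state = step.apply(state);
        }
        return state.thenApply(b -> b.isSuccess()
                ? Outcome.<R>success(build.apply(b.value))
                : Outcome.<R>failed(b.failures));
    }

    private static <S, V> Outcome<S> merge(Outcome<S> b, Outcome<V> v, BiFunction<S, V, S> apply) {
        if (b.isSuccess() && v.isSuccess()) {
            return Outcome.success(apply.apply(b.value, v.value));
        }
        List<String> all = new ArrayList<>(b.failures);
        all.addAll(v.failures);
        return Outcome.failed(all);
    }
}

// Bare-bones builder state for the demo.
final class Draft {
    String template;
    Integer number;
}

final class Demo {
    static Outcome<String> run(Outcome<String> text, Outcome<Integer> number) {
        CompletableFuture<Outcome<String>> textf = CompletableFuture.supplyAsync(() -> text);
        CompletableFuture<Outcome<Integer>> numberf = CompletableFuture.supplyAsync(() -> number);
        return Compositions.begin(Draft::new)
                .binding(textf, (d, t) -> { d.template = t; return d; })
                .binding(numberf, (d, n) -> { d.number = n; return d; })
                .perform(d -> String.format(d.template, d.number))
                .join(); // joining only to inspect the result in this demo
    }
}
```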

And this is just the beginning….

I’m sure a lot could be done to improve this idea, for example:

  • The two consecutive calls to dependingOn at CompositionSources.values() suck: too verbose for my taste. I must do something about it.
  • I’m not quite sure about passing Outcome instances to a MergingStage. It would look cleaner and easier if we unwrapped the values to be merged before invoking it and just returned Either<Failure,V> instead. This would reduce complexity and increase flexibility on what’s supposed to happen behind the scenes.
  • Though using the Builder pattern did the job, it feels old-school. I would love to easily curry constructors, so on my to-do list is to check if jOOλ or Javaslang have something to offer on that matter.
  • Better type inference, so that any unnecessary noise gets removed from the code. The stickedTo method, for example, really is a code smell, something that I hated from the start. I definitely need more time to figure out an alternative way to infer the applicative type from the definition itself.

You’re more than welcome to send me any suggestions and comments you might have. Cheers!

@gszeliga

Applying ForkJoin – from optimal to fast

JDK 7 is well into the hands of developers by now and most people have heard of ForkJoin, yet not so many have the time or chance in daily work to try it.

It caused, and probably still causes, a bit of confusion about how it differs from a normal thread pool.[1]

My goal in this article is to present a more elaborate, yet still simple example of ForkJoin usage through a code example.

I time and measure the performance of a serial vs. a thread pool vs. a ForkJoin approach.

Here is the github upfront : https://github.com/fbunau/javaadvent-forkjoin

Practical problem

Imagine we have some sort of component in our system that keeps the last price of a stock for every millisecond of time.

This could be held in memory as an array of integers (if we count in bps).

The clients of this component make queries like: what is the moment of time between time1 and time2 when the price was the lowest?

This could either be an automated algorithm or just someone in a GUI making rectangle selections.

7 queries in the example image

Let us also imagine that we get many such queries from a client batched up in a Task.

They may be batched up for reducing network traffic and round trip time.
We have different sizes of tasks that the component might get: up to 10 queries (someone with a GUI), up to 100, ... up to 1 000 000 (some automated algorithm). We have many such clients for the component, each producing tasks of different sizes. See Task.TaskType.

Core problem and solution

The problem we have to solve is, at its core, the RMQ problem. Here is Wikipedia on it [2]:

“Given an array of objects taken from a well-ordered set (such as numbers), a Range Minimum Query (or RMQ) from i to j asks for the position of a minimum element in the sub-array A[i, j].”

“For example, when A = [0, 5, 2, 5, 4, 3, 1, 6, 3], then the answer to the range minimum query for the sub-array A[3, 8] = [2, 5, 4, 3, 1, 6] is 7, as A[7] = 1.”
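The quoted example uses 1-based positions. A naive linear scan, which is not the efficient solution but makes the definition concrete, reproduces it; the class and method names are illustrative.

```java
final class NaiveRmq {
    // Returns the 1-based position of a minimum in a[i..j]
    // (1-based, both ends inclusive), as in the quoted example.
    static int query(int[] a, int i, int j) {
        int best = i;
        for (int k = i + 1; k <= j; k++) {
            if (a[k - 1] < a[best - 1]) {
                best = k;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        int[] a = {0, 5, 2, 5, 4, 3, 1, 6, 3};
        System.out.println(query(a, 3, 8)); // prints 7, since A[7] = 1
    }
}
```

Each such query costs time proportional to the length of the range, which is what the data structure introduced next improves on.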

There exists an efficient data structure for solving this problem called a “Segment Tree”.

I won’t go into detail on this, as it is excellently covered in this classic Topcoder article [3]. The structure in itself is not important for this ForkJoin example; I have chosen it because it’s more interesting than a simple sum and its essence is kind of in the spirit of fork-join: it divides the task to be computed and then it joins the results!

The data structure has O(N) initialization time and O(log N) query time, where N is the number of elements in our price-per-time-unit array.
So a task T contains M such queries to be made.
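For readers who want to see those bounds in code, here is a compact recursive segment tree that stores, for every node, the index of a minimum in that node's range. It is a generic textbook-style sketch with 0-based indices, not the implementation from the repository.

```java
final class SegmentTree {
    private final int[] values;
    private final int[] tree; // tree[node] = index of a minimum within the node's range
    private final int n;

    // Builds the tree in O(N): every node is visited once.
    SegmentTree(int[] values) {
        this.values = values;
        this.n = values.length;
        this.tree = new int[4 * Math.max(1, n)];
        if (n > 0) {
            build(1, 0, n - 1);
        }
    }

    private void build(int node, int lo, int hi) {
        if (lo == hi) {
            tree[node] = lo;
            return;
        }
        int mid = (lo + hi) >>> 1;
        build(2 * node, lo, mid);
        build(2 * node + 1, mid + 1, hi);
        tree[node] = better(tree[2 * node], tree[2 * node + 1]);
    }

    // Picks the index holding the smaller value; -1 means "no candidate".
    // On ties the first argument wins, which is always the left part here.
    private int better(int i, int j) {
        if (i < 0) return j;
        if (j < 0) return i;
        return values[j] < values[i] ? j : i;
    }

    // 0-based index of a minimum in values[from..to], both ends inclusive, in O(log N).
    int query(int from, int to) {
        return query(1, 0, n - 1, from, to);
    }

    private int query(int node, int lo, int hi, int from, int to) {
        if (to < lo || hi < from) return -1;           // no overlap
        if (from <= lo && hi <= to) return tree[node]; // node fully inside the query
        int mid = (lo + hi) >>> 1;
        return better(query(2 * node, lo, mid, from, to),
                      query(2 * node + 1, mid + 1, hi, from, to));
    }
}
```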

In an academic Computer Science approach you would just say that we’ll process each task with this efficient data structure and the complexity will be :
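One reading that is consistent with the definitions above (a single O(N) build of the structure, O(log N) per query, and M_i queries in task T_i) is the following; it is a reconstruction from those definitions, not necessarily the exact expression intended here.

```latex
\text{cost}(T_i) = O(M_i \log N),
\qquad
\text{total cost} = O\Big(N + \sum_i M_i \log N\Big)
```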

You can’t get more efficient than that!? True on a theoretical von Neumann machine, but in practice you can.

An easy mistake to make is to assume that, because O(n/4) == O(n), constant factors don’t count when writing a program. But they do!
Stop and think: is it the same to wait 10 or 40 minutes / hours / years?

Going parallel

So, thinking about the problem to be solved, how can we make it faster? Since every computing device now has multiple cores, let’s put them to good use and do more things at once.
We can easily do that using the Fork Join framework.

I was first tempted to tweak the RMQ data structure and execute its operations in parallel, attacking something that was already log N. But it was a big failure: it’s too much overhead for the scheduler to micromanage such short-running logic.

In the end, the answer was to attack the M_i constant factor, that is, the number of queries in each task.

Thread pool

Before presenting how a ForkJoin solution might be applied, let’s imagine how we might apply a thread pool. See : TaskProcessorPool.java

We can have a pool of 4 workers. When we have a Task to do, we add it to the queue. As soon as a worker is available, it will retrieve a pending task from the head of the queue and execute it.

While this is fine for tasks of the same size, where that size is moderate and predictable, it runs into problems when the tasks to be executed are of different sizes. One worker might be choked up with a long-running task while the others sit doing nothing.

In this image the thread pool will do only 9 out of 16 possible units of work in the 4 units of time (56% efficiency), if no more tasks are added to the queue.
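The thread pool approach can be sketched as follows. This is not the TaskProcessorPool.java from the repository: the class name, the representation of a task as an array of {from, to} queries, and the linear-scan indexOfMin placeholder for the real range-minimum structure are all assumptions. The key point is that each task runs whole on a single worker, however large it is.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class PoolSketch {
    // Placeholder for the real range-minimum structure: a plain linear scan
    // returning the index of a minimum in prices[from..to], inclusive.
    static int indexOfMin(int[] prices, int from, int to) {
        int best = from;
        for (int i = from + 1; i <= to; i++) {
            if (prices[i] < prices[best]) {
                best = i;
            }
        }
        return best;
    }

    // Runs one whole task (a batch of {from, to} queries) on the calling worker.
    static int[] runBatch(int[] prices, int[][] queries) {
        int[] answers = new int[queries.length];
        for (int q = 0; q < queries.length; q++) {
            answers[q] = indexOfMin(prices, queries[q][0], queries[q][1]);
        }
        return answers;
    }

    // Submits every task to a fixed pool of 4 workers and gathers the answers.
    static List<int[]> process(int[] prices, List<int[][]> tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<CompletableFuture<int[]>> pending = new ArrayList<>();
            for (int[][] task : tasks) {
                pending.add(CompletableFuture.supplyAsync(() -> runBatch(prices, task), pool));
            }
            List<int[]> results = new ArrayList<>();
            for (CompletableFuture<int[]> f : pending) {
                results.add(f.join());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }
}
```

With one very large task and a few tiny ones in the list, three of the four workers finish early and idle, which is the imbalance described above.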

Fork Join

Fork join is useful when you are in a problem domain where you can split the task to be solved into smaller ones.

What is special about a fork-join pool is that it is a work-stealing thread pool.

Each worker thread maintains a local deque (double-ended queue) of tasks. When taking a new task for execution, it can either:

  • split the task into smaller ones
  • execute the task if it’s small enough

When a thread has no local tasks in its deque, it ‘steals’: it pops a task from the back of the deque of another random thread and puts it in its own. There is a high chance that this task is not yet split, so the thief will have quite some work on its hands.

Compared to the thread pool, instead of waiting for some new work, the other threads can split the existing task into smaller ones and help the busy thread with that large task.

Here is the original paper by Doug Lea for a more detailed explanation : http://gee.cs.oswego.edu/dl/papers/fj.pdf

Coming back to our example, a large batch of operations could be split into multiple batches with a smaller number of operations. See: TaskProcessorFJ.java

Most problems have a linear series of operations like this one; it doesn’t have to be a specially parallel problem, requiring a specialized parallel algorithm, for us to leverage the cores we have on the processor.

How much do you split? You split a task until you reach a threshold where splitting generally makes no sense anymore, e.g. when splitting + a thread getting a job + context switching costs more than actually executing the task as it is.

For a big XXL task, we have to do 1000000 query operations. We could split this into two tasks of 500000 operations each and do those in parallel. Is 500000 still large? Yes, we can split it more. I have chosen a group of 10000 operations to be the threshold under which there is no use in splitting and we can just execute them on the current thread.

Fork join does not split all the tasks upfront, but rather as it works through them.
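The splitting described above can be sketched with a RecursiveAction. Again, this is not the TaskProcessorFJ.java from the repository: the class name, the {from, to} query representation and the linear-scan indexOfMin placeholder are assumptions, while the threshold of 10000 queries is the one mentioned in the text.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

final class BatchAction extends RecursiveAction {
    // Below this many queries, just run them on the current thread.
    static final int THRESHOLD = 10000;

    private final int[] prices;
    private final int[][] queries; // each query is {from, to}, both inclusive
    private final int[] answers;   // answers[q] receives the result of queries[q]
    private final int lo;          // this action covers queries[lo..hi)
    private final int hi;

    BatchAction(int[] prices, int[][] queries, int[] answers, int lo, int hi) {
        this.prices = prices;
        this.queries = queries;
        this.answers = answers;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected void compute() {
        if (hi - lo <= THRESHOLD) {
            for (int q = lo; q < hi; q++) {
                answers[q] = indexOfMin(prices, queries[q][0], queries[q][1]);
            }
            return;
        }
        // Split lazily: halves are only split further when a worker picks them up.
        int mid = (lo + hi) >>> 1;
        invokeAll(new BatchAction(prices, queries, answers, lo, mid),
                  new BatchAction(prices, queries, answers, mid, hi));
    }

    // Placeholder for the real range-minimum structure: a plain linear scan.
    static int indexOfMin(int[] prices, int from, int to) {
        int best = from;
        for (int i = from + 1; i <= to; i++) {
            if (prices[i] < prices[best]) {
                best = i;
            }
        }
        return best;
    }

    // Processes one batch of queries on a fork-join pool and returns the answers.
    static int[] process(int[] prices, int[][] queries) {
        int[] answers = new int[queries.length];
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.invoke(new BatchAction(prices, queries, answers, 0, queries.length));
        } finally {
            pool.shutdown();
        }
        return answers;
    }
}
```

Because each half is itself a task sitting in a worker's deque, an idle worker can steal it, so even a single huge batch ends up spread across all cores.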

Performance results

I ran 4 iterations for each processor implementation on my i5-2500 CPU @ 3.30GHz, which has 4 cores / 4 threads, after a clean reboot.
Here are the results:

Doing 4 runs for each of the 3 processors. Pls wait ...
TaskProcessorSimple: 7963
TaskProcessorSimple: 7757
TaskProcessorSimple: 7748
TaskProcessorSimple: 7744
TaskProcessorPool: 3933
TaskProcessorPool: 2906
TaskProcessorPool: 4477
TaskProcessorPool: 4160
TaskProcessorFJ: 2498
TaskProcessorFJ: 2498
TaskProcessorFJ: 2524
TaskProcessorFJ: 2511
Test completed.

Conclusions

Even if you have chosen the right, optimal data structure, it’s not fast until you use all the resources you have, i.e. exploit all cores.

ForkJoin is definitely an improvement over the thread pool in certain problem domains, and it’s worth exploring where it can be applied; we’ll get to see more and more parallel code.
This is the kind of processor you can buy today: 12 cores / 24 threads. Now we just have to write the software to exploit the cool hardware that we have and will get in the future.

The code is here if you want to play with it: https://github.com/fbunau/javaadvent-forkjoin

Thanks for your time, drop some comments if you see any errors or have things to add.

Meta: this post is part of the Java Advent Calendar and is licensed under the Creative Commons 3.0 Attribution license. If you like it, please spread the word by sharing, tweeting, FB, G+ and so on!