Thursday, December 15, 2011

Java 7: Fork and join decomposable input pattern

In my recent blog post I introduced the fork and join framework of Java 7. This post presents a little framework on top of raw fork and join. It implements the decomposable input pattern (dip), which originated from my own laziness: after using fork and join a couple of times, I realized that I was writing the same code again for every slightly different use case. So I decided to write a little piece of software that I can reuse. The decomposable input pattern framework was born.

You can download the binary here.
The API-documentation is hosted here.
And the sources are also available here.

Now what's different when you use that framework? I'd say the difference is that the dip-framework follows good OO design principles, like the open-closed principle that says: "A module should be open for extension but closed for modification." In other words, I have separated the concerns in a fork and join scenario to make the whole thing more flexible and easier to change.

In my last blog post I presented a code snippet that illustrated how to use plain fork and join to calculate offers for car insurance. Let's see how this can be done using my dip-framework.

The input to the proposal calculation is - well - a list of proposals :-) In the dip framework you wrap the input of a ForkJoinTask into a subclass of DecomposableInput. The name originates from the fact that the input to a ForkJoinTask is decomposable.



The class (ListOfProposals in this example) wraps the raw input to the ForkJoinTask and provides a method that defines how that input can be decomposed. It also provides a method computeDirectly() that decides whether the input is already small enough for direct computation or needs further decomposition.
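
To make this more concrete, here is a minimal sketch of what such an input class could look like. It is not the framework's actual code: the DecomposableInput base class below is a hypothetical stand-in, and the method name decompose(), the use of plain Strings as proposals, and the threshold of one proposal are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the framework's base class; the real
// DecomposableInput may declare different methods and signatures.
abstract class DecomposableInput<A> {
    private final A rawInput;

    DecomposableInput(A rawInput) { this.rawInput = rawInput; }

    A getRawInput() { return rawInput; }

    // True when the input is small enough to be computed without further splitting.
    abstract boolean computeDirectly();

    // Splits the input into smaller inputs of the same kind.
    abstract List<DecomposableInput<A>> decompose();
}

// A proposal is just a String here to keep the sketch short.
final class ListOfProposals extends DecomposableInput<List<String>> {
    // Assumed threshold: a single proposal is priced directly.
    private static final int THRESHOLD = 1;

    ListOfProposals(List<String> proposals) { super(proposals); }

    @Override
    boolean computeDirectly() {
        return getRawInput().size() <= THRESHOLD;
    }

    @Override
    List<DecomposableInput<List<String>>> decompose() {
        List<String> proposals = getRawInput();
        int middle = proposals.size() / 2;
        List<DecomposableInput<List<String>>> parts = new ArrayList<>();
        // Split the list into a left and a right half.
        parts.add(new ListOfProposals(proposals.subList(0, middle)));
        parts.add(new ListOfProposals(proposals.subList(middle, proposals.size())));
        return parts;
    }
}

final class DecomposeDemo {
    public static void main(String[] args) {
        ListOfProposals input = new ListOfProposals(Arrays.asList("A", "B", "C", "D"));
        for (DecomposableInput<List<String>> part : input.decompose()) {
            System.out.println(part.getRawInput() + " direct=" + part.computeDirectly());
        }
    }
}
```

Splitting in halves is only one possible strategy; any decomposition works as long as it eventually produces inputs for which computeDirectly() returns true.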

The output of proposal calculation is a list of maps of prices. If you have four input proposals, you'll get a list of four maps with various prices. In the dip framework, you wrap the output into a subclass of ComposableResult.



The class (ListOfPrices in this example) implements the compose method, which merges an atomic result of a computation into the existing raw result. It returns a ComposableResult instance that holds the new composition.
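
A result class along these lines might look like the following sketch. Again, this is an illustration under assumptions rather than the framework's real code: the ComposableResult base class is a hypothetical stand-in, and the exact signature of compose() as well as the choice to return a fresh instance instead of modifying the existing one are my own simplifications.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the framework's base class; the real
// ComposableResult may declare different methods and signatures.
abstract class ComposableResult<B> {
    private final B rawResult;

    ComposableResult(B rawResult) { this.rawResult = rawResult; }

    B getRawResult() { return rawResult; }

    // Merges another partial result into this one and returns the composition.
    abstract ComposableResult<B> compose(ComposableResult<B> other);
}

// One map of prices per proposal, e.g. {"basic" -> 100.0}.
final class ListOfPrices extends ComposableResult<List<Map<String, Double>>> {
    ListOfPrices(List<Map<String, Double>> prices) { super(prices); }

    @Override
    ComposableResult<List<Map<String, Double>>> compose(
            ComposableResult<List<Map<String, Double>>> other) {
        // Copy first so that neither partial result is modified.
        List<Map<String, Double>> merged = new ArrayList<>(getRawResult());
        merged.addAll(other.getRawResult());
        return new ListOfPrices(merged);
    }
}

final class ComposeDemo {
    public static void main(String[] args) {
        List<Map<String, Double>> left = new ArrayList<>();
        left.add(Collections.singletonMap("basic", 100.0));
        List<Map<String, Double>> right = new ArrayList<>();
        right.add(Collections.singletonMap("basic", 200.0));
        System.out.println(new ListOfPrices(left).compose(new ListOfPrices(right)).getRawResult());
    }
}
```

Returning a new instance keeps the partial results untouched, which makes it harmless if several subtasks hold references to them.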

I agree it's a little abstract. Concurrency is inherently complex, and I am putting yet another abstraction on top of it. But once you've used the framework you'll realize its strength. So stay tuned, we're almost finished :-)

Now you have an input and an output, and the last thing you need is a computation object. In my example that's the pricing engine. To connect the pricing engine to the dip framework, you'll need to implement a subclass of ComputationActivityBridge.



The PricingEngineBridge implements the compute method that calls the pricing engine. It translates the DecomposableInput into an input that the pricing engine accepts. And it creates an instance of ComposableResult that contains the output of the pricing engine.

The last thing to do is to get things started.



The example creates an instance of GenericRecursiveTask and passes the ListOfProposals as well as the PricingEngineBridge as input. If you pass that task to the ForkJoinPool, you receive an instance of ListOfPrices as output.
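
The bridge and the start-up code described above can be sketched together in one self-contained example. Everything here is a simplified illustration, not the framework's source: the three base classes and GenericRecursiveTask are compact hypothetical stand-ins, PricingEngine and its calculatePrices() method are invented for the example, and proposals are plain Strings. Only ForkJoinPool and RecursiveTask are the real Java 7 classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Compact hypothetical stand-ins for the dip base classes; real signatures may differ.
abstract class DecomposableInput<A> {
    private final A rawInput;
    DecomposableInput(A rawInput) { this.rawInput = rawInput; }
    A getRawInput() { return rawInput; }
    abstract boolean computeDirectly();
    abstract List<DecomposableInput<A>> decompose();
}
abstract class ComposableResult<B> {
    private final B rawResult;
    ComposableResult(B rawResult) { this.rawResult = rawResult; }
    B getRawResult() { return rawResult; }
    abstract ComposableResult<B> compose(ComposableResult<B> other);
}
abstract class ComputationActivityBridge<A, B> {
    abstract ComposableResult<B> compute(DecomposableInput<A> input);
}

final class ListOfProposals extends DecomposableInput<List<String>> {
    ListOfProposals(List<String> proposals) { super(proposals); }
    @Override boolean computeDirectly() { return getRawInput().size() <= 1; }
    @Override List<DecomposableInput<List<String>>> decompose() {
        List<String> all = getRawInput();
        List<DecomposableInput<List<String>>> parts = new ArrayList<>();
        parts.add(new ListOfProposals(all.subList(0, all.size() / 2)));
        parts.add(new ListOfProposals(all.subList(all.size() / 2, all.size())));
        return parts;
    }
}

final class ListOfPrices extends ComposableResult<List<Map<String, Double>>> {
    ListOfPrices(List<Map<String, Double>> prices) { super(prices); }
    @Override ComposableResult<List<Map<String, Double>>> compose(
            ComposableResult<List<Map<String, Double>>> other) {
        List<Map<String, Double>> merged = new ArrayList<>(getRawResult());
        merged.addAll(other.getRawResult());
        return new ListOfPrices(merged);
    }
}

final class PricingEngine { // hypothetical engine: one map of prices per proposal
    Map<String, Double> calculatePrices(String proposal) {
        return Collections.singletonMap("basic", 100.0 * proposal.length());
    }
}

// Translates the dip input into engine calls and wraps the engine output.
final class PricingEngineBridge
        extends ComputationActivityBridge<List<String>, List<Map<String, Double>>> {
    private final PricingEngine engine = new PricingEngine();
    @Override ComposableResult<List<Map<String, Double>>> compute(
            DecomposableInput<List<String>> input) {
        List<Map<String, Double>> prices = new ArrayList<>();
        for (String proposal : input.getRawInput()) {
            prices.add(engine.calculatePrices(proposal));
        }
        return new ListOfPrices(prices);
    }
}

// Simplified stand-in for GenericRecursiveTask: split, compute leaves, compose.
final class GenericRecursiveTask<A, B> extends RecursiveTask<ComposableResult<B>> {
    private final DecomposableInput<A> input;
    private final ComputationActivityBridge<A, B> bridge;
    GenericRecursiveTask(DecomposableInput<A> input, ComputationActivityBridge<A, B> bridge) {
        this.input = input;
        this.bridge = bridge;
    }
    @Override protected ComposableResult<B> compute() {
        if (input.computeDirectly()) return bridge.compute(input);
        List<GenericRecursiveTask<A, B>> subtasks = new ArrayList<>();
        for (DecomposableInput<A> part : input.decompose()) {
            subtasks.add(new GenericRecursiveTask<>(part, bridge));
        }
        ComposableResult<B> result = null;
        for (GenericRecursiveTask<A, B> task : invokeAll(subtasks)) { // runs all, then waits
            result = (result == null) ? task.join() : result.compose(task.join());
        }
        return result;
    }
}

final class DipDemo {
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        ComposableResult<List<Map<String, Double>>> prices = pool.invoke(new GenericRecursiveTask<>(
                new ListOfProposals(Arrays.asList("A", "BB", "CCC")), new PricingEngineBridge()));
        System.out.println(prices.getRawResult());
        pool.shutdown();
    }
}
```

Note that the task itself knows nothing about proposals or prices. It only talks to the three abstractions, which is what allows the input, the result and the computation to be swapped independently.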

What's the advantage when you use the dip-framework? For instance:

- you could pass arbitrary processing input to GenericRecursiveTask by implementing a subclass of DecomposableInput
- you could implement your own custom RecursiveTask the same way I have implemented GenericRecursiveTask and pass the proposals and the PricingEngineBridge to that task
- you could implement a custom ForkAndJoinProcessor and use that by subclassing GenericRecursiveTask: that way you can control the creation of subtasks and their distribution across threads
- you could exchange the processing activity (here: PricingEngineBridge) by implementing a custom ComputationActivityBridge and try alternative pricing engines or do something completely different than calculating prices ...

I think I have made my point: the whole thing is now closed for modification, but open for extension.

The complete example code is here in my git repository.

Let me know if you like it. Looking forward to critical and enjoyable comments.

Cheers, Niklas

5 comments:

  1. Hi Niklas,
    looks like a Map-Reduce to me. Or in which way does the dip differ?

    Greetings
    Johannes

  2. Hi Joshi, thx for the comment. MapReduce is a framework for writing applications which process data (multi-terabyte data-sets) in-parallel on large clusters (thousands of nodes) of commodity hardware in a reliable, fault-tolerant manner.

    Dip is just an abstraction of Java 7 fork and join to distribute tasks efficiently on a multicore machine. They both split input data into smaller chunks to distribute workload.

    Cheers, Niklas

  3. Hi Niklas,

    Nice blog! Is there an email address I can contact you in private?

  4. The fork-join framework is one of several improvements made in Java 7, alongside e.g. automatic resource management and strings in switch. It can certainly boost the performance of a Java app by taking advantage of the multi-core architecture of new-generation CPUs.

  5. I've worked with this pattern, but it has a big flaw: when tested under load on slow processors, creating objects on demand adds a big overhead. I came up with a simple approach: http://alacret.blogspot.com/2011/12/java-7-forkjoin-example-implementing.html using object pools and a plain implementation of the object that does the calculations.
