Which Java synchronization construct is likely to provide the best
performance for a concurrent, iterative processing scenario with a
fixed number of threads like the one outlined below? After experimenting
on my own for a while (using ExecutorService and CyclicBarrier) and
being somewhat surprised by the results, I would be grateful for some
expert advice and maybe some new ideas. Existing questions here do
not seem to focus primarily on performance, hence this new one.
Thanks in advance!
The core of the app is a simple iterative data processing algorithm,
parallelized to spread the computational load across 8 cores on
a Mac Pro, running OS X 10.6 and Java 1.6.0_07. The data to be processed
is split into 8 blocks and each block is fed to a Runnable to be executed
by one of a fixed number of threads. Parallelizing the algorithm was
fairly straightforward, and it functionally works as desired, but
its performance is not yet what I think it could be. The app seems
to spend a lot of time in system calls synchronizing, so after some
profiling I wonder whether I selected the most appropriate
synchronization mechanism(s).
A key requirement of the algorithm is that it needs to proceed in
stages, so the threads need to sync up at the end of each stage.
The main thread prepares the work (very low overhead), passes it to
the threads, lets them work on it, then proceeds when all threads
are done, rearranges the work (again very low overhead) and repeats
the cycle. The machine is dedicated to this task, garbage collection
is minimized by using per-thread pools of pre-allocated items, and
the number of threads can be fixed (no incoming requests or the like,
just one thread per CPU core).
V1 - ExecutorService
My first implementation used an ExecutorService with 8 worker
threads. The program creates 8 tasks holding the work and then
lets them work on it, roughly like this:
    // create one thread per CPU
    executorService = Executors.newFixedThreadPool( 8 );
    ...
    // now process data in cycles
    while( ... ) {
        // package data into 8 work items
        ...
        // create one Callable task per work item
        ...
        // submit the Callables to the worker threads
        executorService.invokeAll( taskList );
    }
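For anyone who wants to reproduce the V1 pattern, here is a minimal self-contained sketch of it. The class name InvokeAllStages, the run method and the dummy summation workload are placeholders made up for illustration, not the real app code, and the sketch sticks to Java 6 syntax.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for the V1 main loop; the workload is a dummy sum.
final class InvokeAllStages {
    /** Runs `cycles` stages; in each stage every task sums `blockSize` numbers. */
    static long run(int threads, final int blockSize, int cycles) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            long total = 0;
            for (int c = 0; c < cycles; c++) {
                // one Callable per work item
                List<Callable<Long>> tasks = new ArrayList<Callable<Long>>(threads);
                for (int t = 0; t < threads; t++) {
                    final long offset = (long) t * blockSize;
                    tasks.add(new Callable<Long>() {
                        public Long call() {
                            long sum = 0;
                            for (int i = 0; i < blockSize; i++) sum += offset + i;
                            return sum;
                        }
                    });
                }
                // invokeAll returns only after every task of this stage has completed
                for (Future<Long> done : pool.invokeAll(tasks)) total += done.get();
            }
            return total;
        } finally {
            pool.shutdown();
        }
    }
}
```

Calling InvokeAllStages.run( 8, 16, 1000 ) would correspond roughly to the smallest block size in the measurements, although the dummy workload is much simpler than the real one.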
This works well functionally (it does what it should), and for
very large work items indeed all 8 CPUs become highly loaded, as
much as the processing algorithm would be expected to allow (some
work items will finish faster than others, then idle). However,
as the work items become smaller (and this is not really under
the program's control), the user CPU load shrinks dramatically:
blocksize | system | user | cycles/sec
256k      | 1.8%   | 85%  | 1.30
64k       | 2.5%   | 77%  | 5.6
16k       | 4%     | 64%  | 22.5
4096      | 8%     | 56%  | 86
1024      | 13%    | 38%  | 227
256       | 17%    | 19%  | 420
64        | 19%    | 17%  | 948
16        | 19%    | 13%  | 1626
Legend:
- blocksize = size of the work item (= computational steps)
- system = system load, as shown in OS X Activity Monitor (red bar)
- user = user load, as shown in OS X Activity Monitor (green bar)
- cycles/sec = iterations through the main while loop, more is better
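The cycles/sec column is just the number of main-loop iterations divided by the elapsed wall-clock time. A minimal sketch of such a measurement is below; CycleRate and its methods are hypothetical helper names, and using System.nanoTime() for the timing is an assumption of this sketch.

```java
// Hypothetical helper for computing a cycles/sec figure.
final class CycleRate {
    /** Converts a cycle count and an elapsed time in nanoseconds to cycles per second. */
    static double cyclesPerSecond(long cycles, long elapsedNanos) {
        return cycles * 1e9 / elapsedNanos;
    }

    /** Runs `oneCycle` `cycles` times and returns the observed rate. */
    static double measure(Runnable oneCycle, int cycles) {
        long start = System.nanoTime();
        for (int c = 0; c < cycles; c++) {
            oneCycle.run();
        }
        return cyclesPerSecond(cycles, System.nanoTime() - start);
    }
}
```

Here oneCycle would wrap one full pass through the main loop (hand out work, wait for all threads).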
The primary area of concern here is the high percentage of time spent
in the system, which appears to be driven by thread synchronization
calls. As expected, for smaller work items, ExecutorService.invokeAll()
will require relatively more effort to sync up the threads
versus the amount of work being performed in each thread. But
since ExecutorService is more generic than it would need to be
for this use case (it can queue tasks for threads if there are
more tasks than cores), I thought maybe there would be a leaner
synchronization construct.
V2 - CyclicBarrier
The next implementation used a CyclicBarrier to sync up
the threads before receiving work and after completing it,
roughly as follows:
    main() {
        // create the barrier
        barrier = new CyclicBarrier( 8 + 1 );
        // create Runnable for thread, tell it about the barrier
        Runnable task = new WorkerThreadRunnable( barrier );
        // start the threads
        for( int i = 0; i < 8; i++ )
        {
            // create one thread per core
            new Thread( task ).start();
        }
        while( ... ) {
            // tell threads about the work
            ...
            // N threads + this will call await(), then system proceeds
            barrier.await();
            // ... now worker threads work on the work...
            // wait for worker threads to finish
            barrier.await();
        }
    }

    class WorkerThreadRunnable implements Runnable {
        CyclicBarrier barrier;

        WorkerThreadRunnable( CyclicBarrier barrier ) { this.barrier = barrier; }

        public void run()
        {
            while( true )
            {
                // wait for work
                barrier.await();
                // do the work
                ...
                // wait for everyone else to finish
                barrier.await();
            }
        }
    }
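A runnable sketch of the two-barrier cycle outlined above, in case someone wants to time it on their own machine. The class name BarrierStages, the dummy summation workload, and the fixed cycle count (so that the threads terminate) are all assumptions added for illustration; the real worker loops forever.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the V2 structure: N workers + main share one barrier.
final class BarrierStages {
    static long run(int threads, final int blockSize, final int cycles) throws Exception {
        final CyclicBarrier barrier = new CyclicBarrier(threads + 1);
        final AtomicLong total = new AtomicLong();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final long offset = (long) t * blockSize;
            workers[t] = new Thread(new Runnable() {
                public void run() {
                    try {
                        for (int c = 0; c < cycles; c++) {
                            barrier.await();              // wait for work
                            long sum = 0;                 // do the (dummy) work
                            for (int i = 0; i < blockSize; i++) sum += offset + i;
                            total.addAndGet(sum);
                            barrier.await();              // wait for everyone else
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } catch (BrokenBarrierException e) {
                        // another party failed; give up on this run
                    }
                }
            });
            workers[t].start();
        }
        for (int c = 0; c < cycles; c++) {
            barrier.await();   // release the workers into the stage
            barrier.await();   // wait until all workers have finished the stage
        }
        for (Thread w : workers) w.join();
        return total.get();
    }
}
```

Each stage costs two full barrier trips for all nine parties, which is the overhead the measurements below are probing.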
Again, this works well functionally (it does what it should),
and for very large work items indeed all 8 CPUs become highly
loaded, as before. However, as the work items become smaller,
the load still shrinks dramatically:
blocksize | system | user | cycles/sec
256k      | 1.9%   | 85%  | 1.30
64k       | 2.7%   | 78%  | 6.1
16k       | 5.5%   | 52%  | 25
4096      | 9%     | 29%  | 64
1024      | 11%    | 15%  | 117
256       | 12%    | 8%   | 169
64        | 12%    | 6.5% | 285
16        | 12%    | 6%   | 377
For large work items, synchronization is negligible and the
performance is identical to V1. But unexpectedly, the results
of the (highly specialized) CyclicBarrier seem MUCH WORSE than
those for the (generic) ExecutorService: at the smallest block size,
throughput is only about a quarter of V1 (377 vs. 1626 cycles/sec).
A preliminary conclusion would be that even though this seems to be
the advertised ideal use case for CyclicBarrier, it performs much
worse than the generic ExecutorService.
V3 - Wait/Notify + CyclicBarrier
It seemed worth a try to replace the first cyclic barrier await()
with a simple wait/notify mechanism:
    main() {
        // create the barrier
        // create Runnable for thread, tell it about the barrier
        // start the threads
        while( ... ) {
            // tell threads about the work
            // for each: workerThreadRunnable.setWorkItem( ... );
            // ... now worker threads work on the work...
            // wait for worker threads to finish
            barrier.await();
        }
    }

    class WorkerThreadRunnable implements Runnable {
        CyclicBarrier barrier;
        @NotNull private volatile Callable<Integer> workItem;

        WorkerThreadRunnable( CyclicBarrier barrier ) { this.barrier = barrier; this.workItem = NO_WORK; }

        protected final void
        setWorkItem( @NotNull final Callable<Integer> callable )
        {
            synchronized( this )
            {
                workItem = callable;
                notify();
            }
        }

        public void run()
        {
            while( true )
            {
                // wait for work
                while( true )
                {
                    synchronized( this )
                    {
                        if( workItem != NO_WORK ) break;
                        try
                        {
                            wait();
                        }
                        catch( InterruptedException e ) { e.printStackTrace(); }
                    }
                }
                // do the work
                ...
                // wait for everyone else to finish
                barrier.await();
            }
        }
    }
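The outline above does not show where workItem goes back to NO_WORK between cycles. One way to make the handoff self-resetting is a single-slot monitor like the sketch below, where the worker clears the slot as it picks up the item. WorkSlot, put and take are hypothetical names, and using null for "no work" instead of a NO_WORK sentinel is an assumption of this sketch.

```java
// Hypothetical single-slot handoff built on wait/notify.
final class WorkSlot<T> {
    private T item;   // guarded by this; null means "no work"

    /** Called by the main thread to hand one item to the worker. */
    synchronized void put(T newItem) throws InterruptedException {
        while (item != null) {
            wait();               // previous item has not been picked up yet
        }
        item = newItem;
        notifyAll();
    }

    /** Called by the worker; blocks until an item is available. */
    synchronized T take() throws InterruptedException {
        while (item == null) {
            wait();               // the loop also guards against spurious wakeups
        }
        T taken = item;
        item = null;              // reset so the next cycle blocks again
        notifyAll();
        return taken;
    }
}
```

The worker's "wait for work" loop would then collapse to a single take() call, and InterruptedException is propagated to the caller rather than printed and ignored.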
Again, this works well functionally (it does what it should).
blocksize | system | user  | cycles/sec
256k      | 1.9%   | 85%   | 1.30
64k       | 2.4%   | 80%   | 6.3
16k       | 4.6%   | 60%   | 30.1
4096      | 8.6%   | 41%   | 98.5
1024      | 12%    | 23%   | 202
256       | 14%    | 11.6% | 299
64        | 14%    | 10.0% | 518
16        | 14.8%  | 8.7%  | 679
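The throughput for small work items is still much worse than that
of the ExecutorService, but about 2x that of the CyclicBarrier.
Replacing one of the two barrier waits closes part of the gap
(679 vs. 377 cycles/sec at the smallest block size), but V1 is
still about 2.4x faster (1626 cycles/sec).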
V4 - Busy wait instead of wait/notify
Since this app is the primary one running on the system and
the cores idle anyway if they're not busy with a work item,
why not try a busy wait for work items in each thread, even if
that spins the CPU needlessly? The worker thread code changes
as follows:
    class WorkerThreadRunnable implements Runnable {
        // as before

        protected final void
        setWorkItem( @NotNull final Callable<Integer> callable )
        {
            workItem = callable;
        }

        public void run()
        {
            while( true )
            {
                // busy-wait for work
                while( true )
                {
                    if( workItem != NO_WORK ) break;
                }
                // do the work
                ...
                // wait for everyone else to finish
                barrier.await();
            }
        }
    }
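As with the wait/notify variant, the outline does not show the work item being reset after it is consumed. A lock-free sketch of a busy-wait handoff that clears the slot atomically could look like this. SpinSlot, put and take are hypothetical names, null stands in for NO_WORK, and the AtomicReference replaces the plain volatile field; none of this is the code that produced the numbers below.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical single-slot handoff that spins instead of blocking.
final class SpinSlot<T> {
    private final AtomicReference<T> item = new AtomicReference<T>();   // null = no work

    /** Called by the main thread; spins until the previous item has been taken. */
    void put(T newItem) {
        while (!item.compareAndSet(null, newItem)) {
            // busy-wait
        }
    }

    /** Called by the worker; spins until an item is present, then clears the slot. */
    T take() {
        for (;;) {
            T taken = item.get();
            if (taken != null && item.compareAndSet(taken, null)) {
                return taken;
            }
        }
    }
}
```

This only makes sense under the conditions described above: a dedicated machine and at most one thread per core, since a spinning thread that gets descheduled wastes its whole time slice.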
This also works well functionally (it does what it should).
blocksize | system | user  | cycles/sec
256k      | 1.9%   | 85%   | 1.30
64k       | 2.2%   | 81%   | 6.3
16k       | 4.2%   | 62%   | 33
4096      | 7.5%   | 40%   | 107
1024      | 10.4%  | 23%   | 210
256       | 12.0%  | 12.0% | 310
64        | 11.9%  | 10.2% | 550
16        | 12.2%  | 8.6%  | 741
For small work items, this increases throughput by a further
10% over the CyclicBarrier + wait/notify variant, which is not
insignificant. But throughput is still much lower than in V1
with the ExecutorService.
V5 - ?
So what is the best synchronization mechanism for such a
(presumably not uncommon) problem? I am wary of writing my
own sync mechanism to completely replace ExecutorService
(assuming that it is too generic and there has to be something
that can still be taken out to make it more efficient).
It is not my area of expertise and I'm concerned that I'd
spend a lot of time debugging it (since I'm not even sure
my wait/notify and busy wait variants are correct) for
uncertain gain.
Any advice would be greatly appreciated.