How to unit test synchronized code
- by gillJ
Hi,
I am new to Java and junit. I have the following peice of code that I want to test. Would appreciate if you could send your ideas about what's the best way to go about testing it.
Basically, the following code is about electing a leader form a Cluster. The leader holds a lock on the shared cache and services of the leader get resumed and disposed if it somehow looses the lock on the cache.
How can i make sure that a leader/thread still holds the lock on the cache and that another thread cannot get its services resumed while the first is in execution?
public interface ContinuousService {
public void resume();
public void pause();
}
public abstract class ClusterServiceManager {
private volatile boolean leader = false;
private volatile boolean electable = true;
private List<ContinuousService> services;
protected synchronized void onElected() {
if (!leader) {
for (ContinuousService service : services) {
service.resume();
}
leader = true;
}
}
protected synchronized void onDeposed() {
if (leader) {
for (ContinuousService service : services) {
service.pause();
}
leader = false;
}
}
public void setServices(List<ContinuousService> services) {
this.services = services;
}
@ManagedAttribute
public boolean isElectable() {
return electable;
}
@ManagedAttribute
public boolean isLeader() {
return leader;
}
public class TangosolLeaderElector extends ClusterServiceManager implements Runnable {
private static final Logger log = LoggerFactory.getLogger(TangosolLeaderElector.class);
private String election;
private long electionWaitTime= 5000L;
private NamedCache cache;
public void start() {
log.info("Starting LeaderElector ({})",election);
Thread t = new Thread(this, "LeaderElector ("+election+")");
t.setDaemon(true);
t.start();
}
public void run() {
// Give the connection a chance to start itself up
try {
Thread.sleep(1000);
} catch (InterruptedException e) {}
boolean wasElectable = !isElectable();
while (true) {
if (isElectable()) {
if (!wasElectable) {
log.info("Leadership requested on election: {}",election);
wasElectable = isElectable();
}
boolean elected = false;
try {
// Try and get the lock on the LeaderElectorCache for the current election
if (!cache.lock(election, electionWaitTime)) {
// We didn't get the lock. cycle round again.
// This code to ensure we check the electable flag every now & then
continue;
}
elected = true;
log.info("Leadership taken on election: {}",election);
onElected();
// Wait here until the services fail in some way.
while (true) {
try {
Thread.sleep(electionWaitTime);
} catch (InterruptedException e) {}
if (!cache.lock(election, 0)) {
log.warn("Cache lock no longer held for election: {}", election);
break;
} else if (!isElectable()) {
log.warn("Node is no longer electable for election: {}", election);
break;
}
// We're fine - loop round and go back to sleep.
}
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Leadership election " + election + " failed (try bfmq logs for details)", e);
}
} finally {
if (elected) {
cache.unlock(election);
log.info("Leadership resigned on election: {}",election);
onDeposed();
}
// On deposition, do not try and get re-elected for at least the standard wait time.
try { Thread.sleep(electionWaitTime); } catch (InterruptedException e) {}
}
} else {
// Not electable - wait a bit and check again.
if (wasElectable) {
log.info("Leadership NOT requested on election ({}) - node not electable",election);
wasElectable = isElectable();
}
try {
Thread.sleep(electionWaitTime);
} catch (InterruptedException e) {}
}
}
}
public void setElection(String election) {
this.election = election;
}
@ManagedAttribute
public String getElection() {
return election;
}
public void setNamedCache(NamedCache nc) {
this.cache = nc;
}