Optimizing a thread safe Java NIO / Serialization / FIFO Queue [migrated]
- by trialcodr
I've written a thread-safe, persistent FIFO queue for Serializable items. The reason for reinventing the wheel is that we simply can't afford any third-party dependencies in this project and want to keep this really simple.
The problem is that it isn't fast enough. Most of the slowness is undoubtedly due to reading and writing directly to disk, but I think we should be able to squeeze a bit more out of it anyway. Any ideas on how to improve the performance of the `take` and `add` methods?
/**
 * <code>DiskQueue</code> Persistent, thread safe FIFO queue for
 * <code>Serializable</code> items.
 * <p>
 * File layout: a 16-byte header holding two big-endian longs (the item count,
 * then the byte offset of the first item, or {@link #EMPTY_OFFS} when the
 * queue is empty), followed by the items. Every item is stored as its own
 * <code>ObjectOutputStream</code> stream, so items can be read back one at a
 * time with a fresh <code>ObjectInputStream</code>.
 * <p>
 * All file access and all access to the in-memory copy of the header is
 * guarded by this object's monitor.
 */
public class DiskQueue<ItemT extends Serializable> implements java.io.Closeable
{
    /** Offset stored in the header while the queue holds no items. */
    public static final int EMPTY_OFFS = -1;
    /** Size in bytes of one header field. */
    public static final int LONG_SIZE = 8;
    /** Size in bytes of the header (item count + first-item offset). */
    public static final int HEADER_SIZE = LONG_SIZE * 2;
    /** Number of bytes moved per step while defragmenting. */
    private static final int COPY_CHUNK_SIZE = 64 * 1024;

    private final InputStream inputStream;
    private final RandomAccessFile file;
    private final FileChannel channel;
    /** Byte offset of the first item, or EMPTY_OFFS; guarded by this. */
    private long offs = EMPTY_OFFS;
    /** Number of items in the queue; guarded by this. */
    private long size = 0;

    /**
     * Opens the queue stored in the given file, creating it if necessary.
     * A file that is shorter than the header (for example a freshly created,
     * empty file) is initialised as an empty queue.
     *
     * @param filename path of the backing file
     * @throws RuntimeException wrapping the <code>IOException</code> if the
     *         file cannot be opened, read or initialised
     */
    public DiskQueue(String filename)
    {
        RandomAccessFile opened = null;
        try
        {
            opened = new RandomAccessFile(filename, "rwd");
            if (opened.length() >= HEADER_SIZE)
            {
                size = opened.readLong();
                offs = opened.readLong();
            }
            else
            {
                opened.seek(0);
                opened.writeLong(size);
                opened.writeLong(offs);
            }
        } catch (IOException e)
        {
            // Do not leak the file handle when the header cannot be processed.
            if (opened != null)
            {
                try
                {
                    opened.close();
                } catch (IOException ignored)
                {
                    // The original failure is the one worth reporting.
                }
            }
            throw new RuntimeException(e);
        }
        file = opened;
        channel = opened.getChannel();
        inputStream = Channels.newInputStream(channel);
    }

    /**
     * Add item to end of queue.
     * <p>
     * The item is serialized in memory first and then appended with a single
     * buffer write, so a serialization failure leaves the file untouched.
     *
     * @param item the item to append
     * @throws RuntimeException wrapping the <code>IOException</code> if the
     *         item cannot be serialized or written
     */
    public void add(ItemT item)
    {
        try
        {
            // Serialization needs no lock; only the file update does.
            java.io.ByteArrayOutputStream bytes = new java.io.ByteArrayOutputStream();
            ObjectOutputStream s = new ObjectOutputStream(bytes);
            s.writeObject(item);
            s.flush();
            ByteBuffer record = ByteBuffer.wrap(bytes.toByteArray());
            synchronized (this)
            {
                if (offs == EMPTY_OFFS && channel.size() > HEADER_SIZE)
                {
                    // Drop bytes left behind by an interrupted truncate so the
                    // first item really starts right after the header.
                    channel.truncate(HEADER_SIZE);
                }
                writeFully(record, channel.size());
                size++;
                if (offs == EMPTY_OFFS)
                {
                    offs = HEADER_SIZE;
                }
                writeHeader();
                // Wake every waiter: several consumers may be blocked.
                notifyAll();
            }
        } catch (IOException e)
        {
            throw new RuntimeException(e);
        }
    }

    /**
     * Clears overhead by moving the remaining items up and shortening the file.
     * <p>
     * Data is moved in fixed-size chunks from the front, so memory use does
     * not depend on the file size. The move happens in place; the header is
     * only updated after all data has been moved.
     *
     * @throws RuntimeException wrapping the <code>IOException</code> if the
     *         file cannot be read or written
     */
    public synchronized void defrag()
    {
        if (offs > HEADER_SIZE && size > 0)
        {
            try
            {
                long end = channel.size();
                long src = offs;
                long dst = HEADER_SIZE;
                ByteBuffer buffer = ByteBuffer.allocate(COPY_CHUNK_SIZE);
                while (src < end)
                {
                    buffer.clear();
                    buffer.limit((int) Math.min((long) buffer.capacity(), end - src));
                    while (buffer.hasRemaining())
                    {
                        int res = channel.read(buffer, src + buffer.position());
                        if (res == -1)
                        {
                            throw new IOException("Failed to read data into buffer");
                        }
                    }
                    buffer.flip();
                    int moved = buffer.remaining();
                    // dst < src, so this write never touches bytes not yet read.
                    writeFully(buffer, dst);
                    src += moved;
                    dst += moved;
                }
                offs = HEADER_SIZE;
                writeHeader();
                file.setLength(dst);
            } catch (IOException e)
            {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Returns the queue overhead in bytes.
     *
     * @return number of unused bytes between the header and the first item,
     *         0 if the queue is empty
     */
    public synchronized long overhead()
    {
        return (offs == EMPTY_OFFS) ? 0 : offs - HEADER_SIZE;
    }

    /**
     * Returns the first item in the queue, blocks if queue is empty.
     *
     * @return the first item, which stays in the queue
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws RuntimeException if the item cannot be read or deserialized
     */
    public synchronized ItemT peek() throws InterruptedException
    {
        // block() waits on this monitor, which is released while waiting and
        // held again on return, so the queue is known to be non-empty here.
        block();
        return readItem();
    }

    /**
     * Returns the number of remaining items in queue.
     *
     * @return the current item count
     */
    public synchronized long size()
    {
        return size;
    }

    /**
     * Removes and returns the first item in the queue, blocks if queue is empty.
     *
     * @return the removed item
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws RuntimeException if the item cannot be read or the header cannot
     *         be updated
     */
    public synchronized ItemT take() throws InterruptedException
    {
        block();
        try
        {
            ItemT result = readItem();
            // readItem leaves the channel positioned right behind the item.
            long next = channel.position();
            if (next >= channel.size())
            {
                // That was the last item: reset the file instead of growing
                // the dead space in front of an empty queue.
                truncate();
            }
            else
            {
                size--;
                offs = next;
                writeHeader();
            }
            return result;
        } catch (IOException e)
        {
            throw new RuntimeException(e);
        }
    }

    /**
     * Throw away all items and reset the file.
     * <p>
     * The empty header is written before the file is shortened, so the header
     * on disk never refers to data that has already been cut off.
     *
     * @throws RuntimeException wrapping the <code>IOException</code> if the
     *         file cannot be updated
     */
    public synchronized void truncate()
    {
        try
        {
            offs = EMPTY_OFFS;
            size = 0;
            writeHeader();
            file.setLength(HEADER_SIZE);
        } catch (IOException e)
        {
            throw new RuntimeException(e);
        }
    }

    /**
     * Closes the backing file. The queue must not be used afterwards.
     *
     * @throws RuntimeException wrapping the <code>IOException</code> if the
     *         file cannot be closed
     */
    @Override
    public synchronized void close()
    {
        try
        {
            file.close();
        } catch (IOException e)
        {
            throw new RuntimeException(e);
        }
    }

    /**
     * Block until an item is available.
     * <p>
     * The emptiness check and the wait happen under the same monitor, so a
     * notification sent by {@link #add} cannot slip in between them.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    protected synchronized void block() throws InterruptedException
    {
        while (offs == EMPTY_OFFS)
        {
            wait();
        }
    }

    /**
     * Read and return item.
     * <p>
     * Reads the item stored at the current first-item offset and leaves the
     * channel positioned behind it. Callers must hold this object's monitor
     * and must make sure the queue is not empty.
     *
     * @return the deserialized item
     * @throws RuntimeException if the item cannot be read or its class cannot
     *         be found
     */
    @SuppressWarnings("unchecked")
    protected ItemT readItem()
    {
        try
        {
            channel.position(offs);
            return (ItemT) new ObjectInputStream(inputStream).readObject();
        } catch (ClassNotFoundException e)
        {
            throw new RuntimeException(e);
        } catch (IOException e)
        {
            throw new RuntimeException(e);
        }
    }

    /**
     * Writes the in-memory item count and first-item offset to the start of
     * the file as a single buffer. Callers must hold this object's monitor.
     */
    private void writeHeader() throws IOException
    {
        ByteBuffer header = ByteBuffer.allocate(HEADER_SIZE);
        header.putLong(size);
        header.putLong(offs);
        header.flip();
        writeFully(header, 0);
    }

    /**
     * Writes all remaining bytes of the buffer to the file, starting at the
     * given absolute position.
     */
    private void writeFully(ByteBuffer src, long position) throws IOException
    {
        long at = position;
        while (src.hasRemaining())
        {
            at += channel.write(src, at);
        }
    }
}