Factored out the database cleaner.

This commit is contained in:
akwizgran
2011-07-04 18:11:27 +01:00
parent 390b316724
commit eb752ada62
9 changed files with 242 additions and 49 deletions

View File

@@ -21,9 +21,16 @@ public interface DatabaseComponent {
static final long MAX_BYTES_BETWEEN_SPACE_CHECKS = 5L * MEGABYTES;
static final long MAX_MS_BETWEEN_SPACE_CHECKS = 60L * 1000L; // 1 min
static final long BYTES_PER_SWEEP = 5L * MEGABYTES;
static final int CLEANER_SLEEP_MS = 1000; // 1 sec
static final int MS_BETWEEN_SWEEPS = 1000; // 1 sec
static final int RETRANSMIT_THRESHOLD = 3;
/**
* Opens the database.
* @param resume True to reopen an existing database or false to create a
* new one.
*/
void open(boolean resume) throws DbException;
/** Waits for any open transactions to finish and closes the database. */
void close() throws DbException;

View File

@@ -0,0 +1,32 @@
package net.sf.briar.db;
import net.sf.briar.api.db.DbException;
interface DatabaseCleaner {
/**
* Starts a new thread to monitor the amount of free storage space
* available to the database and expire old messages as necessary.
*/
void startCleaning();
/** Tells the cleaner thread to exit and returns when it has done so. */
void stopCleaning();
interface Callback {
/**
* Checks how much free storage space is available to the database, and if
* necessary expires old messages until the free space is at least
* MIN_FREE_SPACE. While the free space is less than CRITICAL_FREE_SPACE,
* operations that attempt to store messages in the database will block.
*/
void checkFreeSpaceAndClean() throws DbException;
/**
* Called by the cleaner; returns true iff the amount of free storage space
* available to the database should be checked.
*/
boolean shouldCheckFreeSpace();
}
}

View File

@@ -0,0 +1,54 @@
package net.sf.briar.db;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.inject.Inject;
public class DatabaseCleanerImpl implements DatabaseCleaner, Runnable {
private final Callback db;
private final int msBetweenSweeps;
private final AtomicBoolean stopped = new AtomicBoolean(false);
private final Thread cleanerThread = new Thread(this);
@Inject
DatabaseCleanerImpl(Callback db, int msBetweenSweeps) {
this.db = db;
this.msBetweenSweeps = msBetweenSweeps;
}
public void startCleaning() {
cleanerThread.start();
}
public void stopCleaning() {
stopped.set(true);
// If the cleaner thread is waiting, wake it up
synchronized(stopped) {
stopped.notifyAll();
}
try {
cleanerThread.join();
} catch(InterruptedException ignored) {}
}
public void run() {
try {
while(!stopped.get()) {
if(db.shouldCheckFreeSpace()) {
db.checkFreeSpaceAndClean();
} else {
synchronized(stopped) {
try {
stopped.wait(msBetweenSweeps);
} catch(InterruptedException ignored) {}
}
}
}
} catch(Throwable t) {
// FIXME: Work out what to do here
t.printStackTrace();
System.exit(1);
}
}
}

View File

@@ -19,12 +19,14 @@ import com.google.inject.Provider;
* Abstract superclass containing code shared by ReadWriteLockDatabaseComponent
* and SynchronizedDatabaseComponent.
*/
abstract class DatabaseComponentImpl<Txn> implements DatabaseComponent {
abstract class DatabaseComponentImpl<Txn> implements DatabaseComponent,
DatabaseCleaner.Callback {
private static final Logger LOG =
Logger.getLogger(DatabaseComponentImpl.class.getName());
protected final Database<Txn> db;
protected final DatabaseCleaner cleaner;
protected final Provider<Batch> batchProvider;
private final Object spaceLock = new Object();
@@ -33,10 +35,16 @@ abstract class DatabaseComponentImpl<Txn> implements DatabaseComponent {
private long timeOfLastCheck = 0L; // Locking: spaceLock
private volatile boolean writesAllowed = true;
DatabaseComponentImpl(Database<Txn> db, Provider<Batch> batchProvider) {
DatabaseComponentImpl(Database<Txn> db, DatabaseCleaner cleaner,
Provider<Batch> batchProvider) {
this.db = db;
this.cleaner = cleaner;
this.batchProvider = batchProvider;
startCleaner();
}
public void open(boolean resume) throws DbException {
db.open(resume);
cleaner.startCleaning();
}
/**
@@ -63,13 +71,7 @@ abstract class DatabaseComponentImpl<Txn> implements DatabaseComponent {
return sendability;
}
/**
* Checks how much free storage space is available to the database, and if
* necessary expires old messages until the free space is at least
* MIN_FREE_SPACE. While the free space is less than CRITICAL_FREE_SPACE,
* operations that attempt to store messages in the database will block.
*/
private void checkFreeSpaceAndClean() throws DbException {
public void checkFreeSpaceAndClean() throws DbException {
long freeSpace = db.getFreeSpace();
while(freeSpace < MIN_FREE_SPACE) {
// If disk space is critical, disable the storage of new messages
@@ -123,11 +125,7 @@ abstract class DatabaseComponentImpl<Txn> implements DatabaseComponent {
db.removeMessage(txn, id);
}
/**
* Returns true iff the amount of free storage space available to the
* database should be checked.
*/
private boolean shouldCheckFreeSpace() {
public boolean shouldCheckFreeSpace() {
synchronized(spaceLock) {
long now = System.currentTimeMillis();
if(bytesStoredSinceLastCheck > MAX_BYTES_BETWEEN_SPACE_CHECKS) {
@@ -149,35 +147,6 @@ abstract class DatabaseComponentImpl<Txn> implements DatabaseComponent {
return false;
}
/**
* Starts a new thread to monitor the amount of free storage space
* available to the database and expire old messages as necessary.
* <p>
* FIXME: The thread implementation should be factored out.
*/
private void startCleaner() {
Runnable cleaner = new Runnable() {
public void run() {
try {
while(true) {
if(shouldCheckFreeSpace()) {
checkFreeSpaceAndClean();
} else {
try {
Thread.sleep(CLEANER_SLEEP_MS);
} catch(InterruptedException ignored) {}
}
}
} catch(Throwable t) {
// FIXME: Work out what to do here
t.printStackTrace();
System.exit(1);
}
}
};
new Thread(cleaner).start();
}
/**
* If the given message is already in the database, marks it as seen by the
* sender and returns false. Otherwise stores the message, updates the

View File

@@ -47,12 +47,13 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
new ReentrantReadWriteLock(true);
@Inject
ReadWriteLockDatabaseComponent(Database<Txn> db,
ReadWriteLockDatabaseComponent(Database<Txn> db, DatabaseCleaner cleaner,
Provider<Batch> batchProvider) {
super(db, batchProvider);
super(db, cleaner, batchProvider);
}
public void close() throws DbException {
cleaner.stopCleaning();
contactLock.writeLock().lock();
try {
messageLock.writeLock().lock();

View File

@@ -41,12 +41,13 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
private final Object subscriptionLock = new Object();
@Inject
SynchronizedDatabaseComponent(Database<Txn> db,
SynchronizedDatabaseComponent(Database<Txn> db, DatabaseCleaner cleaner,
Provider<Batch> batchProvider) {
super(db, batchProvider);
super(db, cleaner, batchProvider);
}
public void close() throws DbException {
cleaner.stopCleaning();
synchronized(contactLock) {
synchronized(messageLock) {
synchronized(messageStatusLock) {

View File

@@ -13,6 +13,7 @@
<path refid='test-classes'/>
<path refid='util-classes'/>
</classpath>
<test name='net.sf.briar.db.DatabaseCleanerImplTest'/>
<test name='net.sf.briar.db.H2DatabaseTest'/>
<test name='net.sf.briar.i18n.FontManagerTest'/>
<test name='net.sf.briar.i18n.I18nTest'/>

View File

@@ -0,0 +1,45 @@
package net.sf.briar.db;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
import net.sf.briar.api.db.DbException;
import net.sf.briar.db.DatabaseCleaner.Callback;
import org.junit.Test;
public class DatabaseCleanerImplTest extends TestCase {
@Test
public void testStoppingCleanerWakesItUp() throws DbException {
final CountDownLatch latch = new CountDownLatch(1);
Callback callback = new Callback() {
public void checkFreeSpaceAndClean() throws DbException {
throw new IllegalStateException();
}
public boolean shouldCheckFreeSpace() {
latch.countDown();
return false;
}
};
// Configure the cleaner to wait for 30 seconds between sweeps
DatabaseCleanerImpl cleaner =
new DatabaseCleanerImpl(callback, 30 * 1000);
long start = System.currentTimeMillis();
// Start the cleaner and check that shouldCheckFreeSpace() is called
cleaner.startCleaning();
try {
assertTrue(latch.await(5, TimeUnit.SECONDS));
} catch(InterruptedException e) {
assertTrue(false);
}
// Stop the cleaner (it should be waiting between sweeps)
cleaner.stopCleaning();
long end = System.currentTimeMillis();
// Check that much less than 30 seconds expired
assertTrue(end - start < 10 * 1000);
}
}

View File

@@ -7,6 +7,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import junit.framework.TestCase;
import net.sf.briar.TestUtils;
@@ -586,6 +587,88 @@ public class H2DatabaseTest extends TestCase {
context.assertIsSatisfied();
}
@Test
public void testCloseWaitsForCommit() throws DbException {
Mockery context = new Mockery();
MessageFactory messageFactory = context.mock(MessageFactory.class);
final AtomicBoolean transactionFinished = new AtomicBoolean(false);
final AtomicBoolean closed = new AtomicBoolean(false);
final AtomicBoolean error = new AtomicBoolean(false);
// Create a new database
final Database<Connection> db = open(false, messageFactory);
// Start a transaction
Connection txn = db.startTransaction();
// In another thread, close the database
Thread t = new Thread() {
public void run() {
try {
db.close();
closed.set(true);
if(!transactionFinished.get()) error.set(true);
} catch(DbException e) {
error.set(true);
}
}
};
t.start();
// Do whatever the transaction needs to do
try {
Thread.sleep(100);
} catch(InterruptedException ignored) {}
transactionFinished.set(true);
// Commit the transaction
db.commitTransaction(txn);
// The other thread should now terminate
try {
t.join(10000);
} catch(InterruptedException ignored) {}
assertTrue(closed.get());
// Check that the other thread didn't encounter an error
assertFalse(error.get());
}
@Test
public void testCloseWaitsForAbort() throws DbException {
Mockery context = new Mockery();
MessageFactory messageFactory = context.mock(MessageFactory.class);
final AtomicBoolean transactionFinished = new AtomicBoolean(false);
final AtomicBoolean closed = new AtomicBoolean(false);
final AtomicBoolean error = new AtomicBoolean(false);
// Create a new database
final Database<Connection> db = open(false, messageFactory);
// Start a transaction
Connection txn = db.startTransaction();
// In another thread, close the database
Thread t = new Thread() {
public void run() {
try {
db.close();
closed.set(true);
if(!transactionFinished.get()) error.set(true);
} catch(DbException e) {
error.set(true);
}
}
};
t.start();
// Do whatever the transaction needs to do
try {
Thread.sleep(100);
} catch(InterruptedException ignored) {}
transactionFinished.set(true);
// Abort the transaction
db.abortTransaction(txn);
// The other thread should now terminate
try {
t.join(10000);
} catch(InterruptedException ignored) {}
assertTrue(closed.get());
// Check that the other thread didn't encounter an error
assertFalse(error.get());
}
private Database<Connection> open(boolean resume,
MessageFactory messageFactory) throws DbException {
final char[] passwordArray = passwordString.toCharArray();