Compare commits

..

1 Commits

Author SHA1 Message Date
akwizgran
b4d0a221a3 Add executor with high and low priority queues. 2019-02-14 14:51:31 +00:00
11 changed files with 155 additions and 159 deletions

View File

@@ -25,7 +25,6 @@ import org.briarproject.bramble.api.transport.KeySetId;
import org.briarproject.bramble.api.transport.TransportKeys; import org.briarproject.bramble.api.transport.TransportKeys;
import java.util.Collection; import java.util.Collection;
import java.util.List;
import java.util.Map; import java.util.Map;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@@ -151,8 +150,6 @@ public interface DatabaseComponent {
boolean containsLocalAuthor(Transaction txn, AuthorId local) boolean containsLocalAuthor(Transaction txn, AuthorId local)
throws DbException; throws DbException;
int countFakes(Transaction txn, List<byte[]> ids) throws DbException;
/** /**
* Deletes the message with the given ID. Unlike * Deletes the message with the given ID. Unlike
* {@link #removeMessage(Transaction, MessageId)}, the message ID, * {@link #removeMessage(Transaction, MessageId)}, the message ID,

View File

@@ -0,0 +1,67 @@
package org.briarproject.bramble;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Executor;
import javax.annotation.concurrent.GuardedBy;
@NotNullByDefault
public class PriorityExecutor {
private final Object lock = new Object();
private final Executor delegate, high, low;
@GuardedBy("lock")
private final Queue<Runnable> highQueue = new LinkedList<>();
@GuardedBy("lock")
private final Queue<Runnable> lowQueue = new LinkedList<>();
@GuardedBy("lock")
private boolean isTaskRunning = false;
public PriorityExecutor(Executor delegate) {
this.delegate = delegate;
high = r -> submit(r, true);
low = r -> submit(r, false);
}
public Executor getHighPriorityExecutor() {
return high;
}
public Executor getLowPriorityExecutor() {
return low;
}
private void submit(Runnable r, boolean isHighPriority) {
Runnable wrapped = () -> {
try {
r.run();
} finally {
scheduleNext();
}
};
synchronized (lock) {
if (!isTaskRunning && highQueue.isEmpty() &&
(isHighPriority || lowQueue.isEmpty())) {
isTaskRunning = true;
delegate.execute(wrapped);
} else if (isHighPriority) {
highQueue.add(wrapped);
} else {
lowQueue.add(wrapped);
}
}
}
private void scheduleNext() {
synchronized (lock) {
Runnable next = highQueue.poll();
if (next == null) next = lowQueue.poll();
if (next == null) isTaskRunning = false;
else delegate.execute(next);
}
}
}

View File

@@ -28,7 +28,6 @@ import org.briarproject.bramble.api.transport.KeySetId;
import org.briarproject.bramble.api.transport.TransportKeys; import org.briarproject.bramble.api.transport.TransportKeys;
import java.util.Collection; import java.util.Collection;
import java.util.List;
import java.util.Map; import java.util.Map;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@@ -189,8 +188,6 @@ interface Database<T> {
boolean containsVisibleMessage(T txn, ContactId c, MessageId m) boolean containsVisibleMessage(T txn, ContactId c, MessageId m)
throws DbException; throws DbException;
int countFakes(T txn, List<byte[]> ids) throws DbException;
/** /**
* Returns the number of messages offered by the given contact. * Returns the number of messages offered by the given contact.
* <p/> * <p/>

View File

@@ -313,13 +313,6 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
return db.containsLocalAuthor(txn, local); return db.containsLocalAuthor(txn, local);
} }
@Override
public int countFakes(Transaction transaction, List<byte[]> ids)
throws DbException {
T txn = unbox(transaction);
return db.countFakes(txn, ids);
}
@Override @Override
public void deleteMessage(Transaction transaction, MessageId m) public void deleteMessage(Transaction transaction, MessageId m)
throws DbException { throws DbException {

View File

@@ -1,6 +1,5 @@
package org.briarproject.bramble.db; package org.briarproject.bramble.db;
import org.briarproject.bramble.api.UniqueId;
import org.briarproject.bramble.api.contact.Contact; import org.briarproject.bramble.api.contact.Contact;
import org.briarproject.bramble.api.contact.ContactId; import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.crypto.SecretKey; import org.briarproject.bramble.api.crypto.SecretKey;
@@ -48,7 +47,6 @@ import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
@@ -94,8 +92,6 @@ abstract class JdbcDatabase implements Database<Connection> {
private static final int OFFSET_CURR = 0; private static final int OFFSET_CURR = 0;
private static final int OFFSET_NEXT = 1; private static final int OFFSET_NEXT = 1;
private static final int NUM_FAKES = 32 * 1024;
private static final String CREATE_SETTINGS = private static final String CREATE_SETTINGS =
"CREATE TABLE settings" "CREATE TABLE settings"
+ " (namespace _STRING NOT NULL," + " (namespace _STRING NOT NULL,"
@@ -289,20 +285,6 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " REFERENCES contacts (contactId)" + " REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)"; + " ON DELETE CASCADE)";
// In H2 the MEMORY keyword creates a persistent table with in-memory
// indexes, which must be dropped and recreated when reopening the database.
// In HyperSQL it creates a persistent table that's entirely cached in
// memory, which is the default. Dropping and recreating indexes is
// unnecessary.
private static final String CREATE_FAKES =
"CREATE MEMORY TABLE fakes (fakeId _HASH NOT NULL)";
private static final String DROP_INDEX_FAKES_BY_FAKE_ID =
"DROP INDEX IF EXISTS fakesByFakeId";
private static final String INDEX_FAKES_BY_FAKE_ID =
"CREATE INDEX fakesByFakeId ON fakes (fakeId)";
private static final String INDEX_CONTACTS_BY_AUTHOR_ID = private static final String INDEX_CONTACTS_BY_AUTHOR_ID =
"CREATE INDEX IF NOT EXISTS contactsByAuthorId" "CREATE INDEX IF NOT EXISTS contactsByAuthorId"
+ " ON contacts (authorId)"; + " ON contacts (authorId)";
@@ -394,16 +376,6 @@ abstract class JdbcDatabase implements Database<Connection> {
txn = startTransaction(); txn = startTransaction();
try { try {
storeLastCompacted(txn); storeLastCompacted(txn);
createInMemoryIndexes(txn);
commitTransaction(txn);
} catch (DbException e) {
abortTransaction(txn);
throw e;
}
} else {
txn = startTransaction();
try {
createInMemoryIndexes(txn);
commitTransaction(txn); commitTransaction(txn);
} catch (DbException e) { } catch (DbException e) {
abortTransaction(txn); abortTransaction(txn);
@@ -489,7 +461,6 @@ abstract class JdbcDatabase implements Database<Connection> {
private void createTables(Connection txn) throws DbException { private void createTables(Connection txn) throws DbException {
Statement s = null; Statement s = null;
PreparedStatement ps = null;
try { try {
s = txn.createStatement(); s = txn.createStatement();
s.executeUpdate(dbTypes.replaceTypes(CREATE_SETTINGS)); s.executeUpdate(dbTypes.replaceTypes(CREATE_SETTINGS));
@@ -506,29 +477,9 @@ abstract class JdbcDatabase implements Database<Connection> {
s.executeUpdate(dbTypes.replaceTypes(CREATE_TRANSPORTS)); s.executeUpdate(dbTypes.replaceTypes(CREATE_TRANSPORTS));
s.executeUpdate(dbTypes.replaceTypes(CREATE_OUTGOING_KEYS)); s.executeUpdate(dbTypes.replaceTypes(CREATE_OUTGOING_KEYS));
s.executeUpdate(dbTypes.replaceTypes(CREATE_INCOMING_KEYS)); s.executeUpdate(dbTypes.replaceTypes(CREATE_INCOMING_KEYS));
s.executeUpdate(dbTypes.replaceTypes(CREATE_FAKES));
s.close(); s.close();
Random random = new Random();
long start = now();
ps = txn.prepareStatement("INSERT INTO fakes (fakeId) VALUES (?)");
int inserted = 0;
while (inserted < NUM_FAKES) {
int batchSize = Math.min(1024, NUM_FAKES - inserted);
for (int i = 0; i < batchSize; i++) {
byte[] id = new byte[UniqueId.LENGTH];
random.nextBytes(id);
ps.setBytes(1, id);
ps.addBatch();
}
ps.executeBatch();
inserted += batchSize;
}
ps.close();
LOG.info("Inserting " + NUM_FAKES + " fakes took "
+ (now() - start) + " ms");
} catch (SQLException e) { } catch (SQLException e) {
tryToClose(s, LOG, WARNING); tryToClose(s, LOG, WARNING);
tryToClose(ps, LOG, WARNING);
throw new DbException(e); throw new DbException(e);
} }
} }
@@ -550,21 +501,6 @@ abstract class JdbcDatabase implements Database<Connection> {
} }
} }
private void createInMemoryIndexes(Connection txn) throws DbException {
Statement s = null;
try {
long start = now();
s = txn.createStatement();
s.executeUpdate(dbTypes.replaceTypes(DROP_INDEX_FAKES_BY_FAKE_ID));
s.executeUpdate(dbTypes.replaceTypes(INDEX_FAKES_BY_FAKE_ID));
s.close();
LOG.info("Indexing fakes took " + (now() - start) + " ms");
} catch (SQLException e) {
tryToClose(s, LOG, WARNING);
throw new DbException(e);
}
}
@Override @Override
public Connection startTransaction() throws DbException { public Connection startTransaction() throws DbException {
Connection txn; Connection txn;
@@ -1217,41 +1153,6 @@ abstract class JdbcDatabase implements Database<Connection> {
} }
} }
@Override
public int countFakes(Connection txn, List<byte[]> ids)
throws DbException {
if (ids.isEmpty()) return 0;
PreparedStatement ps = null;
ResultSet rs = null;
try {
long start = now();
StringBuilder sb = new StringBuilder();
sb.append("SELECT COUNT (fakeId) FROM fakes WHERE fakeId IN (?");
for (int i = 1; i < ids.size(); i++) {
sb.append(",?");
}
sb.append(')');
String sql = sb.toString();
ps = txn.prepareStatement(sql);
for (int i = 0; i < ids.size(); i++) {
ps.setBytes(i + 1, ids.get(i));
}
rs = ps.executeQuery();
if (!rs.next()) throw new DbException();
int count = rs.getInt(1);
if (rs.next()) throw new DbException();
rs.close();
ps.close();
LOG.info("Counting " + ids.size() + " fakes took "
+ (now() - start) + " ms, found " + count);
return count;
} catch (SQLException e) {
tryToClose(rs, LOG, WARNING);
tryToClose(ps, LOG, WARNING);
throw new DbException(e);
}
}
@Override @Override
public int countOfferedMessages(Connection txn, ContactId c) public int countOfferedMessages(Connection txn, ContactId c)
throws DbException { throws DbException {

View File

@@ -0,0 +1,86 @@
package org.briarproject.bramble;
import org.briarproject.bramble.test.BrambleTestCase;
import org.junit.Test;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import static java.util.Arrays.asList;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
public class PriorityExecutorTest extends BrambleTestCase {
@Test
public void testHighPriorityTasksAreDelegatedInOrderOfSubmission()
throws Exception {
Executor delegate = newSingleThreadExecutor();
PriorityExecutor priority = new PriorityExecutor(delegate);
Executor high = priority.getHighPriorityExecutor();
testTasksAreDelegatedInOrderOfSubmission(high);
}
@Test
public void testLowPriorityTasksAreDelegatedInOrderOfSubmission()
throws Exception {
Executor delegate = newSingleThreadExecutor();
PriorityExecutor priority = new PriorityExecutor(delegate);
Executor low = priority.getLowPriorityExecutor();
testTasksAreDelegatedInOrderOfSubmission(low);
}
@Test
public void testHighPriorityTasksAreRunFirst() throws Exception {
Executor delegate = newSingleThreadExecutor();
PriorityExecutor priority = new PriorityExecutor(delegate);
Executor high = priority.getHighPriorityExecutor();
Executor low = priority.getLowPriorityExecutor();
// Submit a task that will block, causing other tasks to be queued
CountDownLatch cork = new CountDownLatch(1);
low.execute(() -> {
try {
cork.await();
} catch (InterruptedException e) {
fail();
}
});
// Submit alternating tasks to the high and low priority executors
List<Integer> results = new Vector<>();
CountDownLatch tasksFinished = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
int result = i;
Runnable task = () -> {
results.add(result);
tasksFinished.countDown();
};
if (i % 2 == 0) high.execute(task);
else low.execute(task);
}
// Release the cork and wait for all tasks to finish
cork.countDown();
tasksFinished.await();
// The high-priority tasks should have run before the low-priority tasks
assertEquals(asList(0, 2, 4, 6, 8, 1, 3, 5, 7, 9), results);
}
private void testTasksAreDelegatedInOrderOfSubmission(Executor e)
throws Exception {
List<Integer> results = new Vector<>();
CountDownLatch tasksFinished = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
int result = i;
e.execute(() -> {
results.add(result);
tasksFinished.countDown();
});
}
// Wait for all the tasks to finish
tasksFinished.await();
// The tasks should have run in the order they were submitted
assertEquals(asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), results);
}
}

View File

@@ -131,19 +131,6 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
assertTrue(testDir.mkdirs()); assertTrue(testDir.mkdirs());
} }
@Test
public void testFakes() throws Exception {
List<byte[]> ids = new ArrayList<>(1024);
for (int i = 0; i < 1024; i++) {
ids.add(getRandomId());
}
Database<Connection> db = open(false);
Connection txn = db.startTransaction();
assertEquals(0, db.countFakes(txn, ids));
db.commitTransaction(txn);
db.close();
}
@Test @Test
public void testPersistence() throws Exception { public void testPersistence() throws Exception {
// Store some records // Store some records

View File

@@ -12,7 +12,6 @@ import org.briarproject.bramble.api.contact.ContactExchangeTask;
import org.briarproject.bramble.api.contact.ContactManager; import org.briarproject.bramble.api.contact.ContactManager;
import org.briarproject.bramble.api.crypto.CryptoExecutor; import org.briarproject.bramble.api.crypto.CryptoExecutor;
import org.briarproject.bramble.api.crypto.PasswordStrengthEstimator; import org.briarproject.bramble.api.crypto.PasswordStrengthEstimator;
import org.briarproject.bramble.api.db.DatabaseComponent;
import org.briarproject.bramble.api.db.DatabaseExecutor; import org.briarproject.bramble.api.db.DatabaseExecutor;
import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.identity.IdentityManager; import org.briarproject.bramble.api.identity.IdentityManager;
@@ -83,8 +82,6 @@ public interface AndroidComponent
@DatabaseExecutor @DatabaseExecutor
Executor databaseExecutor(); Executor databaseExecutor();
DatabaseComponent databaseComponent();
MessageTracker messageTracker(); MessageTracker messageTracker();
LifecycleManager lifecycleManager(); LifecycleManager lifecycleManager();

View File

@@ -223,7 +223,6 @@ public class NavDrawerActivity extends BriarActivity implements
@Override @Override
public boolean onNavigationItemSelected(@NonNull MenuItem item) { public boolean onNavigationItemSelected(@NonNull MenuItem item) {
controller.countFakes();
drawerLayout.closeDrawer(START); drawerLayout.closeDrawer(START);
clearBackStack(); clearBackStack();
if (item.getItemId() == R.id.nav_btn_lock) { if (item.getItemId() == R.id.nav_btn_lock) {

View File

@@ -21,5 +21,4 @@ public interface NavDrawerController extends ActivityLifecycleController {
void shouldAskForDozeWhitelisting(Context ctx, void shouldAskForDozeWhitelisting(Context ctx,
ResultHandler<Boolean> handler); ResultHandler<Boolean> handler);
void countFakes();
} }

View File

@@ -3,8 +3,6 @@ package org.briarproject.briar.android.navdrawer;
import android.app.Activity; import android.app.Activity;
import android.content.Context; import android.content.Context;
import org.briarproject.bramble.api.UniqueId;
import org.briarproject.bramble.api.db.DatabaseComponent;
import org.briarproject.bramble.api.db.DatabaseExecutor; import org.briarproject.bramble.api.db.DatabaseExecutor;
import org.briarproject.bramble.api.db.DbException; import org.briarproject.bramble.api.db.DbException;
import org.briarproject.bramble.api.event.Event; import org.briarproject.bramble.api.event.Event;
@@ -23,9 +21,6 @@ import org.briarproject.bramble.api.settings.SettingsManager;
import org.briarproject.briar.android.controller.DbControllerImpl; import org.briarproject.briar.android.controller.DbControllerImpl;
import org.briarproject.briar.android.controller.handler.ResultHandler; import org.briarproject.briar.android.controller.handler.ResultHandler;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.logging.Logger; import java.util.logging.Logger;
@@ -54,7 +49,6 @@ public class NavDrawerControllerImpl extends DbControllerImpl
private static final String EXPIRY_DATE_WARNING = "expiryDateWarning"; private static final String EXPIRY_DATE_WARNING = "expiryDateWarning";
private static final String EXPIRY_SHOW_UPDATE = "expiryShowUpdate"; private static final String EXPIRY_SHOW_UPDATE = "expiryShowUpdate";
private final DatabaseComponent db;
private final PluginManager pluginManager; private final PluginManager pluginManager;
private final SettingsManager settingsManager; private final SettingsManager settingsManager;
private final EventBus eventBus; private final EventBus eventBus;
@@ -63,11 +57,9 @@ public class NavDrawerControllerImpl extends DbControllerImpl
@Inject @Inject
NavDrawerControllerImpl(@DatabaseExecutor Executor dbExecutor, NavDrawerControllerImpl(@DatabaseExecutor Executor dbExecutor,
DatabaseComponent db, LifecycleManager lifecycleManager, LifecycleManager lifecycleManager, PluginManager pluginManager,
PluginManager pluginManager, SettingsManager settingsManager, SettingsManager settingsManager, EventBus eventBus) {
EventBus eventBus) {
super(dbExecutor, lifecycleManager); super(dbExecutor, lifecycleManager);
this.db = db;
this.pluginManager = pluginManager; this.pluginManager = pluginManager;
this.settingsManager = settingsManager; this.settingsManager = settingsManager;
this.eventBus = eventBus; this.eventBus = eventBus;
@@ -193,25 +185,6 @@ public class NavDrawerControllerImpl extends DbControllerImpl
}); });
} }
@Override
public void countFakes() {
int numFakes = 1024;
Random random = new Random();
List<byte[]> ids = new ArrayList<>(numFakes);
for (int i = 0; i < numFakes; i++) {
byte[] id = new byte[UniqueId.LENGTH];
random.nextBytes(id);
ids.add(id);
}
runOnDbThread(() -> {
try {
db.transaction(true, txn -> db.countFakes(txn, ids));
} catch (DbException e) {
logException(LOG, WARNING, e);
}
});
}
@Override @Override
public boolean isTransportRunning(TransportId transportId) { public boolean isTransportRunning(TransportId transportId) {
Plugin plugin = pluginManager.getPlugin(transportId); Plugin plugin = pluginManager.getPlugin(transportId);