Moved transactions out of database component.

This commit is contained in:
akwizgran
2016-02-11 13:35:46 +00:00
parent ef2b2b9710
commit de8cc50fb4
24 changed files with 1828 additions and 1451 deletions

View File

@@ -7,7 +7,6 @@ import org.briarproject.api.FormatException;
import org.briarproject.api.TransportId;
import org.briarproject.api.contact.Contact;
import org.briarproject.api.contact.ContactId;
import org.briarproject.api.contact.ContactManager;
import org.briarproject.api.contact.ContactManager.AddContactHook;
import org.briarproject.api.contact.ContactManager.RemoveContactHook;
import org.briarproject.api.data.BdfDictionary;
@@ -21,6 +20,7 @@ import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DbException;
import org.briarproject.api.db.Metadata;
import org.briarproject.api.db.NoSuchGroupException;
import org.briarproject.api.db.Transaction;
import org.briarproject.api.properties.TransportProperties;
import org.briarproject.api.properties.TransportPropertyManager;
import org.briarproject.api.sync.ClientId;
@@ -41,10 +41,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Logger;
import static java.util.logging.Level.WARNING;
import static org.briarproject.api.properties.TransportPropertyConstants.MAX_PROPERTY_LENGTH;
import static org.briarproject.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH;
@@ -57,11 +54,7 @@ class TransportPropertyManagerImpl implements TransportPropertyManager,
private static final byte[] LOCAL_GROUP_DESCRIPTOR = new byte[0];
private static final Logger LOG =
Logger.getLogger(TransportPropertyManagerImpl.class.getName());
private final DatabaseComponent db;
private final ContactManager contactManager;
private final PrivateGroupFactory privateGroupFactory;
private final MessageFactory messageFactory;
private final BdfReaderFactory bdfReaderFactory;
@@ -71,18 +64,13 @@ class TransportPropertyManagerImpl implements TransportPropertyManager,
private final Clock clock;
private final Group localGroup;
/** Ensures isolation between database reads and writes. */
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@Inject
TransportPropertyManagerImpl(DatabaseComponent db,
ContactManager contactManager, GroupFactory groupFactory,
PrivateGroupFactory privateGroupFactory,
GroupFactory groupFactory, PrivateGroupFactory privateGroupFactory,
MessageFactory messageFactory, BdfReaderFactory bdfReaderFactory,
BdfWriterFactory bdfWriterFactory, MetadataEncoder metadataEncoder,
MetadataParser metadataParser, Clock clock) {
this.db = db;
this.contactManager = contactManager;
this.privateGroupFactory = privateGroupFactory;
this.messageFactory = messageFactory;
this.bdfReaderFactory = bdfReaderFactory;
@@ -95,165 +83,171 @@ class TransportPropertyManagerImpl implements TransportPropertyManager,
}
@Override
public void addingContact(Contact c) {
lock.writeLock().lock();
try {
// Create a group to share with the contact
Group g = getContactGroup(c);
// Store the group and share it with the contact
db.addGroup(g);
db.setVisibleToContact(c.getId(), g.getId(), true);
// Copy the latest local properties into the group
DeviceId dev = db.getDeviceId();
Map<TransportId, TransportProperties> local = getLocalProperties();
for (Entry<TransportId, TransportProperties> e : local.entrySet()) {
storeMessage(g.getId(), dev, e.getKey(), e.getValue(), 1, true,
true);
}
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
} catch (FormatException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
} finally {
lock.writeLock().unlock();
public void addingContact(Transaction txn, Contact c) throws DbException {
// Create a group to share with the contact
Group g = getContactGroup(c);
// Store the group and share it with the contact
db.addGroup(txn, g);
db.setVisibleToContact(txn, c.getId(), g.getId(), true);
// Copy the latest local properties into the group
DeviceId dev = db.getDeviceId(txn);
Map<TransportId, TransportProperties> local = getLocalProperties();
for (Entry<TransportId, TransportProperties> e : local.entrySet()) {
storeMessage(txn, g.getId(), dev, e.getKey(), e.getValue(), 1,
true, true);
}
}
@Override
public void removingContact(Contact c) {
lock.writeLock().lock();
try {
db.removeGroup(getContactGroup(c));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
} finally {
lock.writeLock().unlock();
}
public void removingContact(Transaction txn, Contact c) throws DbException {
db.removeGroup(txn, getContactGroup(c));
}
@Override
public void addRemoteProperties(ContactId c, DeviceId dev,
Map<TransportId, TransportProperties> props) throws DbException {
lock.writeLock().lock();
Transaction txn = db.startTransaction();
try {
Group g = getContactGroup(contactManager.getContact(c));
Group g = getContactGroup(db.getContact(txn, c));
for (Entry<TransportId, TransportProperties> e : props.entrySet()) {
storeMessage(g.getId(), dev, e.getKey(), e.getValue(), 0, false,
false);
storeMessage(txn, g.getId(), dev, e.getKey(), e.getValue(), 0,
false, false);
}
} catch (FormatException e) {
throw new DbException(e);
txn.setComplete();
} finally {
lock.writeLock().unlock();
db.endTransaction(txn);
}
}
@Override
public Map<TransportId, TransportProperties> getLocalProperties()
throws DbException {
lock.readLock().lock();
try {
// Find the latest local update for each transport
Map<TransportId, LatestUpdate> latest =
findLatest(localGroup.getId(), true);
// Retrieve and parse the latest local properties
Map<TransportId, TransportProperties> local =
new HashMap<TransportId, TransportProperties>();
for (Entry<TransportId, LatestUpdate> e : latest.entrySet()) {
byte[] raw = db.getRawMessage(e.getValue().messageId);
local.put(e.getKey(), parseProperties(raw));
Transaction txn = db.startTransaction();
try {
// Find the latest local update for each transport
Map<TransportId, LatestUpdate> latest = findLatest(txn,
localGroup.getId(), true);
// Retrieve and parse the latest local properties
for (Entry<TransportId, LatestUpdate> e : latest.entrySet()) {
byte[] raw = db.getRawMessage(txn, e.getValue().messageId);
local.put(e.getKey(), parseProperties(raw));
}
txn.setComplete();
} finally {
db.endTransaction(txn);
}
return Collections.unmodifiableMap(local);
} catch (NoSuchGroupException e) {
// Local group doesn't exist - there are no local properties
return Collections.emptyMap();
} catch (IOException e) {
} catch (FormatException e) {
throw new DbException(e);
} finally {
lock.readLock().unlock();
}
}
@Override
public TransportProperties getLocalProperties(TransportId t)
throws DbException {
lock.readLock().lock();
try {
// Find the latest local update
LatestUpdate latest = findLatest(localGroup.getId(), t, true);
if (latest == null) return null;
// Retrieve and parse the latest local properties
return parseProperties(db.getRawMessage(latest.messageId));
TransportProperties p = null;
Transaction txn = db.startTransaction();
try {
// Find the latest local update
LatestUpdate latest = findLatest(txn, localGroup.getId(), t,
true);
if (latest != null) {
// Retrieve and parse the latest local properties
byte[] raw = db.getRawMessage(txn, latest.messageId);
p = parseProperties(raw);
}
txn.setComplete();
} finally {
db.endTransaction(txn);
}
return p;
} catch (NoSuchGroupException e) {
// Local group doesn't exist - there are no local properties
return null;
} catch (IOException e) {
} catch (FormatException e) {
throw new DbException(e);
} finally {
lock.readLock().unlock();
}
}
@Override
public Map<ContactId, TransportProperties> getRemoteProperties(
TransportId t) throws DbException {
lock.readLock().lock();
try {
Map<ContactId, TransportProperties> remote =
new HashMap<ContactId, TransportProperties>();
for (Contact c : contactManager.getContacts()) {
Group g = getContactGroup(c);
// Find the latest remote update
LatestUpdate latest = findLatest(g.getId(), t, false);
if (latest != null) {
// Retrieve and parse the latest remote properties
byte[] raw = db.getRawMessage(latest.messageId);
remote.put(c.getId(), parseProperties(raw));
Transaction txn = db.startTransaction();
try {
for (Contact c : db.getContacts(txn)) {
Group g = getContactGroup(c);
// Find the latest remote update
LatestUpdate latest = findLatest(txn, g.getId(), t, false);
if (latest != null) {
// Retrieve and parse the latest remote properties
byte[] raw = db.getRawMessage(txn, latest.messageId);
remote.put(c.getId(), parseProperties(raw));
}
}
txn.setComplete();
} finally {
db.endTransaction(txn);
}
return Collections.unmodifiableMap(remote);
} catch (IOException e) {
} catch (FormatException e) {
throw new DbException(e);
} finally {
lock.readLock().unlock();
}
}
@Override
public void mergeLocalProperties(TransportId t, TransportProperties p)
throws DbException {
lock.writeLock().lock();
try {
// Create the local group if necessary
db.addGroup(localGroup);
// Merge the new properties with any existing properties
TransportProperties merged;
LatestUpdate latest = findLatest(localGroup.getId(), t, true);
if (latest == null) {
merged = p;
} else {
byte[] raw = db.getRawMessage(latest.messageId);
TransportProperties old = parseProperties(raw);
merged = new TransportProperties(old);
merged.putAll(p);
if (merged.equals(old)) return; // Unchanged
Transaction txn = db.startTransaction();
try {
// Create the local group if necessary
db.addGroup(txn, localGroup);
// Merge the new properties with any existing properties
TransportProperties merged;
boolean changed;
LatestUpdate latest = findLatest(txn, localGroup.getId(), t,
true);
if (latest == null) {
merged = p;
changed = true;
} else {
byte[] raw = db.getRawMessage(txn, latest.messageId);
TransportProperties old = parseProperties(raw);
merged = new TransportProperties(old);
merged.putAll(p);
changed = !merged.equals(old);
}
if (changed) {
// Store the merged properties in the local group
DeviceId dev = db.getDeviceId(txn);
long version = latest == null ? 1 : latest.version + 1;
storeMessage(txn, localGroup.getId(), dev, t, merged,
version, true, false);
// Store the merged properties in each contact's group
for (Contact c : db.getContacts(txn)) {
Group g = getContactGroup(c);
latest = findLatest(txn, g.getId(), t, true);
version = latest == null ? 1 : latest.version + 1;
storeMessage(txn, g.getId(), dev, t, merged, version,
true, true);
}
}
txn.setComplete();
} finally {
db.endTransaction(txn);
}
// Store the merged properties in the local group
DeviceId dev = db.getDeviceId();
long version = latest == null ? 1 : latest.version + 1;
storeMessage(localGroup.getId(), dev, t, merged, version, true,
false);
// Store the merged properties in each contact's group
for (Contact c : contactManager.getContacts()) {
Group g = getContactGroup(c);
latest = findLatest(g.getId(), t, true);
version = latest == null ? 1 : latest.version + 1;
storeMessage(g.getId(), dev, t, merged, version, true, true);
}
} catch (IOException e) {
} catch (FormatException e) {
throw new DbException(e);
} finally {
lock.writeLock().unlock();
}
}
@@ -261,18 +255,22 @@ class TransportPropertyManagerImpl implements TransportPropertyManager,
return privateGroupFactory.createPrivateGroup(CLIENT_ID, c);
}
// Locking: lock.writeLock
private void storeMessage(GroupId g, DeviceId dev, TransportId t,
TransportProperties p, long version, boolean local, boolean shared)
throws DbException, FormatException {
byte[] body = encodeProperties(dev, t, p, version);
long now = clock.currentTimeMillis();
Message m = messageFactory.createMessage(g, now, body);
BdfDictionary d = new BdfDictionary();
d.put("transportId", t.getString());
d.put("version", version);
d.put("local", local);
db.addLocalMessage(m, CLIENT_ID, metadataEncoder.encode(d), shared);
private void storeMessage(Transaction txn, GroupId g, DeviceId dev,
TransportId t, TransportProperties p, long version, boolean local,
boolean shared) throws DbException {
try {
byte[] body = encodeProperties(dev, t, p, version);
long now = clock.currentTimeMillis();
Message m = messageFactory.createMessage(g, now, body);
BdfDictionary d = new BdfDictionary();
d.put("transportId", t.getString());
d.put("version", version);
d.put("local", local);
Metadata meta = metadataEncoder.encode(d);
db.addLocalMessage(txn, m, CLIENT_ID, meta, shared);
} catch (FormatException e) {
throw new RuntimeException(e);
}
}
private byte[] encodeProperties(DeviceId dev, TransportId t,
@@ -293,12 +291,11 @@ class TransportPropertyManagerImpl implements TransportPropertyManager,
return out.toByteArray();
}
// Locking: lock.readLock
private Map<TransportId, LatestUpdate> findLatest(GroupId g, boolean local)
throws DbException, FormatException {
private Map<TransportId, LatestUpdate> findLatest(Transaction txn,
GroupId g, boolean local) throws DbException, FormatException {
Map<TransportId, LatestUpdate> latestUpdates =
new HashMap<TransportId, LatestUpdate>();
Map<MessageId, Metadata> metadata = db.getMessageMetadata(g);
Map<MessageId, Metadata> metadata = db.getMessageMetadata(txn, g);
for (Entry<MessageId, Metadata> e : metadata.entrySet()) {
BdfDictionary d = metadataParser.parse(e.getValue());
if (d.getBoolean("local") == local) {
@@ -312,11 +309,10 @@ class TransportPropertyManagerImpl implements TransportPropertyManager,
return latestUpdates;
}
// Locking: lock.readLock
private LatestUpdate findLatest(GroupId g, TransportId t, boolean local)
throws DbException, FormatException {
private LatestUpdate findLatest(Transaction txn, GroupId g, TransportId t,
boolean local) throws DbException, FormatException {
LatestUpdate latest = null;
Map<MessageId, Metadata> metadata = db.getMessageMetadata(g);
Map<MessageId, Metadata> metadata = db.getMessageMetadata(txn, g);
for (Entry<MessageId, Metadata> e : metadata.entrySet()) {
BdfDictionary d = metadataParser.parse(e.getValue());
if (d.getString("transportId").equals(t.getString())
@@ -330,25 +326,32 @@ class TransportPropertyManagerImpl implements TransportPropertyManager,
}
private TransportProperties parseProperties(byte[] raw)
throws IOException {
throws FormatException {
TransportProperties p = new TransportProperties();
ByteArrayInputStream in = new ByteArrayInputStream(raw,
MESSAGE_HEADER_LENGTH, raw.length - MESSAGE_HEADER_LENGTH);
BdfReader r = bdfReaderFactory.createReader(in);
r.readListStart();
r.skipRaw(); // Device ID
r.skipString(); // Transport ID
r.skipInteger(); // Version
r.readDictionaryStart();
while (!r.hasDictionaryEnd()) {
String key = r.readString(MAX_PROPERTY_LENGTH);
String value = r.readString(MAX_PROPERTY_LENGTH);
p.put(key, value);
try {
r.readListStart();
r.skipRaw(); // Device ID
r.skipString(); // Transport ID
r.skipInteger(); // Version
r.readDictionaryStart();
while (!r.hasDictionaryEnd()) {
String key = r.readString(MAX_PROPERTY_LENGTH);
String value = r.readString(MAX_PROPERTY_LENGTH);
p.put(key, value);
}
r.readDictionaryEnd();
r.readListEnd();
if (!r.eof()) throw new FormatException();
return p;
} catch (FormatException e) {
throw e;
} catch (IOException e) {
// Shouldn't happen with ByteArrayInputStream
throw new RuntimeException(e);
}
r.readDictionaryEnd();
r.readListEnd();
if (!r.eof()) throw new FormatException();
return p;
}
private static class LatestUpdate {