From 30edb904264f887ed31e9def51e434433f4de604 Mon Sep 17 00:00:00 2001 From: akwizgran Date: Wed, 31 Jan 2018 11:44:22 +0000 Subject: [PATCH] Add migration from schema 30 to 31. --- .../briarproject/bramble/db/JdbcDatabase.java | 191 ++++++++---- .../bramble/db/Migration30_31.java | 75 +++++ .../bramble/db/Migration30_31Test.java | 280 ++++++++++++++++++ 3 files changed, 482 insertions(+), 64 deletions(-) create mode 100644 bramble-core/src/main/java/org/briarproject/bramble/db/Migration30_31.java create mode 100644 bramble-core/src/test/java/org/briarproject/bramble/db/Migration30_31Test.java diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java b/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java index 19247ea78..29e91ce24 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java @@ -71,7 +71,7 @@ import static org.briarproject.bramble.db.ExponentialBackoff.calculateExpiry; abstract class JdbcDatabase implements Database { // Package access for testing - static final int CODE_SCHEMA_VERSION = 30; + static final int CODE_SCHEMA_VERSION = 31; private static final String CREATE_SETTINGS = "CREATE TABLE settings" @@ -150,11 +150,16 @@ abstract class JdbcDatabase implements Database { private static final String CREATE_MESSAGE_METADATA = "CREATE TABLE messageMetadata" + " (messageId HASH NOT NULL," + + " groupId HASH NOT NULL," // Denormalised + + " state INT NOT NULL," // Denormalised + " key VARCHAR NOT NULL," + " value BINARY NOT NULL," + " PRIMARY KEY (messageId, key)," + " FOREIGN KEY (messageId)" + " REFERENCES messages (messageId)" + + " ON DELETE CASCADE," + + " FOREIGN KEY (groupId)" + + " REFERENCES groups (groupId)" + " ON DELETE CASCADE)"; private static final String CREATE_MESSAGE_DEPENDENCIES = @@ -242,6 +247,10 @@ abstract class JdbcDatabase implements Database { "CREATE INDEX IF NOT EXISTS groupsByClientId" + " ON groups (clientId)"; + private static final String INDEX_MESSAGE_METADATA_BY_GROUP_ID_STATE = + "CREATE INDEX IF NOT EXISTS messageMetadataByGroupIdState" + + " ON messageMetadata (groupId, state)"; + private static final Logger LOG = Logger.getLogger(JdbcDatabase.class.getName()); @@ -330,7 +339,7 @@ abstract class JdbcDatabase implements Database { // Package access for testing List> getMigrations() { - return Collections.emptyList(); + return Collections.singletonList(new Migration30_31()); } private void storeSchemaVersion(Connection txn, int version) @@ -387,6 +396,7 @@ abstract class JdbcDatabase implements Database { s = txn.createStatement(); s.executeUpdate(INDEX_CONTACTS_BY_AUTHOR_ID); s.executeUpdate(INDEX_GROUPS_BY_CLIENT_ID); + s.executeUpdate(INDEX_MESSAGE_METADATA_BY_GROUP_ID_STATE); s.close(); } catch (SQLException e) { tryToClose(s); @@ -512,7 +522,8 @@ abstract class JdbcDatabase implements Database { try { // Create a contact row String sql = "INSERT INTO contacts" - + " (authorId, name, publicKey, localAuthorId, verified, active)" + + " (authorId, name, publicKey, localAuthorId," + + " verified, active)" + " VALUES (?, ?, ?, ?, ?, ?)"; ps = txn.prepareStatement(sql); ps.setBytes(1, remote.getId().getBytes()); @@ -1342,16 +1353,13 @@ abstract class JdbcDatabase implements Database { try { // Retrieve the message IDs for each query term and intersect Set intersection = null; - String sql = "SELECT m.messageId" - + " FROM messages AS m" - + " JOIN messageMetadata AS md" - + " ON m.messageId = md.messageId" - + " WHERE state = ? AND groupId = ?" + String sql = "SELECT messageId FROM messageMetadata" + + " WHERE groupId = ? AND state = ?" + " AND key = ? AND value = ?"; for (Entry e : query.entrySet()) { ps = txn.prepareStatement(sql); - ps.setInt(1, DELIVERED.getValue()); - ps.setBytes(2, g.getBytes()); + ps.setBytes(1, g.getBytes()); + ps.setInt(2, DELIVERED.getValue()); ps.setString(3, e.getKey()); ps.setBytes(4, e.getValue()); rs = ps.executeQuery(); @@ -1379,25 +1387,20 @@ abstract class JdbcDatabase implements Database { PreparedStatement ps = null; ResultSet rs = null; try { - String sql = "SELECT m.messageId, key, value" - + " FROM messages AS m" - + " JOIN messageMetadata AS md" - + " ON m.messageId = md.messageId" - + " WHERE state = ? AND groupId = ?" - + " ORDER BY m.messageId"; + String sql = "SELECT messageId, key, value" + + " FROM messageMetadata" + + " WHERE groupId = ? AND state = ?"; ps = txn.prepareStatement(sql); - ps.setInt(1, DELIVERED.getValue()); - ps.setBytes(2, g.getBytes()); + ps.setBytes(1, g.getBytes()); + ps.setInt(2, DELIVERED.getValue()); rs = ps.executeQuery(); Map all = new HashMap<>(); - Metadata metadata = null; - MessageId lastMessageId = null; while (rs.next()) { MessageId messageId = new MessageId(rs.getBytes(1)); - if (lastMessageId == null || !messageId.equals(lastMessageId)) { + Metadata metadata = all.get(messageId); + if (metadata == null) { metadata = new Metadata(); all.put(messageId, metadata); - lastMessageId = messageId; } metadata.put(rs.getString(2), rs.getBytes(3)); } @@ -1452,10 +1455,8 @@ abstract class JdbcDatabase implements Database { PreparedStatement ps = null; ResultSet rs = null; try { - String sql = "SELECT key, value FROM messageMetadata AS md" - + " JOIN messages AS m" - + " ON m.messageId = md.messageId" - + " WHERE m.state = ? AND md.messageId = ?"; + String sql = "SELECT key, value FROM messageMetadata" + + " WHERE state = ? AND messageId = ?"; ps = txn.prepareStatement(sql); ps.setInt(1, DELIVERED.getValue()); ps.setBytes(2, m.getBytes()); @@ -1478,11 +1479,9 @@ abstract class JdbcDatabase implements Database { PreparedStatement ps = null; ResultSet rs = null; try { - String sql = "SELECT key, value FROM messageMetadata AS md" - + " JOIN messages AS m" - + " ON m.messageId = md.messageId" - + " WHERE (m.state = ? OR m.state = ?)" - + " AND md.messageId = ?"; + String sql = "SELECT key, value FROM messageMetadata" + + " WHERE (state = ? OR state = ?)" + + " AND messageId = ?"; ps = txn.prepareStatement(sql); ps.setInt(1, DELIVERED.getValue()); ps.setInt(2, PENDING.getValue()); @@ -2056,7 +2055,7 @@ abstract class JdbcDatabase implements Database { int[] batchAffected = ps.executeBatch(); if (batchAffected.length != requested.size()) throw new DbStateException(); - for (int rows: batchAffected) { + for (int rows : batchAffected) { if (rows < 0) throw new DbStateException(); if (rows > 1) throw new DbStateException(); } @@ -2070,25 +2069,92 @@ abstract class JdbcDatabase implements Database { @Override public void mergeGroupMetadata(Connection txn, GroupId g, Metadata meta) throws DbException { - mergeMetadata(txn, g.getBytes(), meta, "groupMetadata", "groupId"); + PreparedStatement ps = null; + try { + Map added = removeOrUpdateMetadata(txn, + g.getBytes(), meta, "groupMetadata", "groupId"); + if (added.isEmpty()) return; + // Insert any keys that don't already exist + String sql = "INSERT INTO groupMetadata (groupId, key, value)" + + " VALUES (?, ?, ?)"; + ps = txn.prepareStatement(sql); + ps.setBytes(1, g.getBytes()); + for (Entry e : added.entrySet()) { + ps.setString(2, e.getKey()); + ps.setBytes(3, e.getValue()); + ps.addBatch(); + } + int[] batchAffected = ps.executeBatch(); + if (batchAffected.length != added.size()) + throw new DbStateException(); + for (int rows : batchAffected) + if (rows != 1) throw new DbStateException(); + ps.close(); + } catch (SQLException e) { + tryToClose(ps); + throw new DbException(e); + } } @Override - public void mergeMessageMetadata(Connection txn, MessageId m, Metadata meta) - throws DbException { - mergeMetadata(txn, m.getBytes(), meta, "messageMetadata", "messageId"); + public void mergeMessageMetadata(Connection txn, MessageId m, + Metadata meta) throws DbException { + PreparedStatement ps = null; + ResultSet rs = null; + try { + Map added = removeOrUpdateMetadata(txn, + m.getBytes(), meta, "messageMetadata", "messageId"); + if (added.isEmpty()) return; + // Get the group ID and message state for the denormalised columns + String sql = "SELECT groupId, state FROM messages" + + " WHERE messageId = ?"; + ps = txn.prepareStatement(sql); + ps.setBytes(1, m.getBytes()); + rs = ps.executeQuery(); + if (!rs.next()) throw new DbStateException(); + GroupId g = new GroupId(rs.getBytes(1)); + State state = State.fromValue(rs.getInt(2)); + rs.close(); + ps.close(); + // Insert any keys that don't already exist + sql = "INSERT INTO messageMetadata" + + " (messageId, groupId, state, key, value)" + + " VALUES (?, ?, ?, ?, ?)"; + ps = txn.prepareStatement(sql); + ps.setBytes(1, m.getBytes()); + ps.setBytes(2, g.getBytes()); + ps.setInt(3, state.getValue()); + for (Entry e : added.entrySet()) { + ps.setString(4, e.getKey()); + ps.setBytes(5, e.getValue()); + ps.addBatch(); + } + int[] batchAffected = ps.executeBatch(); + if (batchAffected.length != added.size()) + throw new DbStateException(); + for (int rows : batchAffected) + if (rows != 1) throw new DbStateException(); + ps.close(); + } catch (SQLException e) { + tryToClose(rs); + tryToClose(ps); + throw new DbException(e); + } } - private void mergeMetadata(Connection txn, byte[] id, Metadata meta, - String tableName, String columnName) throws DbException { + // Removes or updates any existing entries, returns any entries that + // need to be added + private Map removeOrUpdateMetadata(Connection txn, + byte[] id, Metadata meta, String tableName, String columnName) + throws DbException { PreparedStatement ps = null; try { // Determine which keys are being removed List removed = new ArrayList<>(); - Map retained = new HashMap<>(); + Map notRemoved = new HashMap<>(); for (Entry e : meta.entrySet()) { if (e.getValue() == REMOVE) removed.add(e.getKey()); - else retained.put(e.getKey(), e.getValue()); + else notRemoved.put(e.getKey(), e.getValue()); } // Delete any keys that are being removed if (!removed.isEmpty()) { @@ -2109,45 +2175,33 @@ abstract class JdbcDatabase implements Database { } ps.close(); } - if (retained.isEmpty()) return; + if (notRemoved.isEmpty()) return Collections.emptyMap(); // Update any keys that already exist String sql = "UPDATE " + tableName + " SET value = ?" + " WHERE " + columnName + " = ? AND key = ?"; ps = txn.prepareStatement(sql); ps.setBytes(2, id); - for (Entry e : retained.entrySet()) { + for (Entry e : notRemoved.entrySet()) { ps.setBytes(1, e.getValue()); ps.setString(3, e.getKey()); ps.addBatch(); } int[] batchAffected = ps.executeBatch(); - if (batchAffected.length != retained.size()) + if (batchAffected.length != notRemoved.size()) throw new DbStateException(); for (int rows : batchAffected) { if (rows < 0) throw new DbStateException(); if (rows > 1) throw new DbStateException(); } - // Insert any keys that don't already exist - sql = "INSERT INTO " + tableName - + " (" + columnName + ", key, value)" - + " VALUES (?, ?, ?)"; - ps = txn.prepareStatement(sql); - ps.setBytes(1, id); - int updateIndex = 0, inserted = 0; - for (Entry e : retained.entrySet()) { - if (batchAffected[updateIndex] == 0) { - ps.setString(2, e.getKey()); - ps.setBytes(3, e.getValue()); - ps.addBatch(); - inserted++; - } - updateIndex++; - } - batchAffected = ps.executeBatch(); - if (batchAffected.length != inserted) throw new DbStateException(); - for (int rows : batchAffected) - if (rows != 1) throw new DbStateException(); ps.close(); + // Are there any keys that don't already exist? + Map added = new HashMap<>(); + int updateIndex = 0; + for (Entry e : notRemoved.entrySet()) { + if (batchAffected[updateIndex++] == 0) + added.put(e.getKey(), e.getValue()); + } + return added; } catch (SQLException e) { tryToClose(ps); throw new DbException(e); @@ -2500,7 +2554,8 @@ abstract class JdbcDatabase implements Database { } @Override - public void setMessageShared(Connection txn, MessageId m) throws DbException { + public void setMessageShared(Connection txn, MessageId m) + throws DbException { PreparedStatement ps = null; try { String sql = "UPDATE messages SET shared = TRUE" @@ -2528,6 +2583,14 @@ abstract class JdbcDatabase implements Database { int affected = ps.executeUpdate(); if (affected < 0 || affected > 1) throw new DbStateException(); ps.close(); + // Update denormalised column in messageMetadata + sql = "UPDATE messageMetadata SET state = ? WHERE messageId = ?"; + ps = txn.prepareStatement(sql); + ps.setInt(1, state.getValue()); + ps.setBytes(2, m.getBytes()); + affected = ps.executeUpdate(); + if (affected < 0) throw new DbStateException(); + ps.close(); } catch (SQLException e) { tryToClose(ps); throw new DbException(e); diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/Migration30_31.java b/bramble-core/src/main/java/org/briarproject/bramble/db/Migration30_31.java new file mode 100644 index 000000000..e2be817bc --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/Migration30_31.java @@ -0,0 +1,75 @@ +package org.briarproject.bramble.db; + +import org.briarproject.bramble.api.db.DbException; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.logging.Logger; + +import javax.annotation.Nullable; + +import static java.util.logging.Level.WARNING; + +class Migration30_31 implements Migration { + + private static final Logger LOG = + Logger.getLogger(Migration30_31.class.getName()); + + @Override + public int getStartVersion() { + return 30; + } + + @Override + public int getEndVersion() { + return 31; + } + + @Override + public void migrate(Connection txn) throws DbException { + Statement s = null; + try { + s = txn.createStatement(); + // Add groupId column + s.execute("ALTER TABLE messageMetadata" + + " ADD COLUMN groupId BINARY(32) AFTER messageId"); + // Populate groupId column + s.execute("UPDATE messageMetadata AS mm SET groupId =" + + " (SELECT groupId FROM messages AS m" + + " WHERE mm.messageId = m.messageId)"); + // Add not null constraint now column has been populated + s.execute("ALTER TABLE messageMetadata" + + " ALTER COLUMN groupId" + + " SET NOT NULL"); + // Add foreign key constraint + s.execute("ALTER TABLE messageMetadata" + + " ADD CONSTRAINT groupIdForeignKey" + + " FOREIGN KEY (groupId)" + + " REFERENCES groups (groupId)" + + " ON DELETE CASCADE"); + // Add state column + s.execute("ALTER TABLE messageMetadata" + + " ADD COLUMN state INT AFTER groupId"); + // Populate state column + s.execute("UPDATE messageMetadata AS mm SET state =" + + " (SELECT state FROM messages AS m" + + " WHERE mm.messageId = m.messageId)"); + // Add not null constraint now column has been populated + s.execute("ALTER TABLE messageMetadata" + + " ALTER COLUMN state" + + " SET NOT NULL"); + } catch (SQLException e) { + tryToClose(s); + throw new DbException(e); + } + } + + private void tryToClose(@Nullable Statement s) { + try { + if (s != null) s.close(); + } catch (SQLException e) { + if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); + } + } +} diff --git a/bramble-core/src/test/java/org/briarproject/bramble/db/Migration30_31Test.java b/bramble-core/src/test/java/org/briarproject/bramble/db/Migration30_31Test.java new file mode 100644 index 000000000..ff26a50bc --- /dev/null +++ b/bramble-core/src/test/java/org/briarproject/bramble/db/Migration30_31Test.java @@ -0,0 +1,280 @@ +package org.briarproject.bramble.db; + +import org.briarproject.bramble.api.db.Metadata; +import org.briarproject.bramble.api.sync.GroupId; +import org.briarproject.bramble.api.sync.Message; +import org.briarproject.bramble.api.sync.MessageId; +import org.briarproject.bramble.api.sync.ValidationManager.State; +import org.briarproject.bramble.test.BrambleTestCase; +import org.briarproject.bramble.test.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Map.Entry; + +import static junit.framework.TestCase.assertNotNull; +import static junit.framework.TestCase.assertTrue; +import static org.briarproject.bramble.api.sync.ValidationManager.State.DELIVERED; +import static org.briarproject.bramble.api.sync.ValidationManager.State.UNKNOWN; +import static org.briarproject.bramble.test.TestUtils.getRandomBytes; +import static org.briarproject.bramble.test.TestUtils.getRandomId; +import static org.briarproject.bramble.util.StringUtils.getRandomString; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +public class Migration30_31Test extends BrambleTestCase { + + private static final String CREATE_GROUPS_STUB = + "CREATE TABLE groups" + + " (groupID BINARY(32) NOT NULL," + + " PRIMARY KEY (groupId))"; + + private static final String CREATE_MESSAGES = + "CREATE TABLE messages" + + " (messageId BINARY(32) NOT NULL," + + " groupId BINARY(32) NOT NULL," + + " timestamp BIGINT NOT NULL," + + " state INT NOT NULL," + + " shared BOOLEAN NOT NULL," + + " length INT NOT NULL," + + " raw BLOB," // Null if message has been deleted + + " PRIMARY KEY (messageId)," + + " FOREIGN KEY (groupId)" + + " REFERENCES groups (groupId)" + + " ON DELETE CASCADE)"; + + private static final String CREATE_MESSAGE_METADATA_30 = + "CREATE TABLE messageMetadata" + + " (messageId BINARY(32) NOT NULL," + + " key VARCHAR NOT NULL," + + " value BINARY NOT NULL," + + " PRIMARY KEY (messageId, key)," + + " FOREIGN KEY (messageId)" + + " REFERENCES messages (messageId)" + + " ON DELETE CASCADE)"; + + private final File testDir = TestUtils.getTestDirectory(); + private final File db = new File(testDir, "db"); + private final String url = "jdbc:h2:" + db.getAbsolutePath(); + private final GroupId groupId = new GroupId(getRandomId()); + private final GroupId groupId1 = new GroupId(getRandomId()); + private final Message message = TestUtils.getMessage(groupId); + private final Message message1 = TestUtils.getMessage(groupId1); + private final Metadata meta = new Metadata(), meta1 = new Metadata(); + + private Connection connection = null; + + public Migration30_31Test() { + for (int i = 0; i < 10; i++) { + meta.put(getRandomString(123 + i), getRandomBytes(123 + i)); + meta1.put(getRandomString(123 + i), getRandomBytes(123 + i)); + } + } + + @Before + public void setUp() throws Exception { + assertTrue(testDir.mkdirs()); + Class.forName("org.h2.Driver"); + connection = DriverManager.getConnection(url); + } + + @After + public void tearDown() throws Exception { + if (connection != null) connection.close(); + TestUtils.deleteTestDirectory(testDir); + } + + @Test + public void testMigration() throws Exception { + try { + Statement s = connection.createStatement(); + s.execute(CREATE_GROUPS_STUB); + s.execute(CREATE_MESSAGES); + s.execute(CREATE_MESSAGE_METADATA_30); + s.close(); + + addGroup(groupId); + addMessage(message, DELIVERED, true); + addMessageMetadata30(message, meta); + assertMetadataEquals(meta, getMessageMetadata(message.getId())); + + addGroup(groupId1); + addMessage(message1, UNKNOWN, false); + addMessageMetadata30(message1, meta1); + assertMetadataEquals(meta1, getMessageMetadata(message1.getId())); + + new Migration30_31().migrate(connection); + + assertMetadataEquals(meta, getMessageMetadata(message.getId())); + for (String key : meta.keySet()) { + GroupId g = getMessageMetadataGroupId31(message.getId(), key); + assertEquals(groupId, g); + State state = getMessageMetadataState31(message.getId(), key); + assertEquals(DELIVERED, state); + } + + assertMetadataEquals(meta1, getMessageMetadata(message1.getId())); + for (String key : meta1.keySet()) { + GroupId g = getMessageMetadataGroupId31(message1.getId(), key); + assertEquals(groupId1, g); + State state = getMessageMetadataState31(message1.getId(), key); + assertEquals(UNKNOWN, state); + } + } catch (SQLException e) { + connection.close(); + throw e; + } + } + + private void addGroup(GroupId g) throws SQLException { + PreparedStatement ps = null; + try { + String sql = "INSERT INTO groups (groupId) VALUES (?)"; + ps = connection.prepareStatement(sql); + ps.setBytes(1, g.getBytes()); + int affected = ps.executeUpdate(); + if (affected != 1) throw new DbStateException(); + ps.close(); + } catch (SQLException e) { + if (ps != null) ps.close(); + throw e; + } + } + + private void addMessage(Message m, State state, boolean shared) + throws SQLException { + PreparedStatement ps = null; + try { + String sql = "INSERT INTO messages (messageId, groupId, timestamp," + + " state, shared, length, raw)" + + " VALUES (?, ?, ?, ?, ?, ?, ?)"; + ps = connection.prepareStatement(sql); + ps.setBytes(1, m.getId().getBytes()); + ps.setBytes(2, m.getGroupId().getBytes()); + ps.setLong(3, m.getTimestamp()); + ps.setInt(4, state.getValue()); + ps.setBoolean(5, shared); + byte[] raw = m.getRaw(); + ps.setInt(6, raw.length); + ps.setBytes(7, raw); + int affected = ps.executeUpdate(); + if (affected != 1) throw new DbStateException(); + ps.close(); + } catch (SQLException e) { + if (ps != null) ps.close(); + throw e; + } + } + + private void addMessageMetadata30(Message m, Metadata meta) + throws SQLException { + PreparedStatement ps = null; + try { + String sql = "INSERT INTO messageMetadata" + + " (messageId, key, value)" + + " VALUES (?, ?, ?)"; + ps = connection.prepareStatement(sql); + ps.setBytes(1, m.getId().getBytes()); + for (Entry e : meta.entrySet()) { + ps.setString(2, e.getKey()); + ps.setBytes(3, e.getValue()); + ps.addBatch(); + } + int[] batchAffected = ps.executeBatch(); + if (batchAffected.length != meta.size()) + throw new DbStateException(); + for (int rows : batchAffected) + if (rows != 1) throw new DbStateException(); + ps.close(); + } catch (SQLException e) { + if (ps != null) ps.close(); + throw e; + } + } + + private Metadata getMessageMetadata(MessageId m) throws SQLException { + PreparedStatement ps = null; + ResultSet rs = null; + try { + String sql = "SELECT key, value FROM messageMetadata" + + " WHERE messageId = ?"; + ps = connection.prepareStatement(sql); + ps.setBytes(1, m.getBytes()); + rs = ps.executeQuery(); + Metadata meta = new Metadata(); + while (rs.next()) meta.put(rs.getString(1), rs.getBytes(2)); + rs.close(); + ps.close(); + return meta; + } catch (SQLException e) { + if (rs != null) rs.close(); + if (ps != null) ps.close(); + throw e; + } + } + + private GroupId getMessageMetadataGroupId31(MessageId m, String key) + throws SQLException { + PreparedStatement ps = null; + ResultSet rs = null; + try { + String sql = "SELECT groupId FROM messageMetadata" + + " WHERE messageId = ? AND key = ?"; + ps = connection.prepareStatement(sql); + ps.setBytes(1, m.getBytes()); + ps.setString(2, key); + rs = ps.executeQuery(); + if (!rs.next()) throw new DbStateException(); + GroupId g = new GroupId(rs.getBytes(1)); + if (rs.next()) throw new DbStateException(); + rs.close(); + ps.close(); + return g; + } catch (SQLException e) { + if (rs != null) rs.close(); + if (ps != null) ps.close(); + throw e; + } + } + + private State getMessageMetadataState31(MessageId m, String key) + throws SQLException { + PreparedStatement ps = null; + ResultSet rs = null; + try { + String sql = "SELECT state FROM messageMetadata" + + " WHERE messageId = ? AND key = ?"; + ps = connection.prepareStatement(sql); + ps.setBytes(1, m.getBytes()); + ps.setString(2, key); + rs = ps.executeQuery(); + if (!rs.next()) throw new DbStateException(); + State state = State.fromValue(rs.getInt(1)); + if (rs.next()) throw new DbStateException(); + rs.close(); + ps.close(); + return state; + } catch (SQLException e) { + if (rs != null) rs.close(); + if (ps != null) ps.close(); + throw e; + } + } + + private void assertMetadataEquals(Metadata expected, Metadata actual) { + assertEquals(expected.size(), actual.size()); + for (Entry e : expected.entrySet()) { + byte[] value = actual.get(e.getKey()); + assertNotNull(value); + assertArrayEquals(e.getValue(), value); + } + } +}