Skip to content

Commit

Permalink
Merge pull request #567 from JeroenMinnaert/develop
Browse files Browse the repository at this point in the history
Drastically improved start up speed
  • Loading branch information
Ben Osheroff authored Feb 20, 2017
2 parents 9b46c00 + c761229 commit 278ff68
Show file tree
Hide file tree
Showing 7 changed files with 252 additions and 130 deletions.
5 changes: 5 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[*.java]
charset=utf-8
end_of_line=lf
insert_final_newline=true
indent_style=tab
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ test%: compile-test

test: compile-test
java -classpath $(JAVA_TEST_DEPENDS):target/test-classes:target/classes org.junit.runner.JUnitCore $(TEST_CLASSES)
test-only: compile-test
java -classpath $(JAVA_TEST_DEPENDS):target/test-classes:target/classes org.junit.runner.JUnitCore ${ARGS}

clean:
rm -f target/.java target/.java-test
rm -rf target/classes
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package com.zendesk.maxwell.replication;

import java.util.ArrayList;
import java.util.List;
import java.util.*;

import com.google.code.or.binlog.impl.event.UpdateRowsEventV2;
import com.google.code.or.common.glossary.Pair;
import com.google.code.or.common.glossary.Row;
Expand All @@ -14,6 +10,10 @@
import com.zendesk.maxwell.schema.ColumnWithDefinitionList;
import com.zendesk.maxwell.schema.Table;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class UpdateRowsEvent extends AbstractRowsEvent {
private final com.google.code.or.binlog.impl.event.UpdateRowsEvent event;

Expand Down
136 changes: 88 additions & 48 deletions src/main/java/com/zendesk/maxwell/schema/MysqlSavedSchema.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import snaq.db.ConnectionPool;


public class MysqlSavedSchema {
static int SchemaStoreVersion = 3;

Expand Down Expand Up @@ -387,10 +388,13 @@ private void restoreDerivedSchema(Connection conn, Long schema_id) throws SQLExc
protected void restoreFromSchemaID(Connection conn, Long schemaID) throws SQLException, InvalidSchemaError {
restoreSchemaMetadata(conn, schemaID);

if ( this.baseSchemaID != null )
if (this.baseSchemaID != null) {
LOGGER.debug("Restoring derived schema");
restoreDerivedSchema(conn, schemaID);
else
} else {
LOGGER.debug("Restoring full schema");
restoreFullSchema(conn, schemaID);
}
}

private void restoreSchemaMetadata(Connection conn, Long schemaID) throws SQLException {
Expand All @@ -414,69 +418,105 @@ private void restoreSchemaMetadata(Connection conn, Long schemaID) throws SQLExc
this.schema = new Schema(new ArrayList<Database>(), schemaRS.getString("charset"), this.sensitivity);
}

private void restoreFullSchema(Connection conn, Long schemaID) throws SQLException, InvalidSchemaError {
PreparedStatement p = conn.prepareStatement("SELECT * from `databases` where schema_id = ? ORDER by id");
p.setLong(1, this.schemaID);

ResultSet dbRS = p.executeQuery();

while (dbRS.next()) {
this.schema.addDatabase(restoreDatabase(conn, dbRS.getInt("id"), dbRS.getString("name"), dbRS.getString("charset")));
}

}

private Database restoreDatabase(Connection conn, int id, String name, String charset) throws SQLException {
Statement s = conn.createStatement();
Database d = new Database(name, charset);

ResultSet tRS = s.executeQuery("SELECT * from `tables` where database_id = " + id + " ORDER by id");

while (tRS.next()) {
String tName = tRS.getString("name");
String tCharset = tRS.getString("charset");
String tPKs = tRS.getString("pk");
private void restoreFullSchema(Connection conn, Long schemaID) throws SQLException, InvalidSchemaError {
PreparedStatement p = conn.prepareStatement(
"SELECT " +
"d.id AS dbId," +
"d.name AS dbName," +
"d.charset AS dbCharset," +
"t.name AS tableName," +
"t.charset AS tableCharset," +
"t.pk AS tablePk," +
"t.id AS tableId," +
"c.column_length AS columnLength," +
"c.enum_values AS columnEnumValues," +
"c.name AS columnName," +
"c.charset AS columnCharset," +
"c.coltype AS columnColtype," +
"c.is_signed AS columnIsSigned " +
"FROM `databases` d " +
"LEFT JOIN tables t ON d.id = t.database_id " +
"LEFT JOIN columns c ON c.table_id=t.id " +
"WHERE d.schema_id = ? " +
"ORDER BY d.id, t.id, c.id"
);

int tID = tRS.getInt("id");
p.setLong(1, this.schemaID);
ResultSet rs = p.executeQuery();

restoreTable(conn, d, tName, tID, tCharset, tPKs);
}
return d;
}
Database currentDatabase = null;
Table currentTable = null;
int columnIndex = 0;

while (rs.next()) {
// Database
String dbName = rs.getString("dbName");
String dbCharset = rs.getString("dbCharset");

// Table
String tName = rs.getString("tableName");
String tCharset = rs.getString("tableCharset");
String tPKs = rs.getString("tablePk");

// Column
String columnName = rs.getString("columnName");
int columnLengthInt = rs.getInt("columnLength");
String columnEnumValues = rs.getString("columnEnumValues");
String columnCharset = rs.getString("columnCharset");
String columnType = rs.getString("columnColtype");
int columnIsSigned = rs.getInt("columnIsSigned");

if (currentDatabase == null || !currentDatabase.getName().equals(dbName)) {
currentDatabase = new Database(dbName, dbCharset);
this.schema.addDatabase(currentDatabase);
LOGGER.debug("Restoring database " + dbName + "...");
}

private void restoreTable(Connection connection, Database d, String name, int id, String charset, String pks) throws SQLException {
Statement s = connection.createStatement();
if (tName == null) {
// if tName is null, there are no tables connected to this database
continue;
} else if (currentTable == null || !currentTable.getName().equals(tName)) {
currentTable = currentDatabase.buildTable(tName, tCharset);
if (tPKs != null) {
List<String> pkList = Arrays.asList(StringUtils.split(tPKs, ','));
currentTable.setPKList(pkList);
}
columnIndex = 0;
}

Table t = d.buildTable(name, charset);

ResultSet cRS = s.executeQuery("SELECT * from `columns` where table_id = " + id + " ORDER by id");
if (columnName == null) {
// If columnName is null, there are no columns connected to this table
continue;
}

int i = 0;
while (cRS.next()) {
Long columnLength;
int columnLengthInt = cRS.getInt("column_length");
if ( cRS.wasNull() )
if (rs.wasNull()) {
columnLength = null;
else
} else {
columnLength = Long.valueOf(columnLengthInt);
}

String[] enumValues = null;
if ( cRS.getString("enum_values") != null )
enumValues = StringUtils.splitByWholeSeparatorPreserveAllTokens(cRS.getString("enum_values"), ",");
if (columnEnumValues != null) {
enumValues = StringUtils.splitByWholeSeparatorPreserveAllTokens(columnEnumValues, ",");
}

ColumnDef c = ColumnDef.build(
cRS.getString("name"), cRS.getString("charset"),
cRS.getString("coltype"), i++,
cRS.getInt("is_signed") == 1,
columnName,
columnCharset,
columnType,
columnIndex++,
columnIsSigned == 1,
enumValues,
columnLength);
t.addColumn(c);
}
columnLength
);
currentTable.addColumn(c);

if ( pks != null ) {
List<String> pkList = Arrays.asList(StringUtils.split(pks, ','));
t.setPKList(pkList);
}
rs.close();
LOGGER.debug("Restored all databases");
}

private static Long findSchema(Connection connection, BinlogPosition targetPosition, Long serverID)
Expand Down
11 changes: 9 additions & 2 deletions src/main/java/com/zendesk/maxwell/schema/MysqlSchemaStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,15 @@ private MysqlSavedSchema restoreOrCaptureSchema() throws SchemaStoreException {
if ( savedSchema == null ) {
Schema capturedSchema = captureSchema();
savedSchema = new MysqlSavedSchema(serverID, caseSensitivity, capturedSchema, initialPosition);
if ( !readOnly )
savedSchema.save(conn);
if (!readOnly)
if (conn.isValid(30)) {
savedSchema.save(conn);
} else {
// The capture time might be long and the conn connection might be closed already. Consulting the pool
// again for a new connection
Connection newConn = maxwellConnectionPool.getConnection();
savedSchema.save(newConn);
}
}

return savedSchema;
Expand Down
Loading

0 comments on commit 278ff68

Please sign in to comment.