Skip to content

Commit

Permalink
Merge pull request #377 from zendesk/gh_362
Browse files Browse the repository at this point in the history
#362: preserve case of columns as they came in
  • Loading branch information
Ben Osheroff authored Jul 13, 2016
2 parents 704f8b2 + ab627a2 commit d1e8b45
Show file tree
Hide file tree
Showing 18 changed files with 196 additions and 76 deletions.
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
<version>5.1.39</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/zendesk/maxwell/MaxwellMysqlConfig.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.zendesk.maxwell;

import java.util.ArrayList;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;

import joptsimple.OptionSet;

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/zendesk/maxwell/RowMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ private String pkToJsonHash() throws IOException {
if ( data.containsKey(pk) )
pkValue = data.get(pk);

g.writeObjectField("pk." + pk, pkValue);
g.writeObjectField("pk." + pk.toLowerCase(), pkValue);
}
}

Expand All @@ -128,7 +128,7 @@ private String pkToJsonArray() throws IOException {
pkValue = data.get(pk);

g.writeStartObject();
g.writeObjectField(pk, pkValue);
g.writeObjectField(pk.toLowerCase(), pkValue);
g.writeEndObject();
}
g.writeEndArray();
Expand Down
106 changes: 64 additions & 42 deletions src/main/java/com/zendesk/maxwell/schema/MysqlSavedSchema.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
import com.zendesk.maxwell.CaseSensitivity;
import com.zendesk.maxwell.MaxwellContext;
import com.zendesk.maxwell.schema.columndef.*;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -21,7 +22,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;

public class MysqlSavedSchema {
static int SchemaStoreVersion = 1;
static int SchemaStoreVersion = 2;

private Schema schema;
private BinlogPosition position;
Expand Down Expand Up @@ -465,38 +466,31 @@ public BinlogPosition getBinlogPosition() {
return this.position;
}

private void fixUnsignedColumns(Connection conn) throws SQLException, InvalidSchemaError {
private void fixUnsignedColumns(Schema recaptured) throws SQLException, InvalidSchemaError {
int unsignedDiffs = 0;

Schema recaptured = new SchemaCapturer(conn, sensitivity).capture();

for ( Database dA : schema.getDatabases() ) {
Database dB = recaptured.findDatabaseOrThrow(dA.getName());
for ( Table tA : dA.getTableList() ) {
Table tB = dB.findTableOrThrow(tA.getName());
for ( ColumnDef cA : tA.getColumnList() ) {
ColumnDef cB = tB.findColumn(cA.getName());

if ( cA instanceof IntColumnDef ) {
if ( cB != null && cB instanceof IntColumnDef ) {
if ( ((IntColumnDef)cA).isSigned() && !((IntColumnDef)cB).isSigned() ) {
((IntColumnDef)cA).setSigned(false);
unsignedDiffs++;
}
} else {
LOGGER.warn("warning: Couldn't check for unsigned integer bug on column " + cA.getName() +
". You may want to recapture your schema");
}
} else if ( cA instanceof BigIntColumnDef ) {
if ( cB != null && cB instanceof BigIntColumnDef ) {
if ( ((BigIntColumnDef)cA).isSigned() && !((BigIntColumnDef)cB).isSigned() )
((BigIntColumnDef)cA).setSigned(false);
unsignedDiffs++;
} else {
LOGGER.warn("warning: Couldn't check for unsigned integer bug on column " + cA.getName() +
". You may want to recapture your schema");
}
for ( Pair<ColumnDef, ColumnDef> pair : schema.matchColumns(recaptured) ) {
ColumnDef cA = pair.getLeft();
ColumnDef cB = pair.getRight();

if (cA instanceof IntColumnDef) {
if (cB != null && cB instanceof IntColumnDef) {
if (((IntColumnDef) cA).isSigned() && !((IntColumnDef) cB).isSigned()) {
((IntColumnDef) cA).setSigned(false);
unsignedDiffs++;
}
} else {
LOGGER.warn("warning: Couldn't check for unsigned integer bug on column " + cA.getName() +
". You may want to recapture your schema");
}
} else if (cA instanceof BigIntColumnDef) {
if (cB != null && cB instanceof BigIntColumnDef) {
if (((BigIntColumnDef) cA).isSigned() && !((BigIntColumnDef) cB).isSigned())
((BigIntColumnDef) cA).setSigned(false);
unsignedDiffs++;
} else {
LOGGER.warn("warning: Couldn't check for unsigned integer bug on column " + cA.getName() +
". You may want to recapture your schema");
}
}
}
Expand All @@ -505,25 +499,53 @@ private void fixUnsignedColumns(Connection conn) throws SQLException, InvalidSch
/* A little explanation here: we've detected differences in signed-ness between the restored
* and the recaptured schema. 99.9% of the time this will be the result of our capture bug.
*
* We can't however simply re-save the re-captured schema, as we might be behind some DDL updates
* that we'd otherwise lose. So we leave a marker so that the next time we save the schema, we'll
* purposely break the delta chain and fix the unsigned columns in the database.
* We can't however simply re-save the re-captured schema, as the
* capture might be ahead of some DDL updates that we'd otherwise
* lose. So we leave a marker so that the next time we save the
* schema, we'll purposely break the delta chain and fix the
* unsigned columns in the database.
* */
this.shouldSnapshotNextSchema = true;
}
}

private void fixColumnCases(Schema recaptured) throws SQLException {
int caseDiffs = 0;

for ( Pair<ColumnDef, ColumnDef> pair : schema.matchColumns(recaptured) ) {
ColumnDef cA = pair.getLeft();
ColumnDef cB = pair.getRight();

if ( !cA.getName().equals(cB.getName()) ) {
LOGGER.info("correcting column case of `" + cA.getName() + "` to `" + cB.getName() + "`. Will save a full schema snapshot after the new DDL update is processed.");
caseDiffs++;
cA.setName(cB.getName());
}
}

if ( caseDiffs > 0 )
this.shouldSnapshotNextSchema = true;
}

protected void handleVersionUpgrades(Connection conn) throws SQLException, InvalidSchemaError {
if ( this.schemaVersion < 1 ) {
if ( this.schema != null && this.schema.findDatabase("mysql") == null ) {
LOGGER.info("Could not find mysql db, adding it to schema");
SchemaCapturer sc = new SchemaCapturer(conn, sensitivity, "mysql");
Database db = sc.capture().findDatabase("mysql");
this.schema.addDatabase(db);
this.shouldSnapshotNextSchema = true;
if ( this.schemaVersion < 2 ) {
Schema recaptured = new SchemaCapturer(conn, sensitivity).capture();

if ( this.schemaVersion < 1 ) {
if ( this.schema != null && this.schema.findDatabase("mysql") == null ) {
LOGGER.info("Could not find mysql db, adding it to schema");
SchemaCapturer sc = new SchemaCapturer(conn, sensitivity, "mysql");
Database db = sc.capture().findDatabase("mysql");
this.schema.addDatabase(db);
this.shouldSnapshotNextSchema = true;
}

fixUnsignedColumns(recaptured);
}

fixUnsignedColumns(conn);
if ( this.schemaVersion < 2 ) {
fixColumnCases(recaptured);
}
}
}
}
27 changes: 27 additions & 0 deletions src/main/java/com/zendesk/maxwell/schema/Schema.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.zendesk.maxwell.schema;

import com.zendesk.maxwell.CaseSensitivity;
import com.zendesk.maxwell.schema.columndef.ColumnDef;
import com.zendesk.maxwell.schema.ddl.InvalidSchemaError;
import org.apache.commons.lang3.tuple.Pair;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -91,4 +93,29 @@ public String getCharset() {
public CaseSensitivity getCaseSensitivity() {
return sensitivity;
};

public List<Pair<ColumnDef, ColumnDef>> matchColumns(Schema thatSchema) {
ArrayList<Pair<ColumnDef, ColumnDef>> list = new ArrayList<>();

for ( Database thisDatabase : this.getDatabases() ) {
Database thatDatabase = thatSchema.findDatabase(thisDatabase.getName());

if ( thatDatabase == null )
continue;

for ( Table thisTable : thisDatabase.getTableList() ) {
Table thatTable = thatDatabase.findTable(thisTable.getName());

if ( thatTable == null )
continue;

for ( ColumnDef thisColumn : thisTable.getColumnList() ) {
ColumnDef thatColumn = thatTable.findColumn(thisColumn.getName());
if ( thatColumn != null )
list.add(Pair.of(thisColumn, thatColumn));
}
}
}
return list;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import java.util.regex.Pattern;

import com.zendesk.maxwell.CaseSensitivity;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import java.io.BufferedReader;
import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
62 changes: 47 additions & 15 deletions src/main/java/com/zendesk/maxwell/schema/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import com.zendesk.maxwell.schema.columndef.IntColumnDef;
import com.zendesk.maxwell.schema.columndef.BigIntColumnDef;
import com.zendesk.maxwell.schema.columndef.EnumeratedColumnDef;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;

import com.zendesk.maxwell.schema.columndef.ColumnDef;
import com.zendesk.maxwell.schema.columndef.StringColumnDef;
Expand All @@ -22,6 +22,7 @@ public class Table {
private List<ColumnDef> columnList;
public String charset;
private List<String> pkColumnNames;
private List<String> normalizedPKColumnNames;

private HashMap<String, Integer> columnOffsetMap;
@JsonIgnore
Expand Down Expand Up @@ -73,7 +74,7 @@ private void initColumnOffsetMap() {
int i = 0;

for(ColumnDef c : columnList) {
this.columnOffsetMap.put(c.getName(), i++);
this.columnOffsetMap.put(c.getName().toLowerCase(), i++);
}
}

Expand All @@ -89,14 +90,11 @@ public int findColumnIndex(String name) {
}

public ColumnDef findColumn(String name) {
String lcName = name.toLowerCase();

for (ColumnDef c : columnList ) {
if ( c.getName().equals(lcName) )
return c;
}

return null;
int index = findColumnIndex(name);
if ( index == -1 )
return null;
else
return columnList.get(index);
}

public ColumnDef findColumnOrThrow(String name) throws InvalidSchemaError {
Expand Down Expand Up @@ -260,8 +258,11 @@ public void addColumn(ColumnDef defintion) {
}

public void removeColumn(int idx) {
removePKColumn(columnList.get(idx).getName());

this.columnList.remove(idx);
this.columnOffsetMap = null;

renumberColumns();
}

Expand All @@ -275,7 +276,7 @@ public String getCharset() {

@JsonProperty("primary-key")
public List<String> getPKList() {
return this.pkColumnNames;
return normalizedColumnNames();
}

@JsonIgnore
Expand All @@ -287,9 +288,40 @@ public String getPKString() {
}

@JsonProperty("primary-key")
public void setPKList(List<String> pkColumnNames) {
this.pkColumnNames = new ArrayList<>();
for ( String c : pkColumnNames )
this.pkColumnNames.add(c.toLowerCase());
public synchronized void setPKList(List<String> pkColumnNames) {
this.pkColumnNames = pkColumnNames;
this.normalizedPKColumnNames = null;
}

private synchronized void removePKColumn(String name) {
int pkIndex = getPKList().indexOf(name);
if ( pkIndex != -1 ) {
this.pkColumnNames.remove(pkIndex);
this.normalizedPKColumnNames = null;
}
}

private synchronized List<String> normalizedColumnNames() {
/*
primary keys may come in with different casing than the column names.
convert the list of primary keys to match the column casing.
we do this normalization lazily, as when a Table object is being deserialized
from JSON, there may be no column definitions present when the setPKList() function is called.
ugly!
*/
if ( this.normalizedPKColumnNames == null ) {
this.normalizedPKColumnNames = new ArrayList<>(this.pkColumnNames.size());

for (String name : pkColumnNames) {
ColumnDef cd = findColumn(name);

if ( cd == null )
throw new RuntimeException("Couldn't find column for primary-key: " + name);

this.normalizedPKColumnNames.add(cd.getName());
}
}
return this.normalizedPKColumnNames;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public abstract class ColumnDef {

public ColumnDef() { }
public ColumnDef(String name, String type, int pos) {
this.name = name.toLowerCase();
this.name = name;
this.type = type;
this.pos = pos;
}
Expand Down Expand Up @@ -165,6 +165,10 @@ else if ( columnLength < ( 1 << 24) )
}
}

public void setName(String name) {
this.name = name;
}

public String getName() {
return name;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import java.util.ArrayList;
import java.util.Arrays;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;

import com.google.code.or.common.util.MySQLConstants;

Expand Down
Loading

0 comments on commit d1e8b45

Please sign in to comment.