Skip to content

Commit

Permalink
Merge branch 'ben/output_tx_id' into v0_11
Browse files Browse the repository at this point in the history
  • Loading branch information
Ben Osheroff committed Sep 2, 2015
2 parents ab8c427 + f64d60c commit 50873eb
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 61 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@ config/*.yml
/target
/config.properties
/maxwell.position
/.idea
/*.iml
35 changes: 34 additions & 1 deletion src/main/java/com/zendesk/maxwell/MaxwellAbstractRowsEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
public abstract class MaxwellAbstractRowsEvent extends AbstractRowEvent {
private final MaxwellFilter filter;
private final AbstractRowEvent event;

private Long xid;
private boolean txCommit; // whether this row ends the transaction

protected final Table table;
protected final Database database;

Expand All @@ -37,6 +41,8 @@ public MaxwellAbstractRowsEvent(AbstractRowEvent e, Table table, MaxwellFilter f
this.header = e.getHeader();
this.table = table;
this.database = table.getDatabase();
this.txCommit = false;
this.xid = null;
this.filter = f;
}

Expand Down Expand Up @@ -84,6 +90,19 @@ public Column findColumn(String name, Row r) {
return null;
}

public Long getXid() {
return xid;
}

public void setXid(Long xid) {
this.xid = xid;
}

public void setTXCommit() {
this.txCommit = true;
}


@Override
public String toString() {
return event.toString();
Expand Down Expand Up @@ -197,6 +216,14 @@ public void setTimestamp(Long l) {
this.put("ts", l);
}

public void setXid(Long xid) {
this.put("xid", xid);
}

public void setTXCommit() {
this.put("commit", true);
}

public Object getData(String string) {
return this.data.get(string);
}
Expand All @@ -205,13 +232,19 @@ public Object getData(String string) {
public List<RowMap> jsonMaps() {
ArrayList<RowMap> list = new ArrayList<>();
Object value;
for ( Row r : filteredRows()) {
for ( Iterator<Row> ri = filteredRows().iterator() ; ri.hasNext(); ) {
Row r = ri.next();
RowMap rowMap = new RowMap();

rowMap.setRowType(getType());
rowMap.setTable(getTable().getName());
rowMap.setDatabase(getDatabase().getName());
rowMap.setTimestamp(getHeader().getTimestamp() / 1000);
rowMap.setXid(getXid());

// only set commit: true on the last row of the last event of the transaction
if ( this.txCommit && !ri.hasNext() )
rowMap.setTXCommit();

Iterator<Column> colIter = r.getColumns().iterator();
Iterator<ColumnDef> defIter = table.getColumnList().iterator();
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/com/zendesk/maxwell/MaxwellJSONObject.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ public Set<String> keySet() {
set.add("data");
set.add("ts");

if ( has("xid") )
set.add("xid");

if ( has("commit") && getBoolean("commit"))
set.add("commit");

return set;
}
}
160 changes: 102 additions & 58 deletions src/main/java/com/zendesk/maxwell/MaxwellParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,20 @@
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.TimeZone;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import com.google.code.or.binlog.impl.event.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.code.or.OpenReplicator;
import com.google.code.or.binlog.BinlogEventV4;
import com.google.code.or.binlog.impl.FileBasedBinlogParser;
import com.google.code.or.binlog.impl.event.AbstractBinlogEventV4;
import com.google.code.or.binlog.impl.event.AbstractRowEvent;
import com.google.code.or.binlog.impl.event.DeleteRowsEvent;
import com.google.code.or.binlog.impl.event.DeleteRowsEventV2;
import com.google.code.or.binlog.impl.event.QueryEvent;
import com.google.code.or.binlog.impl.event.TableMapEvent;
import com.google.code.or.binlog.impl.event.UpdateRowsEvent;
import com.google.code.or.binlog.impl.event.UpdateRowsEventV2;
import com.google.code.or.binlog.impl.event.WriteRowsEvent;
import com.google.code.or.binlog.impl.event.WriteRowsEventV2;
import com.google.code.or.common.util.MySQLConstants;
import com.zendesk.maxwell.producer.AbstractProducer;
import com.zendesk.maxwell.schema.Schema;
Expand Down Expand Up @@ -133,71 +126,122 @@ private MaxwellAbstractRowsEvent processRowsEvent(AbstractRowEvent e) throws Sch

table = tableCache.getTable(e.getTableId());

if ( table == null ) {
if (table == null) {
throw new SchemaSyncError("couldn't find table in cache for table id: " + e.getTableId());
}

switch (e.getHeader().getEventType()) {
case MySQLConstants.WRITE_ROWS_EVENT:
ew = new MaxwellWriteRowsEvent((WriteRowsEvent) e, table, filter);
break;
case MySQLConstants.WRITE_ROWS_EVENT_V2:
ew = new MaxwellWriteRowsEvent((WriteRowsEventV2) e, table, filter);
break;
case MySQLConstants.UPDATE_ROWS_EVENT:
ew = new MaxwellUpdateRowsEvent((UpdateRowsEvent) e, table, filter);
break;
case MySQLConstants.UPDATE_ROWS_EVENT_V2:
ew = new MaxwellUpdateRowsEvent((UpdateRowsEventV2) e, table, filter);
break;
case MySQLConstants.DELETE_ROWS_EVENT:
ew = new MaxwellDeleteRowsEvent((DeleteRowsEvent) e, table, filter);
break;
case MySQLConstants.DELETE_ROWS_EVENT_V2:
ew = new MaxwellDeleteRowsEvent((DeleteRowsEventV2) e, table, filter);
break;
default:
return null;
}
return ew;

}

public MaxwellAbstractRowsEvent getEvent(boolean stopAtNextTableMap) throws Exception {
BinlogEventV4 v4Event;
MaxwellAbstractRowsEvent event;
while (true) {
v4Event = queue.poll(100, TimeUnit.MILLISECONDS);

if ( v4Event == null )
return null;

switch(v4Event.getHeader().getEventType()) {
case MySQLConstants.WRITE_ROWS_EVENT:
ew = new MaxwellWriteRowsEvent((WriteRowsEvent) e, table, filter);
break;
case MySQLConstants.WRITE_ROWS_EVENT_V2:
ew = new MaxwellWriteRowsEvent((WriteRowsEventV2) e, table, filter);
break;
case MySQLConstants.UPDATE_ROWS_EVENT:
ew = new MaxwellUpdateRowsEvent((UpdateRowsEvent) e, table, filter);
break;
case MySQLConstants.UPDATE_ROWS_EVENT_V2:
ew = new MaxwellUpdateRowsEvent((UpdateRowsEventV2) e, table, filter);
break;
case MySQLConstants.DELETE_ROWS_EVENT:
ew = new MaxwellDeleteRowsEvent((DeleteRowsEvent) e, table, filter);
break;
case MySQLConstants.DELETE_ROWS_EVENT_V2:
rowEventsProcessed++;
event = processRowsEvent((AbstractRowEvent) v4Event);
if ( event.matchesFilter() )
return event;
ew = new MaxwellDeleteRowsEvent((DeleteRowsEventV2) e, table, filter);
break;
case MySQLConstants.TABLE_MAP_EVENT:
if ( stopAtNextTableMap)
return null;
default:
return null;
}
return ew;
}

tableCache.processEvent(this.schema, (TableMapEvent) v4Event);
break;
case MySQLConstants.QUERY_EVENT:
processQueryEvent((QueryEvent) v4Event);
private LinkedList<MaxwellAbstractRowsEvent> getTransactionEvents() throws Exception {
BinlogEventV4 v4Event;
MaxwellAbstractRowsEvent event;

LinkedList<MaxwellAbstractRowsEvent> list = new LinkedList<>();

// currently to satisfy the test interface, the contract is to return null
// if the queue is empty. should probably just replace this with an optional timeout...

while ( true ) {
v4Event = queue.poll(100, TimeUnit.MILLISECONDS);
if (v4Event == null)
continue;

switch(v4Event.getHeader().getEventType()) {
case MySQLConstants.WRITE_ROWS_EVENT:
case MySQLConstants.WRITE_ROWS_EVENT_V2:
case MySQLConstants.UPDATE_ROWS_EVENT:
case MySQLConstants.UPDATE_ROWS_EVENT_V2:
case MySQLConstants.DELETE_ROWS_EVENT:
case MySQLConstants.DELETE_ROWS_EVENT_V2:
rowEventsProcessed++;
event = processRowsEvent((AbstractRowEvent) v4Event);

if ( event.matchesFilter() )
list.add(event);
break;
case MySQLConstants.TABLE_MAP_EVENT:
tableCache.processEvent(this.schema, (TableMapEvent) v4Event);
break;
case MySQLConstants.QUERY_EVENT:
QueryEvent qe = (QueryEvent) v4Event;
// TODO: need to handle this case.
// The MySQL guys say some storage engines will output a "COMMIT" QUERY_EVENT at the
// end of the stream.
throw new RuntimeException("Unhandled QueryEvent: " + qe);
case MySQLConstants.XID_EVENT:
XidEvent xe = (XidEvent) v4Event;
for ( MaxwellAbstractRowsEvent e : list )
e.setXid(xe.getXid());

if ( !list.isEmpty() )
list.getLast().setTXCommit();

return list;
}
}
}

private LinkedList<MaxwellAbstractRowsEvent> txBuffer;

public MaxwellAbstractRowsEvent getEvent() throws Exception {
return getEvent(false);
BinlogEventV4 v4Event;
MaxwellAbstractRowsEvent event;

while ( true ) {
if ( txBuffer != null && !txBuffer.isEmpty() ) {
return txBuffer.removeFirst();
}

v4Event = queue.poll(100, TimeUnit.MILLISECONDS);

if (v4Event == null) return null;

switch (v4Event.getHeader().getEventType()) {
case MySQLConstants.WRITE_ROWS_EVENT:
case MySQLConstants.WRITE_ROWS_EVENT_V2:
case MySQLConstants.UPDATE_ROWS_EVENT:
case MySQLConstants.UPDATE_ROWS_EVENT_V2:
case MySQLConstants.DELETE_ROWS_EVENT:
case MySQLConstants.DELETE_ROWS_EVENT_V2:
LOGGER.error("Got an unexpected row-event: " + v4Event);
break;
case MySQLConstants.TABLE_MAP_EVENT:
tableCache.processEvent(this.schema, (TableMapEvent) v4Event);
break;
case MySQLConstants.QUERY_EVENT:
QueryEvent qe = (QueryEvent) v4Event;
if (qe.getSql().toString().equals("BEGIN"))
txBuffer = getTransactionEvents();
else
processQueryEvent((QueryEvent) v4Event);
break;
default:
break;
}
}
}

private void processQueryEvent(QueryEvent event) throws SchemaSyncError, SQLException, IOException {
Expand Down
1 change: 1 addition & 0 deletions src/test/java/com/zendesk/maxwell/AbstractMaxwellTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class AbstractMaxwellTest {
public static void setUpBeforeClass() throws Exception {
server = new MysqlIsolatedServer();
server.boot();

SchemaStore.ensureMaxwellSchema(server.getConnection());
}

Expand Down
47 changes: 45 additions & 2 deletions src/test/java/com/zendesk/maxwell/MaxwellIntegrationTest.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package com.zendesk.maxwell;

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.hamcrest.CoreMatchers.startsWith;
import static org.junit.Assert.*;

import java.io.BufferedReader;
import java.io.File;
Expand Down Expand Up @@ -149,8 +150,8 @@ public void testExcludeTable() throws Exception {
"insert into minimal set account_id = 2",
"ALTER table minimal add column new_text_field varchar(255)",
"insert into minimal set account_id = 2, new_text_field='hihihi'",

};

@Test
public void testAlterTable() throws Exception {
MaxwellAbstractRowsEvent e;
Expand All @@ -162,6 +163,46 @@ public void testAlterTable() throws Exception {
assertThat(e.getTable().getName(), is("minimal"));
}

String testTransactions[] = {
"BEGIN",
"insert into minimal set account_id = 1, text_field = 's'",
"insert into minimal set account_id = 2, text_field = 's'",
"COMMIT",
"BEGIN",
"insert into minimal (account_id, text_field) values (3, 's'), (4, 's')",
"COMMIT"
};

@Test
public void testTransactionID() throws Exception {
List<MaxwellAbstractRowsEvent> list;

try {
server.getConnection().setAutoCommit(false);
list = getRowsForSQL(null, testTransactions, null);

ArrayList<JSONObject> objects = new ArrayList<>();
for (MaxwellAbstractRowsEvent e : list) {
for (JSONObject j : e.toJSONObjects()) {
assertTrue(j.has("xid"));
objects.add(j);
}
}
assertEquals(4, objects.size());

assertEquals(objects.get(0).get("xid"), objects.get(1).get("xid"));
assertFalse(objects.get(0).has("commit"));
assertTrue(objects.get(1).has("commit"));

assertFalse(objects.get(2).has("commit"));
assertTrue(objects.get(3).has("commit"));
} finally {
server.getConnection().setAutoCommit(true);
}
}



private void runJSONTest(List<String> sql, List<JSONObject> assertJSON) throws Exception {
List<JSONObject> eventJSON = new ArrayList<>();
List<JSONObject> matched = new ArrayList<>();
Expand All @@ -172,6 +213,8 @@ private void runJSONTest(List<String> sql, List<JSONObject> assertJSON) throws E
// undo maxwell's fancy ordering stuff -- it's preventing us from removing the ts column.
a = new JSONObject(a.toString());
a.remove("ts");
a.remove("xid");
a.remove("commit");

eventJSON.add(a);

Expand Down

0 comments on commit 50873eb

Please sign in to comment.