Skip to content

Commit 5854f05

Browse files
committed
fix issue alibaba#4459 meta_history过滤(alibaba#4459)
1 parent aada80b commit 5854f05

File tree

1 file changed

+20
-21
lines changed

1 file changed

+20
-21
lines changed

parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java

+20-21
Original file line numberDiff line numberDiff line change
@@ -247,34 +247,33 @@ private boolean dumpTableMeta(MysqlConnection connection, final CanalEventFilter
247247
}
248248

249249
private boolean applyHistoryToDB(EntryPosition position, String schema, String ddl, String extra) {
250-
Map<String, String> content = new HashMap<>();
251-
content.put("destination", destination);
252-
content.put("binlogFile", position.getJournalName());
253-
content.put("binlogOffest", String.valueOf(position.getPosition()));
254-
content.put("binlogMasterId", String.valueOf(position.getServerId()));
255-
content.put("binlogTimestamp", String.valueOf(position.getTimestamp()));
256-
content.put("useSchema", schema);
257-
if (content.isEmpty()) {
258-
throw new RuntimeException("apply failed caused by content is empty in applyHistoryToDB");
259-
}
250+
MetaHistoryDO metaDO = new MetaHistoryDO();
251+
metaDO.setDestination(destination);
252+
metaDO.setBinlogFile(position.getJournalName());
253+
metaDO.setBinlogOffest(position.getPosition());
254+
metaDO.setBinlogMasterId(String.valueOf(position.getServerId()));
255+
metaDO.setBinlogTimestamp(position.getTimestamp());
256+
metaDO.setUseSchema(schema);
260257
// 待补充
261258
List<DdlResult> ddlResults = DruidDdlParser.parse(ddl, schema);
262259
if (ddlResults.size() > 0) {
263260
DdlResult ddlResult = ddlResults.get(0);
264-
content.put("sqlSchema", ddlResult.getSchemaName());
265-
content.put("sqlTable", ddlResult.getTableName());
266-
content.put("sqlType", ddlResult.getType().name());
267-
content.put("sqlText", ddl);
268-
content.put("extra", extra);
261+
metaDO.setSqlSchema(ddlResult.getSchemaName());
262+
metaDO.setSqlTable(ddlResult.getTableName());
263+
metaDO.setSqlType(ddlResult.getType().name());
264+
metaDO.setSqlText(ddl);
265+
metaDO.setExtra(extra);
269266
}
270267

271-
MetaHistoryDO metaDO = new MetaHistoryDO();
272268
try {
273-
BeanUtils.populate(metaDO, content);
274-
// 会建立唯一约束,解决:
275-
// 1. 重复的binlog file+offest
276-
// 2. 重复的masterId+timestamp
277-
metaHistoryDAO.insert(metaDO);
269+
// Fix issue #4459
270+
String name = metaDO.getUseSchema() + "." + metaDO.getSqlTable();
271+
if (blackFilter == null || !blackFilter.filter(name)) {
272+
// 会建立唯一约束,解决:
273+
// 1. 重复的binlog file+offset
274+
// 2. 重复的masterId+timestamp
275+
metaHistoryDAO.insert(metaDO);
276+
}
278277
} catch (Throwable e) {
279278
if (isUkDuplicateException(e)) {
280279
// 忽略掉重复的位点

0 commit comments

Comments
 (0)