Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
Merge branch 'master' into fixSchemaFlush
Browse files Browse the repository at this point in the history
  • Loading branch information
GMHDBJD authored Jun 30, 2021
2 parents 4cb0f7b + 50841fb commit ec2b9e9
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 9 deletions.
1 change: 1 addition & 0 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ ErrLoadUnitDumpDirNotFound,[code=34014:class=load-unit:scope=internal:level=high
ErrLoadUnitDuplicateTableFile,[code=34015:class=load-unit:scope=internal:level=high], "Message: invalid table schema file, duplicated item - %s"
ErrLoadUnitGenBAList,[code=34016:class=load-unit:scope=internal:level=high], "Message: generate block allow list, Workaround: Please check the `block-allow-list` config in task configuration file."
ErrLoadTaskWorkerNotMatch,[code=34017:class=functional:scope=internal:level=high], "Message: different worker in load stage, previous worker: %s, current worker: %s, Workaround: Please check if the previous worker is online."
ErrLoadTaskCheckPointNotMatch,[code=34018:class=functional:scope=internal:level=high], "Message: inconsistent checkpoints between loader and target database, Workaround: If you want to redo the whole task, please check that you have not forgotten to add -remove-meta flag for start-task command."
ErrSyncerUnitPanic,[code=36001:class=sync-unit:scope=internal:level=high], "Message: panic error: %v"
ErrSyncUnitInvalidTableName,[code=36002:class=sync-unit:scope=internal:level=high], "Message: extract table name for DML error: %s"
ErrSyncUnitTableNameQuery,[code=36003:class=sync-unit:scope=internal:level=high], "Message: table name parse error: %s"
Expand Down
6 changes: 6 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1528,6 +1528,12 @@ description = ""
workaround = "Please check if the previous worker is online."
tags = ["internal", "high"]

[error.DM-functional-34018]
message = "inconsistent checkpoints between loader and target database"
description = ""
workaround = "If you want to redo the whole task, please check that you have not forgotten to add -remove-meta flag for start-task command."
tags = ["internal", "high"]

[error.DM-sync-unit-36001]
message = "panic error: %v"
description = ""
Expand Down
18 changes: 13 additions & 5 deletions loader/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type CheckPoint interface {

// UpdateOffset keeps the in-memory `cp.restoringFiles` consistent with the checkpoint in DB;
// it should be called after the checkpoint in DB has been updated.
UpdateOffset(filename string, offset int64)
UpdateOffset(filename string, offset int64) error

// AllFinished returns `true` when all restoring job are finished
AllFinished() bool
Expand Down Expand Up @@ -395,15 +395,23 @@ func (cp *RemoteCheckPoint) GenSQL(filename string, offset int64) string {
}

// UpdateOffset implements CheckPoint.UpdateOffset.
func (cp *RemoteCheckPoint) UpdateOffset(filename string, offset int64) {
func (cp *RemoteCheckPoint) UpdateOffset(filename string, offset int64) error {
cp.restoringFiles.Lock()
defer cp.restoringFiles.Unlock()
db, table, err := getDBAndTableFromFilename(filename)
if err != nil {
cp.logger.Error("error in checkpoint UpdateOffset", zap.Error(err))
return
return terror.Annotatef(terror.ErrLoadTaskCheckPointNotMatch.Generate(err), "wrong filename=%s", filename)
}
cp.restoringFiles.pos[db][table][filename][0] = offset

if _, ok := cp.restoringFiles.pos[db]; ok {
if _, ok := cp.restoringFiles.pos[db][table]; ok {
if _, ok := cp.restoringFiles.pos[db][table][filename]; ok {
cp.restoringFiles.pos[db][table][filename][0] = offset
return nil
}
}
}
return terror.ErrLoadTaskCheckPointNotMatch.Generatef("db=%s table=%s not in checkpoint", db, filename)
}

// Clear implements CheckPoint.Clear.
Expand Down
10 changes: 9 additions & 1 deletion loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,15 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, runFatalCh
})
continue
}
w.loader.checkPoint.UpdateOffset(job.file, job.offset)

failpoint.Inject("loaderCPUpdateOffsetError", func(_ failpoint.Value) {
job.file = "notafile" + job.file
})
if err := w.loader.checkPoint.UpdateOffset(job.file, job.offset); err != nil {
runFatalChan <- unit.NewProcessError(err)
hasError = true
continue
}
w.loader.finishedDataSize.Add(job.offset - job.lastOffset)
}
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/terror/error_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ const (
codeLoadUnitDuplicateTableFile
codeLoadUnitGenBAList
codeLoadTaskWorkerNotMatch
codeLoadCheckPointNotMatch
)

// Sync unit error code.
Expand Down Expand Up @@ -943,6 +944,7 @@ var (
ErrLoadUnitDuplicateTableFile = New(codeLoadUnitDuplicateTableFile, ClassLoadUnit, ScopeInternal, LevelHigh, "invalid table schema file, duplicated item - %s", "")
ErrLoadUnitGenBAList = New(codeLoadUnitGenBAList, ClassLoadUnit, ScopeInternal, LevelHigh, "generate block allow list", "Please check the `block-allow-list` config in task configuration file.")
ErrLoadTaskWorkerNotMatch = New(codeLoadTaskWorkerNotMatch, ClassFunctional, ScopeInternal, LevelHigh, "different worker in load stage, previous worker: %s, current worker: %s", "Please check if the previous worker is online.")
ErrLoadTaskCheckPointNotMatch = New(codeLoadCheckPointNotMatch, ClassFunctional, ScopeInternal, LevelHigh, "inconsistent checkpoints between loader and target database", "If you want to redo the whole task, please check that you have not forgotten to add -remove-meta flag for start-task command.")

// Sync unit error.
ErrSyncerUnitPanic = New(codeSyncerUnitPanic, ClassSyncUnit, ScopeInternal, LevelHigh, "panic error: %v", "")
Expand Down
37 changes: 34 additions & 3 deletions tests/load_interrupt/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,43 @@ function check_row_count() {
[ "$estimate" == "$row_count" ]
}

function run() {
THRESHOLD=1024
# test_save_checkpoint_faild starts a load task with the
# loaderCPUpdateOffsetError failpoint enabled and expects the task to end up
# Paused, then cleans up the data and processes it started.
function test_save_checkpoint_faild() {
prepare_datafile

run_sql_file $WORK_DIR/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1

# Exported before the DM processes below are launched. With this failpoint the
# loader prefixes the job file name with "notafile" before calling the
# checkpoint's UpdateOffset.
export GO_FAILPOINTS="github.com/pingcap/dm/loader/loaderCPUpdateOffsetError=return()"

run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
# register the MySQL source: copy its config and insert a relay-dir line
# before the relay-binlog-name line
cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml
dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"start-task $cur/conf/dm-task.yaml"

# the load task should become Paused because the job file name is invalid
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"Paused" 1

# the dump directory must have been generated; ls returns non-zero if it is missing
ls $WORK_DIR/worker1/dumped_data.test

echo "test_save_checkpoint_faild SUCCESS!"
cleanup_data load_interrupt
# NOTE(review): $* expands to this function's own arguments; the visible call
# in run() passes none, so this is empty there. Confirm that is intended.
cleanup_process $*
}

function run() {
test_save_checkpoint_faild

prepare_datafile
run_sql_file $WORK_DIR/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
THRESHOLD=1024
export GO_FAILPOINTS="github.com/pingcap/dm/loader/LoadExceedOffsetExit=return($THRESHOLD)"

run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
Expand Down

0 comments on commit ec2b9e9

Please sign in to comment.