Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
Merge branch 'master' into fixGTIDPurged
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 authored Sep 7, 2021
2 parents ed11bca + dcb2e2b commit 865e978
Show file tree
Hide file tree
Showing 11 changed files with 268 additions and 291 deletions.
281 changes: 142 additions & 139 deletions dm/pb/dmworker.pb.go

Large diffs are not rendered by default.

25 changes: 13 additions & 12 deletions dm/proto/dmworker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -55,23 +55,23 @@ message QueryStatusResponse {
// a (sub) task should be always in one stage of the following stages
// (sub) task can transfer from on stage to some special other stages
// New: initial stage when a sub task is created
// can not transfered from other stages
// transfer to Running when initialize with no error
// can not be transferred from other stages
// transfers to Running when initialize with no error
// Running: indicates the sub task is processing
// transfered from New when created successfully
// transfered from Paused when resuming is requested
// transfer to Paused when error occured or requested from external
// transfer to Stopped when requested from external
// transfer to Finished when sub task processing completed (no Syncer used)
// is transferred from New when created successfully
// is transferred from Paused when resuming is requested
// transfers to Paused when error occurred or requested from external
// transfers to Stopped when requested from external
// transfers to Finished when sub task processing completed (no Syncer used)
// Paused: indicates the processing is paused, and can be resume from external request
// transfered from Running when error occured or requested from external
// transfer to Running when resuming is requested from external
// transfer to Stopped when requested from external
// is transferred from Running when error occurred or requested from external
// transfers to Running when resuming is requested from external
// transfers to Stopped when requested from external
// Stopped: indicates the processing is stopped, and can not be resume (or re-run) again
// transfered from Running / Paused when requested from external
// is transferred from Running / Paused when requested from external
// can not transfer to any stages
// Finished: indicates the processing is finished, and no need to re-run
// transfered from Running when processing completed
// is transferred from Running when processing completed
// should not transfer to any stages
enum Stage {
InvalidStage = 0; // placeholder
Expand All @@ -83,6 +83,7 @@ enum Stage {

Pausing = 6;
Resuming = 7;
Stopping = 8;
}

// CheckStatus represents status for check unit
Expand Down
20 changes: 13 additions & 7 deletions dm/unit/unit.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,29 @@ const (
DefaultInitTimeout = time.Minute
)

// Unit defines interface for sub task process units, like syncer, loader, relay, etc.
// Unit defines interface for subtask process units, like syncer, loader, relay, etc.
// The Unit is not responsible to maintain its status like "pausing"/"paused". The caller should maintain the status,
// for example, know the Unit is "paused" and avoid call Pause again.
// All method is Unit interface can expect no concurrent invocation, the caller should guarantee this.
type Unit interface {
// Init initializes the dm process unit
// every unit does base initialization in `Init`, and this must pass before start running the sub task
// other setups can be done in `Process`, but this should be treated carefully, let it's compatible with Pause / Resume
// every unit does base initialization in `Init`, and this must pass before start running the subtask
// other setups can be done in the beginning of `Process`, but this should be treated carefully to make it
// compatible with Pause / Resume.
// if initialing successfully, the outer caller should call `Close` when the unit (or the task) finished, stopped or canceled (because other units Init fail).
// if initialing fail, Init itself should release resources it acquired before (rolling itself back).
Init(ctx context.Context) error
// Process processes sub task
// When ctx.Done, stops the process and returns
// Process does the main logic and its returning must send a result to pr channel.
// When ctx.Done, stops the process and returns, otherwise the DM-worker will be blocked forever
// When not in processing, call Process to continue or resume the process
Process(ctx context.Context, pr chan pb.ProcessResult)
// Close shuts down the process and closes the unit, after that can not call Process to resume
// The implementation should not block for a long time.
Close()
// Pause pauses the process, it can be resumed later
// Pause does some cleanups and the unit can be resumed later. The caller will make sure Process has returned.
// The implementation should not block for a long time.
Pause()
// Resume resumes the paused process
// Resume resumes the paused process and its returning must send a result to pr channel.
Resume(ctx context.Context, pr chan pb.ProcessResult)
// Update updates the configuration
Update(cfg *config.SubTaskConfig) error
Expand Down
2 changes: 1 addition & 1 deletion dm/worker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func (t *testServer) TestHandleSourceBoundAfterError(c *C) {
return false
}), IsTrue)

// enable failpont
// enable failpoint
c.Assert(failpoint.Enable("github.com/pingcap/dm/pkg/ha/FailToGetSourceCfg", `return(true)`), IsNil)
sourceCfg := loadSourceConfigWithoutPassword(c)
sourceCfg.EnableRelay = false
Expand Down
Loading

0 comments on commit 865e978

Please sign in to comment.