br: add more retry strategy (s3.ReadFile: body reader / pushBackup: backoffer) #50541

Merged 5 commits on Jan 24, 2024
61 changes: 13 additions & 48 deletions br/pkg/backup/client.go
@@ -1437,57 +1437,22 @@ func SendBackup(
var errReset error
var errBackup error

-	for retry := 0; retry < backupRetryTimes; retry++ {
-		logutil.CL(ctx).Info("try backup",
-			zap.Int("retry time", retry),
-		)
+	retry := -1
+	return utils.WithRetry(ctx, func() error {
+		retry += 1
+		if retry != 0 {
+			client, errReset = resetFn()
+			if errReset != nil {
+				return errors.Annotatef(errReset, "failed to reset backup connection on store:%d "+
+					"please check the tikv status", storeID)
+			}
+		}
+		logutil.CL(ctx).Info("try backup", zap.Int("retry time", retry))
		errBackup = doSendBackup(ctx, client, req, respFn)
		if errBackup != nil {
-			if isRetryableError(errBackup) {
-				time.Sleep(3 * time.Second)
-				client, errReset = resetFn()
-				if errReset != nil {
-					return errors.Annotatef(errReset, "failed to reset backup connection on store:%d "+
-						"please check the tikv status", storeID)
-				}
-				continue
-			}
-			logutil.CL(ctx).Error("fail to backup", zap.Uint64("StoreID", storeID), zap.Int("retry", retry))
			return berrors.ErrFailedToConnect.Wrap(errBackup).GenWithStack("failed to create backup stream to store %d", storeID)
		}
-		// finish backup
-		break
-	}
-	return nil
-}
-
-// gRPC communication cancelled with connection closing
-const (
-	gRPC_Cancel = "the client connection is closing"
-)
-
-// isRetryableError represents whether we should retry reset grpc connection.
-func isRetryableError(err error) bool {
-	// some errors can be retried
-	// https://github.com/pingcap/tidb/issues/34350
-	switch status.Code(err) {
-	case codes.Unavailable, codes.DeadlineExceeded,
-		codes.ResourceExhausted, codes.Aborted, codes.Internal:
-		{
-			log.Warn("backup met some errors, these errors can be retry 5 times", zap.Error(err))
-			return true
-		}
-	}
-
-	// At least, there are two possible cancel() call,
-	// one from backup range, another from gRPC, here we retry when gRPC cancel with connection closing
-	if status.Code(err) == codes.Canceled {
-		if s, ok := status.FromError(err); ok {
-			if strings.Contains(s.Message(), gRPC_Cancel) {
-				log.Warn("backup met grpc cancel error, this errors can be retry 5 times", zap.Error(err))
-				return true
-			}
-		}
-	}
-	return false
+		return nil
+	}, utils.NewBackupSSTBackoffer())
}
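The net effect: retry policy (attempt count, wait times, and which errors count as transient) now lives in the backoffer instead of being hand-rolled inside SendBackup. A minimal, self-contained sketch of the WithRetry/backoffer contract; the names backoffer, fixedBackoffer, and withRetry are illustrative stand-ins, not the actual br API:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// backoffer decides how long to sleep before the next attempt and how
// many attempts remain, mirroring the role utils.Backoffer plays in br.
type backoffer interface {
	NextBackoff(err error) time.Duration
	Attempt() int
}

type fixedBackoffer struct {
	attempt int
	delay   time.Duration
}

func (b *fixedBackoffer) NextBackoff(error) time.Duration { b.attempt--; return b.delay }
func (b *fixedBackoffer) Attempt() int                    { return b.attempt }

// withRetry keeps calling fn until it succeeds, the backoffer runs out
// of attempts, or the context is done: the shape utils.WithRetry has.
func withRetry(ctx context.Context, fn func() error, bo backoffer) error {
	var lastErr error
	for bo.Attempt() > 0 {
		if lastErr = fn(); lastErr == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(bo.NextBackoff(lastErr)):
		}
	}
	return lastErr
}

func main() {
	calls := 0
	err := withRetry(context.Background(), func() error {
		calls++
		if calls < 3 {
			return errors.New("transient")
		}
		return nil
	}, &fixedBackoffer{attempt: 5, delay: 10 * time.Millisecond})
	fmt.Println(calls, err) // 3 <nil>
}

With this shape, SendBackup only reports whether an attempt failed; how often to retry and how long to wait is decided once in NewBackupSSTBackoffer (5 attempts with 2s to 3s waits, per the constants added in backoff.go below).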
53 changes: 38 additions & 15 deletions br/pkg/storage/s3.go
@@ -559,22 +559,41 @@ func (rs *S3Storage) WriteFile(ctx context.Context, file string, data []byte) error {

// ReadFile reads the file from the storage and returns the contents.
func (rs *S3Storage) ReadFile(ctx context.Context, file string) ([]byte, error) {
-	input := &s3.GetObjectInput{
-		Bucket: aws.String(rs.options.Bucket),
-		Key:    aws.String(rs.options.Prefix + file),
-	}
-	result, err := rs.svc.GetObjectWithContext(ctx, input)
-	if err != nil {
-		return nil, errors.Annotatef(err,
-			"failed to read s3 file, file info: input.bucket='%s', input.key='%s'",
-			*input.Bucket, *input.Key)
-	}
-	defer result.Body.Close()
-	data, err := io.ReadAll(result.Body)
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
-	return data, nil
+	var (
+		data    []byte
+		readErr error
+	)
+	for retryCnt := 0; retryCnt < maxErrorRetries; retryCnt += 1 {
+		input := &s3.GetObjectInput{
+			Bucket: aws.String(rs.options.Bucket),
+			Key:    aws.String(rs.options.Prefix + file),
+		}
+		result, err := rs.svc.GetObjectWithContext(ctx, input)
+		if err != nil {
+			return nil, errors.Annotatef(err,
+				"failed to read s3 file, file info: input.bucket='%s', input.key='%s'",
+				*input.Bucket, *input.Key)
+		}
+		data, readErr = io.ReadAll(result.Body)
+		// close the response body since the data has already been read out
+		result.Body.Close()
+		// for unit test
+		failpoint.Inject("read-s3-body-failed", func(_ failpoint.Value) {
+			log.Info("original error", zap.Error(readErr))
+			readErr = errors.Errorf("read: connection reset by peer")
+		})
+		if readErr != nil {
+			if isDeadlineExceedError(readErr) || isCancelError(readErr) {
+				return nil, errors.Annotatef(readErr, "failed to read body from get object result, file info: input.bucket='%s', input.key='%s', retryCnt='%d'",
+					*input.Bucket, *input.Key, retryCnt)
+			}
+			continue
+		}
+		return data, nil
+	}
+	// retried too many times; give up and surface the last read error
+	return nil, errors.Annotatef(readErr, "failed to read body from get object result (retry too much), file info: input.bucket='%s', input.key='%s'",
+		rs.options.Bucket, rs.options.Prefix+file)
}

// DeleteFile deletes the file in s3 storage
@@ -1104,6 +1123,10 @@ type retryerWithLog struct {
client.DefaultRetryer
}

+func isCancelError(err error) bool {
+	return strings.Contains(err.Error(), "context canceled")
+}

func isDeadlineExceedError(err error) bool {
// TODO find a better way.
// Known challenges:
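The notable point in the new ReadFile: a GetObject call can succeed while the body stream still breaks mid-read, so only the body read is retried with a fresh request; request errors, cancellation, and deadline expiry fail fast. A hedged sketch of that pattern against a generic fetch function (readAllWithRetry, maxBodyRetries, and brokenReader are invented for illustration, not the real S3 client types):

package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"strings"
)

const maxBodyRetries = 3 // stand-in for maxErrorRetries

// readAllWithRetry re-issues fetch when draining the body fails:
// the request can succeed while the stream still resets mid-read.
func readAllWithRetry(ctx context.Context, fetch func(context.Context) (io.ReadCloser, error)) ([]byte, error) {
	var readErr error
	for i := 0; i < maxBodyRetries; i++ {
		body, err := fetch(ctx)
		if err != nil {
			return nil, err // the request itself failed: surface immediately
		}
		data, rErr := io.ReadAll(body)
		body.Close()
		if rErr == nil {
			return data, nil
		}
		if errors.Is(rErr, context.Canceled) || errors.Is(rErr, context.DeadlineExceeded) {
			return nil, rErr // the caller gave up: do not retry
		}
		readErr = rErr // e.g. "connection reset by peer": retry with a fresh stream
	}
	return nil, fmt.Errorf("read body failed after %d attempts: %w", maxBodyRetries, readErr)
}

type brokenReader struct{}

func (brokenReader) Read([]byte) (int, error) {
	return 0, errors.New("read: connection reset by peer")
}

func main() {
	calls := 0
	data, err := readAllWithRetry(context.Background(), func(context.Context) (io.ReadCloser, error) {
		calls++
		if calls == 1 {
			return io.NopCloser(brokenReader{}), nil
		}
		return io.NopCloser(strings.NewReader("test")), nil
	})
	fmt.Println(string(data), err) // test <nil>
}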
38 changes: 38 additions & 0 deletions br/pkg/storage/s3_test.go
Expand Up @@ -12,6 +12,7 @@ import (
"net/http"
"net/http/httptest"
"os"
"strings"
"sync"
"testing"

@@ -1383,3 +1384,40 @@ func TestRetryError(t *testing.T) {
require.NoError(t, err)
require.Equal(t, count, int32(2))
}

func TestS3ReadFileRetryable(t *testing.T) {
s := createS3Suite(t)
ctx := aws.BackgroundContext()
errMsg := "just some unrelated error"
expectedErr := errors.New(errMsg)

s.s3.EXPECT().
GetObjectWithContext(ctx, gomock.Any()).
DoAndReturn(func(_ context.Context, input *s3.GetObjectInput, opt ...request.Option) (*s3.GetObjectOutput, error) {
require.Equal(t, "bucket", aws.StringValue(input.Bucket))
require.Equal(t, "prefix/file", aws.StringValue(input.Key))
return &s3.GetObjectOutput{
Body: io.NopCloser(bytes.NewReader([]byte("test"))),
}, nil
})
s.s3.EXPECT().
GetObjectWithContext(ctx, gomock.Any()).
DoAndReturn(func(_ context.Context, input *s3.GetObjectInput, opt ...request.Option) (*s3.GetObjectOutput, error) {
require.Equal(t, "bucket", aws.StringValue(input.Bucket))
require.Equal(t, "prefix/file", aws.StringValue(input.Key))
return &s3.GetObjectOutput{
Body: io.NopCloser(bytes.NewReader([]byte("test"))),
}, nil
})
s.s3.EXPECT().
GetObjectWithContext(ctx, gomock.Any()).
Return(nil, expectedErr)

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/storage/read-s3-body-failed", "2*return(true)"))
defer func() {
failpoint.Disable("github.com/pingcap/tidb/br/pkg/storage/read-s3-body-failed")
}()
_, err := s.storage.ReadFile(ctx, "file")
require.Error(t, err)
require.True(t, strings.Contains(err.Error(), errMsg))
}
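A note on the failpoint expression: `2*return(true)` arms the injected failure for exactly two evaluations, so the first two GetObject results have their body-read error overwritten with "connection reset by peer" and are retried, and the third call's unrelated error is what ReadFile ultimately reports.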
2 changes: 1 addition & 1 deletion br/pkg/utils/BUILD.bazel
@@ -87,7 +87,7 @@ go_test(
],
embed = [":utils"],
flaky = True,
-shard_count = 33,
+shard_count = 34,
deps = [
"//br/pkg/errors",
"//pkg/kv",
33 changes: 32 additions & 1 deletion br/pkg/utils/backoff.go
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"database/sql"
"io"
"strings"
"time"

"github.com/pingcap/errors"
@@ -26,6 +27,10 @@ const (
downloadSSTWaitInterval = 1 * time.Second
downloadSSTMaxWaitInterval = 4 * time.Second

+backupSSTRetryTimes = 5
+backupSSTWaitInterval = 2 * time.Second
+backupSSTMaxWaitInterval = 3 * time.Second

resetTSRetryTime = 16
resetTSWaitInterval = 50 * time.Millisecond
resetTSMaxWaitInterval = 500 * time.Millisecond
@@ -42,8 +47,21 @@
ChecksumRetryTime = 8
ChecksumWaitInterval = 1 * time.Second
ChecksumMaxWaitInterval = 30 * time.Second

+gRPC_Cancel = "the client connection is closing"
)

+// At least two different calls can cancel(): one from the go context and
+// another from gRPC; retry only when gRPC cancels with the connection closing.
+func isGRPCCancel(err error) bool {
+	if s, ok := status.FromError(err); ok {
+		if strings.Contains(s.Message(), gRPC_Cancel) {
+			return true
+		}
+	}
+	return false
+}

// RetryState is the mutable state needed for retrying.
// It is like `utils.Backoffer`, but more fundamental:
// it only controls the backoff time and knows nothing about what error happened.
@@ -143,6 +161,11 @@ func NewDownloadSSTBackoffer() Backoffer {
return NewBackoffer(downloadSSTRetryTimes, downloadSSTWaitInterval, downloadSSTMaxWaitInterval, errContext)
}

+func NewBackupSSTBackoffer() Backoffer {
+	errContext := NewErrorContext("backup sst", 3)
+	return NewBackoffer(backupSSTRetryTimes, backupSSTWaitInterval, backupSSTMaxWaitInterval, errContext)
+}

func (bo *importerBackoffer) NextBackoff(err error) time.Duration {
log.Warn("retry to import ssts", zap.Int("attempt", bo.attempt), zap.Error(err))
// we don't care storeID here.
@@ -162,9 +185,17 @@
bo.attempt = 0
default:
switch status.Code(e) {
-case codes.Unavailable, codes.Aborted, codes.DeadlineExceeded:
+case codes.Unavailable, codes.Aborted, codes.DeadlineExceeded, codes.ResourceExhausted, codes.Internal:
bo.delayTime = 2 * bo.delayTime
bo.attempt--
+case codes.Canceled:
+	if isGRPCCancel(err) {
+		bo.delayTime = 2 * bo.delayTime
+		bo.attempt--
+	} else {
+		bo.delayTime = 0
+		bo.attempt = 0
+	}
default:
// Unexpected error
bo.delayTime = 0
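The new codes.Canceled branch is the subtle part: a Canceled status can come from the caller's own context (stop retrying) or from gRPC tearing down the connection (worth retrying). A small sketch of that classification, assuming only the stock google.golang.org/grpc status APIs; retryableCancel and gRPCCancelMsg are illustrative names:

package main

import (
	"fmt"
	"strings"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

const gRPCCancelMsg = "the client connection is closing"

// retryableCancel mirrors the intent of isGRPCCancel: only a Canceled
// status whose message says the connection is closing should be retried.
func retryableCancel(err error) bool {
	s, ok := status.FromError(err)
	if !ok || s.Code() != codes.Canceled {
		return false
	}
	return strings.Contains(s.Message(), gRPCCancelMsg)
}

func main() {
	fromGRPC := status.Error(codes.Canceled, gRPCCancelMsg)
	fromCaller := status.Error(codes.Canceled, "context canceled")
	fmt.Println(retryableCancel(fromGRPC))   // true: back off and retry
	fmt.Println(retryableCancel(fromCaller)) // false: stop retrying
}

In the PR itself this check lives in utils.isGRPCCancel and is consulted only under the codes.Canceled case of NextBackoff, as shown in the diff above.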
19 changes: 19 additions & 0 deletions br/pkg/utils/backoff_test.go
Expand Up @@ -178,3 +178,22 @@ func TestNewDownloadSSTBackofferWithCancel(t *testing.T) {
context.Canceled,
}, multierr.Errors(err))
}

func TestNewBackupSSTBackofferWithCancel(t *testing.T) {
var counter int
backoffer := utils.NewBackupSSTBackoffer()
err := utils.WithRetry(context.Background(), func() error {
defer func() { counter++ }()
if counter == 3 {
return context.Canceled
}
return berrors.ErrKVIngestFailed
}, backoffer)
require.Equal(t, 4, counter)
require.Equal(t, []error{
berrors.ErrKVIngestFailed,
berrors.ErrKVIngestFailed,
berrors.ErrKVIngestFailed,
context.Canceled,
}, multierr.Errors(err))
}
1 change: 1 addition & 0 deletions br/pkg/utils/retry.go
Expand Up @@ -31,6 +31,7 @@ var retryableServerError = []string{
"body write aborted",
"error during dispatch",
"put object timeout",
"timeout after",
"internalerror",
"not read from or written to within the timeout period",
"<code>requesttimeout</code>",
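For context, retryableServerError is a lowercase substring allowlist matched against error text, so the new "timeout after" entry makes any error whose message contains that phrase retryable. A rough sketch of how such a list is typically applied (isRetryableServerError is an illustrative helper, not the exact br function):

package main

import (
	"errors"
	"fmt"
	"strings"
)

// a few entries from the allowlist above, all lowercase on purpose
var retryableServerError = []string{
	"body write aborted",
	"put object timeout",
	"timeout after",
}

// isRetryableServerError lowercases the message and scans for any
// known transient-server substring.
func isRetryableServerError(err error) bool {
	msg := strings.ToLower(err.Error())
	for _, substr := range retryableServerError {
		if strings.Contains(msg, substr) {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(isRetryableServerError(errors.New("RequestCanceled: Timeout after 30s"))) // true
	fmt.Println(isRetryableServerError(errors.New("access denied")))                      // false
}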
2 changes: 1 addition & 1 deletion br/tests/br_full/run.sh
Expand Up @@ -53,7 +53,7 @@ test_log="${TEST_DIR}/${DB}_test.log"
error_str="not read from or written to within the timeout period"
unset BR_LOG_TO_TERM

-export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/backup/backup-storage-error=1*return(\"connection refused\")->1*return(\"InternalError\");github.com/pingcap/tidb/br/pkg/backup/backup-timeout-error=1*return(\"<Code>RequestTimeout</Code>\")->1*return(\"not read from or written to within the timeout period\")->1*return(\"<Code>InvalidPart</Code>\")->1*return(\"end of file before message length reached\")"
+export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/backup/backup-storage-error=1*return(\"connection refused\")->1*return(\"InternalError\");github.com/pingcap/tidb/br/pkg/backup/backup-timeout-error=1*return(\"<Code>RequestTimeout</Code>\")->1*return(\"not read from or written to within the timeout period\")->1*return(\"<Code>InvalidPart</Code>\")->1*return(\"end of file before message length reached\")->1*return(\"timeout after\")"
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB-lz4" --concurrency 4 --compression lz4 --log-file $test_log
export GO_FAILPOINTS=""
size_lz4=$(du -d 0 $TEST_DIR/$DB-lz4 | awk '{print $1}')
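In GO_FAILPOINTS, the `->` operator chains terms in sequence, so consecutive backup attempts observe each injected error in order; the appended `1*return("timeout after")` term drives one attempt through the newly added "timeout after" retryable message end to end.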