diff --git a/br/pkg/storage/s3.go b/br/pkg/storage/s3.go
index 8cc6e1970aff9..6466ee459cedb 100644
--- a/br/pkg/storage/s3.go
+++ b/br/pkg/storage/s3.go
@@ -515,22 +515,49 @@ func (rs *S3Storage) WriteFile(ctx context.Context, file string, data []byte) er
 
 // ReadFile reads the file from the storage and returns the contents.
+// A failure while reading the response body re-issues the GetObject request,
+// for at most maxErrorRetries attempts in total. A failed GetObject request, or
+// a read error reporting a canceled context or an exceeded deadline, is returned
+// without retrying.
 func (rs *S3Storage) ReadFile(ctx context.Context, file string) ([]byte, error) {
-	input := &s3.GetObjectInput{
-		Bucket: aws.String(rs.options.Bucket),
-		Key:    aws.String(rs.options.Prefix + file),
-	}
-	result, err := rs.svc.GetObjectWithContext(ctx, input)
-	if err != nil {
-		return nil, errors.Annotatef(err,
-			"failed to read s3 file, file info: input.bucket='%s', input.key='%s'",
-			*input.Bucket, *input.Key)
-	}
-	defer result.Body.Close()
-	data, err := io.ReadAll(result.Body)
-	if err != nil {
-		return nil, errors.Trace(err)
+	var (
+		data    []byte
+		readErr error
+	)
+	for retryCnt := 0; retryCnt < maxErrorRetries; retryCnt++ {
+		input := &s3.GetObjectInput{
+			Bucket: aws.String(rs.options.Bucket),
+			Key:    aws.String(rs.options.Prefix + file),
+		}
+		result, err := rs.svc.GetObjectWithContext(ctx, input)
+		if err != nil {
+			return nil, errors.Annotatef(err,
+				"failed to read s3 file, file info: input.bucket='%s', input.key='%s'",
+				*input.Bucket, *input.Key)
+		}
+		data, readErr = io.ReadAll(result.Body)
+		// close the body right away instead of deferring it: a defer inside the
+		// loop would keep the body of every failed attempt open until ReadFile returns
+		result.Body.Close()
+		// for unit test
+		failpoint.Inject("read-s3-body-failed", func(_ failpoint.Value) {
+			log.Info("original error", zap.Error(readErr))
+			readErr = errors.Errorf("read: connection reset by peer")
+		})
+		if readErr != nil {
+			if isDeadlineExceedError(readErr) || isCancelError(readErr) {
+				return nil, errors.Annotatef(readErr, "failed to read body from get object result, file info: input.bucket='%s', input.key='%s', retryCnt='%d'",
+					*input.Bucket, *input.Key, retryCnt)
+			}
+			// keep the swallowed error visible in the log before trying again
+			log.Warn("failed to read body from get object result, retrying",
+				zap.String("bucket", *input.Bucket), zap.String("key", *input.Key),
+				zap.Int("retryCnt", retryCnt), zap.Error(readErr))
+			continue
+		}
+		return data, nil
 	}
-	return data, nil
+	// every attempt failed while reading the body, give up
+	return nil, errors.Annotatef(readErr, "failed to read body from get object result (retry too much), file info: input.bucket='%s', input.key='%s'",
+		rs.options.Bucket, rs.options.Prefix+file)
 }
 
 // DeleteFile delete the file in s3 storage
@@ -989,6 +1016,12 @@ type retryerWithLog struct {
 	client.DefaultRetryer
 }
 
+// isCancelError reports whether the message of err contains "context canceled".
+// err must not be nil.
+func isCancelError(err error) bool {
+	return strings.Contains(err.Error(), "context canceled")
+}
+
 func isDeadlineExceedError(err error) bool {
 	// TODO find a better way.
 	// Known challenges:
diff --git a/br/pkg/storage/s3_test.go b/br/pkg/storage/s3_test.go
index 52468dbe32a9b..f84d6d9d32a1c 100644
--- a/br/pkg/storage/s3_test.go
+++ b/br/pkg/storage/s3_test.go
@@ -1343,3 +1343,35 @@ func TestRetryError(t *testing.T) {
 	require.NoError(t, err)
 	require.Equal(t, count, int32(2))
 }
+
+func TestS3ReadFileRetryable(t *testing.T) {
+	s := createS3Suite(t)
+	ctx := aws.BackgroundContext()
+	errMsg := "just some unrelated error"
+	expectedErr := errors.New(errMsg)
+
+	// the first two requests succeed, but the failpoint enabled below makes
+	// reading their bodies fail, so that ReadFile has to retry
+	s.s3.EXPECT().
+		GetObjectWithContext(ctx, gomock.Any()).
+		DoAndReturn(func(_ context.Context, input *s3.GetObjectInput, opt ...request.Option) (*s3.GetObjectOutput, error) {
+			require.Equal(t, "bucket", aws.StringValue(input.Bucket))
+			require.Equal(t, "prefix/file", aws.StringValue(input.Key))
+			return &s3.GetObjectOutput{
+				Body: io.NopCloser(bytes.NewReader([]byte("test"))),
+			}, nil
+		}).
+		Times(2)
+	// the third request fails, and its error is expected to reach the caller
+	s.s3.EXPECT().
+		GetObjectWithContext(ctx, gomock.Any()).
+		Return(nil, expectedErr)
+
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/storage/read-s3-body-failed", "2*return(true)"))
+	defer func() {
+		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/storage/read-s3-body-failed"))
+	}()
+	_, err := s.storage.ReadFile(ctx, "file")
+	require.Error(t, err)
+	require.Contains(t, err.Error(), errMsg)
+}
diff --git a/br/pkg/utils/BUILD.bazel b/br/pkg/utils/BUILD.bazel
index a47c472a356d2..103dfecb592b9 100644
--- a/br/pkg/utils/BUILD.bazel
+++ b/br/pkg/utils/BUILD.bazel
@@ -89,7 +89,7 @@ go_test(
     ],
     embed = [":utils"],
     flaky = True,
-    shard_count = 33,
+    shard_count = 34,
     deps = [
         "//br/pkg/errors",
         "//br/pkg/metautil",