Skip to content

Commit ab3f42c

Browse files
authored
Merge pull request #1832 from mtrmac/eof-range-requests-5.24
[release-5.24] Reconnecting blob reader
2 parents 109c0b0 + f35b37f commit ab3f42c

File tree

4 files changed

+365
-4
lines changed

4 files changed

+365
-4
lines changed

docker/body_reader.go

+234
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
package docker
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"io"
8+
"math"
9+
"math/rand"
10+
"net/http"
11+
"net/url"
12+
"strconv"
13+
"strings"
14+
"syscall"
15+
"time"
16+
17+
"github.com/sirupsen/logrus"
18+
)
19+
20+
// bodyReaderMinimumProgress is the minimum progress we want to see before we retry
21+
const bodyReaderMinimumProgress = 1 * 1024 * 1024
22+
23+
// bodyReader is an io.ReadCloser returned by dockerImageSource.GetBlob,
24+
// which can transparently resume some (very limited) kinds of aborted connections.
25+
type bodyReader struct {
26+
ctx context.Context
27+
c *dockerClient
28+
29+
path string // path to pass to makeRequest to retry
30+
logURL *url.URL // a string to use in error messages
31+
body io.ReadCloser // The currently open connection we use to read data, or nil if there is nothing to read from / close.
32+
lastRetryOffset int64
33+
offset int64 // Current offset within the blob
34+
firstConnectionTime time.Time
35+
lastSuccessTime time.Time // time.Time{} if N/A
36+
}
37+
38+
// newBodyReader creates a bodyReader for request path in c.
39+
// firstBody is an already correctly opened body for the blob, returing the full blob from the start.
40+
// If reading from firstBody fails, bodyReader may heuristically decide to resume.
41+
func newBodyReader(ctx context.Context, c *dockerClient, path string, firstBody io.ReadCloser) (io.ReadCloser, error) {
42+
logURL, err := c.resolveRequestURL(path)
43+
if err != nil {
44+
return nil, err
45+
}
46+
res := &bodyReader{
47+
ctx: ctx,
48+
c: c,
49+
50+
path: path,
51+
logURL: logURL,
52+
body: firstBody,
53+
lastRetryOffset: 0,
54+
offset: 0,
55+
firstConnectionTime: time.Now(),
56+
}
57+
return res, nil
58+
}
59+
60+
// parseDecimalInString ensures that s[start:] starts with a non-negative decimal number, and returns that number and the offset after the number.
61+
func parseDecimalInString(s string, start int) (int64, int, error) {
62+
i := start
63+
for i < len(s) && s[i] >= '0' && s[i] <= '9' {
64+
i++
65+
}
66+
if i == start {
67+
return -1, -1, errors.New("missing decimal number")
68+
}
69+
v, err := strconv.ParseInt(s[start:i], 10, 64)
70+
if err != nil {
71+
return -1, -1, fmt.Errorf("parsing number: %w", err)
72+
}
73+
return v, i, nil
74+
}
75+
76+
// parseExpectedChar ensures that s[pos] is the expected byte, and returns the offset after it.
77+
func parseExpectedChar(s string, pos int, expected byte) (int, error) {
78+
if pos == len(s) || s[pos] != expected {
79+
return -1, fmt.Errorf("missing expected %q", expected)
80+
}
81+
return pos + 1, nil
82+
}
83+
84+
// parseContentRange ensures that res contains a Content-Range header with a byte range, and returns (first, last, completeLength) on success. Size can be -1.
85+
func parseContentRange(res *http.Response) (int64, int64, int64, error) {
86+
hdrs := res.Header.Values("Content-Range")
87+
switch len(hdrs) {
88+
case 0:
89+
return -1, -1, -1, errors.New("missing Content-Range: header")
90+
case 1:
91+
break
92+
default:
93+
return -1, -1, -1, fmt.Errorf("ambiguous Content-Range:, %d header values", len(hdrs))
94+
}
95+
hdr := hdrs[0]
96+
expectedPrefix := "bytes "
97+
if !strings.HasPrefix(hdr, expectedPrefix) {
98+
return -1, -1, -1, fmt.Errorf("invalid Content-Range: %q, missing prefix %q", hdr, expectedPrefix)
99+
}
100+
first, pos, err := parseDecimalInString(hdr, len(expectedPrefix))
101+
if err != nil {
102+
return -1, -1, -1, fmt.Errorf("invalid Content-Range: %q, parsing first-pos: %w", hdr, err)
103+
}
104+
pos, err = parseExpectedChar(hdr, pos, '-')
105+
if err != nil {
106+
return -1, -1, -1, fmt.Errorf("invalid Content-Range: %q: %w", hdr, err)
107+
}
108+
last, pos, err := parseDecimalInString(hdr, pos)
109+
if err != nil {
110+
return -1, -1, -1, fmt.Errorf("invalid Content-Range: %q, parsing last-pos: %w", hdr, err)
111+
}
112+
pos, err = parseExpectedChar(hdr, pos, '/')
113+
if err != nil {
114+
return -1, -1, -1, fmt.Errorf("invalid Content-Range: %q: %w", hdr, err)
115+
}
116+
completeLength := int64(-1)
117+
if pos < len(hdr) && hdr[pos] == '*' {
118+
pos++
119+
} else {
120+
completeLength, pos, err = parseDecimalInString(hdr, pos)
121+
if err != nil {
122+
return -1, -1, -1, fmt.Errorf("invalid Content-Range: %q, parsing complete-length: %w", hdr, err)
123+
}
124+
}
125+
if pos < len(hdr) {
126+
return -1, -1, -1, fmt.Errorf("invalid Content-Range: %q, unexpected trailing content", hdr)
127+
}
128+
return first, last, completeLength, nil
129+
}
130+
131+
// Read implements io.ReadCloser
132+
func (br *bodyReader) Read(p []byte) (int, error) {
133+
if br.body == nil {
134+
return 0, fmt.Errorf("internal error: bodyReader.Read called on a closed object for %s", br.logURL.Redacted())
135+
}
136+
n, err := br.body.Read(p)
137+
br.offset += int64(n)
138+
switch {
139+
case err == nil || err == io.EOF:
140+
br.lastSuccessTime = time.Now()
141+
return n, err // Unlike the default: case, don’t log anything.
142+
143+
case errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, syscall.ECONNRESET):
144+
originalErr := err
145+
redactedURL := br.logURL.Redacted()
146+
if err := br.errorIfNotReconnecting(originalErr, redactedURL); err != nil {
147+
return n, err
148+
}
149+
150+
if err := br.body.Close(); err != nil {
151+
logrus.Debugf("Error closing blob body: %v", err) // … and ignore err otherwise
152+
}
153+
br.body = nil
154+
time.Sleep(1*time.Second + time.Duration(rand.Intn(100_000))*time.Microsecond) // Some jitter so that a failure blip doesn’t cause a deterministic stampede
155+
156+
headers := map[string][]string{
157+
"Range": {fmt.Sprintf("bytes=%d-", br.offset)},
158+
}
159+
res, err := br.c.makeRequest(br.ctx, http.MethodGet, br.path, headers, nil, v2Auth, nil)
160+
if err != nil {
161+
return n, fmt.Errorf("%w (while reconnecting: %v)", originalErr, err)
162+
}
163+
consumedBody := false
164+
defer func() {
165+
if !consumedBody {
166+
res.Body.Close()
167+
}
168+
}()
169+
switch res.StatusCode {
170+
case http.StatusPartialContent: // OK
171+
// A client MUST inspect a 206 response's Content-Type and Content-Range field(s) to determine what parts are enclosed and whether additional requests are needed.
172+
// The recipient of an invalid Content-Range MUST NOT attempt to recombine the received content with a stored representation.
173+
first, last, completeLength, err := parseContentRange(res)
174+
if err != nil {
175+
return n, fmt.Errorf("%w (after reconnecting, invalid Content-Range header: %v)", originalErr, err)
176+
}
177+
// We don’t handle responses that start at an unrequested offset, nor responses that terminate before the end of the full blob.
178+
if first != br.offset || (completeLength != -1 && last+1 != completeLength) {
179+
return n, fmt.Errorf("%w (after reconnecting at offset %d, got unexpected Content-Range %d-%d/%d)", originalErr, br.offset, first, last, completeLength)
180+
}
181+
// Continue below
182+
case http.StatusOK:
183+
return n, fmt.Errorf("%w (after reconnecting, server did not process a Range: header, status %d)", originalErr, http.StatusOK)
184+
default:
185+
err := registryHTTPResponseToError(res)
186+
return n, fmt.Errorf("%w (after reconnecting, fetching blob: %v)", originalErr, err)
187+
}
188+
189+
logrus.Debugf("Succesfully reconnected to %s", redactedURL)
190+
consumedBody = true
191+
br.body = res.Body
192+
br.lastRetryOffset = br.offset
193+
return n, nil
194+
195+
default:
196+
logrus.Debugf("Error reading blob body from %s: %#v", br.logURL.Redacted(), err)
197+
return n, err
198+
}
199+
}
200+
201+
// millisecondsSince is like time.Since(tm).Milliseconds, but it returns a floating-point value
202+
func millisecondsSince(tm time.Time) float64 {
203+
return float64(time.Since(tm).Nanoseconds()) / 1_000_000.0
204+
}
205+
206+
// errorIfNotReconnecting makes a heuristic decision whether we should reconnect after err at redactedURL; if so, it returns nil,
207+
// otherwise it returns an appropriate error to return to the caller (possibly augmented with data about the heuristic)
208+
func (br *bodyReader) errorIfNotReconnecting(originalErr error, redactedURL string) error {
209+
totalTime := millisecondsSince(br.firstConnectionTime)
210+
failureTime := math.NaN()
211+
if (br.lastSuccessTime != time.Time{}) {
212+
failureTime = millisecondsSince(br.lastSuccessTime)
213+
}
214+
logrus.Debugf("Reading blob body from %s failed (%#v), decision inputs: lastRetryOffset %d, offset %d, %.3f ms since first connection, %.3f ms since last progress",
215+
redactedURL, originalErr, br.lastRetryOffset, br.offset, totalTime, failureTime)
216+
progress := br.offset - br.lastRetryOffset
217+
if progress < bodyReaderMinimumProgress {
218+
logrus.Debugf("Not reconnecting to %s because only %d bytes progress made", redactedURL, progress)
219+
return fmt.Errorf("(heuristic tuning data: last retry %d, current offset %d; %.3f ms total, %.3f ms since progress): %w",
220+
br.lastRetryOffset, br.offset, totalTime, failureTime, originalErr)
221+
}
222+
logrus.Infof("Reading blob body from %s failed (%v), reconnecting…", redactedURL, originalErr)
223+
return nil
224+
}
225+
226+
// Close implements io.ReadCloser
227+
func (br *bodyReader) Close() error {
228+
if br.body == nil {
229+
return nil
230+
}
231+
err := br.body.Close()
232+
br.body = nil
233+
return err
234+
}

docker/body_reader_test.go

+110
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package docker
2+
3+
import (
4+
"net/http"
5+
"testing"
6+
7+
"github.com/stretchr/testify/assert"
8+
"github.com/stretchr/testify/require"
9+
)
10+
11+
func TestParseDecimalInString(t *testing.T) {
12+
for _, prefix := range []string{"", "text", "0"} {
13+
for _, suffix := range []string{"", "text"} {
14+
for _, c := range []struct {
15+
s string
16+
v int64
17+
}{
18+
{"0", 0},
19+
{"1", 1},
20+
{"0700", 700}, // not octal
21+
} {
22+
input := prefix + c.s + suffix
23+
res, pos, err := parseDecimalInString(input, len(prefix))
24+
require.NoError(t, err, input)
25+
assert.Equal(t, c.v, res, input)
26+
assert.Equal(t, len(prefix)+len(c.s), pos, input)
27+
}
28+
for _, c := range []string{
29+
"-1",
30+
"xA",
31+
"&",
32+
"",
33+
"999999999999999999999999999999999999999999999999999999999999999999",
34+
} {
35+
input := prefix + c + suffix
36+
_, _, err := parseDecimalInString(input, len(prefix))
37+
assert.Error(t, err, c)
38+
}
39+
}
40+
}
41+
}
42+
43+
func TestParseExpectedChar(t *testing.T) {
44+
for _, prefix := range []string{"", "text", "0"} {
45+
for _, suffix := range []string{"", "text"} {
46+
input := prefix + "+" + suffix
47+
pos, err := parseExpectedChar(input, len(prefix), '+')
48+
require.NoError(t, err, input)
49+
assert.Equal(t, len(prefix)+1, pos, input)
50+
51+
_, err = parseExpectedChar(input, len(prefix), '-')
52+
assert.Error(t, err, input)
53+
}
54+
}
55+
}
56+
57+
func TestParseContentRange(t *testing.T) {
58+
for _, c := range []struct {
59+
in string
60+
first, last, completeLength int64
61+
}{
62+
{"bytes 0-0/1", 0, 0, 1},
63+
{"bytes 010-020/030", 10, 20, 30},
64+
{"bytes 1000-1010/*", 1000, 1010, -1},
65+
} {
66+
first, last, completeLength, err := parseContentRange(&http.Response{
67+
Header: http.Header{
68+
http.CanonicalHeaderKey("Content-Range"): []string{c.in},
69+
},
70+
})
71+
require.NoError(t, err, c.in)
72+
assert.Equal(t, c.first, first, c.in)
73+
assert.Equal(t, c.last, last, c.in)
74+
assert.Equal(t, c.completeLength, completeLength, c.in)
75+
}
76+
77+
for _, hdr := range []http.Header{
78+
nil,
79+
{http.CanonicalHeaderKey("Content-Range"): []string{}},
80+
{http.CanonicalHeaderKey("Content-Range"): []string{"bytes 1-2/3", "bytes 1-2/3"}},
81+
} {
82+
_, _, _, err := parseContentRange(&http.Response{
83+
Header: hdr,
84+
})
85+
assert.Error(t, err)
86+
}
87+
88+
for _, c := range []string{
89+
"",
90+
"notbytes 1-2/3",
91+
"bytes ",
92+
"bytes x-2/3",
93+
"bytes 1*2/3",
94+
"bytes 1",
95+
"bytes 1-",
96+
"bytes 1-x/3",
97+
"bytes 1-2",
98+
"bytes 1-2@3",
99+
"bytes 1-2/",
100+
"bytes 1-2/*a",
101+
"bytes 1-2/3a",
102+
} {
103+
_, _, _, err := parseContentRange(&http.Response{
104+
Header: http.Header{
105+
http.CanonicalHeaderKey("Content-Range"): []string{c},
106+
},
107+
})
108+
assert.Error(t, err, c, c)
109+
}
110+
}

docker/docker_client.go

+20-3
Original file line numberDiff line numberDiff line change
@@ -472,14 +472,24 @@ func (c *dockerClient) makeRequest(ctx context.Context, method, path string, hea
472472
return nil, err
473473
}
474474

475-
urlString := fmt.Sprintf("%s://%s%s", c.scheme, c.registry, path)
476-
requestURL, err := url.Parse(urlString)
475+
requestURL, err := c.resolveRequestURL(path)
477476
if err != nil {
478477
return nil, err
479478
}
480479
return c.makeRequestToResolvedURL(ctx, method, requestURL, headers, stream, -1, auth, extraScope)
481480
}
482481

482+
// resolveRequestURL turns a path for c.makeRequest into a full URL.
483+
// Most users should call makeRequest directly, this exists basically to make the URL available for debug logs.
484+
func (c *dockerClient) resolveRequestURL(path string) (*url.URL, error) {
485+
urlString := fmt.Sprintf("%s://%s%s", c.scheme, c.registry, path)
486+
res, err := url.Parse(urlString)
487+
if err != nil {
488+
return nil, err
489+
}
490+
return res, nil
491+
}
492+
483493
// Checks if the auth headers in the response contain an indication of a failed
484494
// authorizdation because of an "insufficient_scope" error. If that's the case,
485495
// returns the required scope to be used for fetching a new token.
@@ -965,7 +975,14 @@ func (c *dockerClient) getBlob(ctx context.Context, ref dockerReference, info ty
965975
return nil, 0, fmt.Errorf("fetching blob: %w", err)
966976
}
967977
cache.RecordKnownLocation(ref.Transport(), bicTransportScope(ref), info.Digest, newBICLocationReference(ref))
968-
return res.Body, getBlobSize(res), nil
978+
blobSize := getBlobSize(res)
979+
980+
reconnectingReader, err := newBodyReader(ctx, c, path, res.Body)
981+
if err != nil {
982+
res.Body.Close()
983+
return nil, 0, err
984+
}
985+
return reconnectingReader, blobSize, nil
969986
}
970987

971988
// getOCIDescriptorContents returns the contents a blob spcified by descriptor in ref, which must fit within limit.

version/version.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ const (
88
// VersionMinor is for functionality in a backwards-compatible manner
99
VersionMinor = 24
1010
// VersionPatch is for backwards-compatible bug fixes
11-
VersionPatch = 1
11+
VersionPatch = 2
1212

1313
// VersionDev indicates development branch. Releases will be empty string.
1414
VersionDev = "-dev"

0 commit comments

Comments
 (0)