Skip to content

Commit d4bfcf0

Browse files
authored
fix: fix JSON read handling when socket broken resulting in partial bytes copied (#2303)
Add integration test with testbench to force failure during read Update retry conformance tests to assert full byte content when read through reader Fixes #2301
1 parent c069aca commit d4bfcf0

File tree

4 files changed

+91
-9
lines changed

4 files changed

+91
-9
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedReadableByteChannel.java

+21-3
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import java.math.BigInteger;
4848
import java.nio.ByteBuffer;
4949
import java.nio.channels.Channels;
50+
import java.nio.channels.ClosedChannelException;
5051
import java.nio.channels.ReadableByteChannel;
5152
import java.nio.channels.ScatteringByteChannel;
5253
import java.util.List;
@@ -68,6 +69,7 @@ class ApiaryUnbufferedReadableByteChannel implements UnbufferedReadableByteChann
6869
private long position;
6970
private ScatteringByteChannel sbc;
7071
private boolean open;
72+
private boolean returnEOF;
7173

7274
// returned X-Goog-Generation header value
7375
private Long xGoogGeneration;
@@ -84,28 +86,37 @@ class ApiaryUnbufferedReadableByteChannel implements UnbufferedReadableByteChann
8486
this.options = options;
8587
this.resultRetryAlgorithm = resultRetryAlgorithm;
8688
this.open = true;
89+
this.returnEOF = false;
8790
this.position = apiaryReadRequest.getByteRangeSpec().beginOffset();
8891
}
8992

9093
@Override
9194
public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
95+
if (returnEOF) {
96+
close();
97+
return -1;
98+
} else if (!open) {
99+
throw new ClosedChannelException();
100+
}
101+
long totalRead = 0;
92102
do {
93103
if (sbc == null) {
94104
sbc = Retrying.run(options, resultRetryAlgorithm, this::open, Function.identity());
95105
}
96106

107+
long totalRemaining = Buffers.totalRemaining(dsts, offset, length);
97108
try {
98109
// According to the contract of Retrying#run it's possible for sbc to be null even after
99110
// invocation. However, the function we provide is guaranteed to return non-null or throw
100111
// an exception. So we suppress the warning from intellij here.
101112
//noinspection ConstantConditions
102113
long read = sbc.read(dsts, offset, length);
103114
if (read == -1) {
104-
open = false;
115+
returnEOF = true;
105116
} else {
106-
position += read;
117+
totalRead += read;
107118
}
108-
return read;
119+
return totalRead;
109120
} catch (Exception t) {
110121
if (resultRetryAlgorithm.shouldRetry(t, null)) {
111122
// if our retry algorithm COULD allow a retry, continue the loop and allow trying to
@@ -121,6 +132,13 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
121132
} else {
122133
throw new IOException(StorageException.coalesce(t));
123134
}
135+
} finally {
136+
long totalRemainingAfter = Buffers.totalRemaining(dsts, offset, length);
137+
long delta = totalRemaining - totalRemainingAfter;
138+
if (delta > 0) {
139+
position += delta;
140+
totalRead += delta;
141+
}
124142
}
125143
} while (true);
126144
}

google-cloud-storage/src/main/java/com/google/cloud/storage/Buffers.java

+6
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.nio.Buffer;
2121
import java.nio.ByteBuffer;
2222
import java.nio.channels.ReadableByteChannel;
23+
import java.util.Arrays;
2324
import java.util.function.Consumer;
2425

2526
/**
@@ -159,4 +160,9 @@ static int fillFrom(ByteBuffer buf, ReadableByteChannel c) throws IOException {
159160
}
160161
return total;
161162
}
163+
164+
static long totalRemaining(ByteBuffer[] buffers, int offset, int length) {
165+
ByteBuffer[] sub = Arrays.copyOfRange(buffers, offset, length);
166+
return Arrays.stream(sub).mapToLong(Buffer::remaining).sum();
167+
}
162168
}

google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/RpcMethodMappings.java

+11-4
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.google.cloud.storage.conformance.retry;
1818

19+
import static com.google.cloud.storage.TestUtils.xxd;
1920
import static com.google.cloud.storage.conformance.retry.CtxFunctions.ResourceSetup.defaultSetup;
2021
import static com.google.cloud.storage.conformance.retry.CtxFunctions.ResourceSetup.notificationSetup;
2122
import static com.google.cloud.storage.conformance.retry.CtxFunctions.ResourceSetup.pubsubTopicSetup;
@@ -1277,9 +1278,12 @@ private static void get(ArrayList<RpcMethodMapping> a) {
12771278
try {
12781279
ReadChannel reader =
12791280
ctx.getStorage().reader(ctx.getState().getBlob().getBlobId());
1280-
WritableByteChannel write =
1281-
Channels.newChannel(ByteStreams.nullOutputStream());
1281+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
1282+
WritableByteChannel write = Channels.newChannel(baos);
12821283
ByteStreams.copy(reader, write);
1284+
1285+
assertThat(xxd(baos.toByteArray()))
1286+
.isEqualTo(xxd(c.getHelloWorldUtf8Bytes()));
12831287
} catch (IOException e) {
12841288
if (e.getCause() instanceof BaseServiceException) {
12851289
throw e.getCause();
@@ -1299,9 +1303,12 @@ private static void get(ArrayList<RpcMethodMapping> a) {
12991303
.reader(
13001304
ctx.getState().getBlob().getBlobId().getBucket(),
13011305
ctx.getState().getBlob().getBlobId().getName());
1302-
WritableByteChannel write =
1303-
Channels.newChannel(ByteStreams.nullOutputStream());
1306+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
1307+
WritableByteChannel write = Channels.newChannel(baos);
13041308
ByteStreams.copy(reader, write);
1309+
1310+
assertThat(xxd(baos.toByteArray()))
1311+
.isEqualTo(xxd(c.getHelloWorldUtf8Bytes()));
13051312
} catch (IOException e) {
13061313
if (e.getCause() instanceof BaseServiceException) {
13071314
throw e.getCause();

google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobReadChannelV2RetryTest.java

+53-2
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,6 @@ public void restartingAStreamForGzipContentIsAtTheCorrectOffset() throws Excepti
160160
.setContentEncoding("gzip")
161161
.build();
162162
Blob gen1 = storage.create(info, gzipped.getBytes(), BlobTargetOption.doesNotExist());
163-
String uri = gen1.getBlobId().toGsUtilUri();
164-
System.out.println("uri = " + uri);
165163

166164
JsonObject instructions = new JsonObject();
167165
JsonArray value = new JsonArray();
@@ -200,4 +198,57 @@ public void restartingAStreamForGzipContentIsAtTheCorrectOffset() throws Excepti
200198
.isEqualTo(ImmutableList.of(String.format("bytes=%d-", 256 * 1024))));
201199
}
202200
}
201+
202+
@Test
203+
public void resumeFromCorrectOffsetWhenPartialReadSuccess() throws Exception {
204+
StorageOptions baseOptions = storage.getOptions();
205+
Random rand = new Random(918273645);
206+
207+
ChecksummedTestContent uncompressed;
208+
{
209+
// must use random strategy, base64 characters compress too well. 512KiB uncompressed becomes
210+
// ~1600 bytes which is smaller than our 'return-broken-stream-after-256K' rule
211+
byte[] bytes = DataGenerator.rand(rand).genBytes(_512KiB);
212+
// byte[] bytes = DataGenerator.base64Characters().genBytes(_512KiB);
213+
uncompressed = ChecksummedTestContent.of(bytes);
214+
}
215+
BlobId id = BlobId.of(bucket.getName(), generator.randomObjectName());
216+
BlobInfo info = BlobInfo.newBuilder(id).build();
217+
Blob gen1 = storage.create(info, uncompressed.getBytes(), BlobTargetOption.doesNotExist());
218+
219+
JsonObject instructions = new JsonObject();
220+
JsonArray value = new JsonArray();
221+
value.add("return-broken-stream-after-256K");
222+
instructions.add("storage.objects.get", value);
223+
RetryTestResource retryTestResource = new RetryTestResource(instructions);
224+
RetryTestResource retryTest = testBench.createRetryTest(retryTestResource);
225+
226+
ImmutableMap<String, String> headers = ImmutableMap.of("x-retry-test-id", retryTest.id);
227+
228+
RequestAuditing requestAuditing = new RequestAuditing();
229+
StorageOptions testStorageOptions =
230+
baseOptions
231+
.toBuilder()
232+
.setTransportOptions(requestAuditing)
233+
.setHeaderProvider(FixedHeaderProvider.create(headers))
234+
.build();
235+
236+
String expected = xxd(uncompressed.getBytes());
237+
238+
try (Storage testStorage = testStorageOptions.getService();
239+
ReadChannel r = testStorage.reader(gen1.getBlobId());
240+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
241+
WritableByteChannel w = Channels.newChannel(baos)) {
242+
long copy = ByteStreams.copy(r, w);
243+
String actual = xxd(baos.toByteArray());
244+
ImmutableList<HttpRequest> requests = requestAuditing.getRequests();
245+
assertAll(
246+
() -> assertThat(copy).isEqualTo(uncompressed.getBytes().length),
247+
() -> assertThat(actual).isEqualTo(expected),
248+
() -> assertThat(requests.get(0).getHeaders().get("range")).isNull(),
249+
() ->
250+
assertThat(requests.get(1).getHeaders().get("range"))
251+
.isEqualTo(ImmutableList.of(String.format("bytes=%d-", 256 * 1024))));
252+
}
253+
}
203254
}

0 commit comments

Comments
 (0)