diff --git a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriterChannelImpl.java b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannelImpl.java similarity index 95% rename from gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriterChannelImpl.java rename to gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannelImpl.java index 1eefe87702e6..48d60dc8e24d 100644 --- a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriterChannelImpl.java +++ b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannelImpl.java @@ -32,7 +32,7 @@ /** * Default implementation for BlobWriteChannel. */ -class BlobWriterChannelImpl implements BlobWriteChannel { +class BlobWriteChannelImpl implements BlobWriteChannel { private static final long serialVersionUID = 8675286882724938737L; private static final int MIN_CHUNK_SIZE = 256 * 1024; @@ -50,12 +50,12 @@ class BlobWriterChannelImpl implements BlobWriteChannel { private transient StorageRpc storageRpc; private transient StorageObject storageObject; - BlobWriterChannelImpl(StorageOptions options, BlobInfo blobInfo, + BlobWriteChannelImpl(StorageOptions options, BlobInfo blobInfo, Map optionsMap) { this.options = options; this.blobInfo = blobInfo; initTransients(); - uploadId = options.storageRpc().open(storageObject, optionsMap); + uploadId = storageRpc.open(storageObject, optionsMap); } private void writeObject(ObjectOutputStream out) throws IOException { diff --git a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/StorageImpl.java b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/StorageImpl.java index 4c80c6741559..5f2ee7e30d71 100644 --- a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/StorageImpl.java +++ b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/StorageImpl.java @@ -443,7 +443,7 @@ public BlobReadChannel reader(String bucket, String blob, BlobSourceOption... op @Override public BlobWriteChannel writer(BlobInfo blobInfo, BlobTargetOption... options) { final Map optionsMap = optionMap(blobInfo, options); - return new BlobWriterChannelImpl(options(), blobInfo, optionsMap); + return new BlobWriteChannelImpl(options(), blobInfo, optionsMap); } @Override diff --git a/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobReadChannelImplTest.java b/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobReadChannelImplTest.java new file mode 100644 index 000000000000..2a43753e6178 --- /dev/null +++ b/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobReadChannelImplTest.java @@ -0,0 +1,188 @@ +/* + * Copyright 2015 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.gcloud.storage; + +import static org.easymock.EasyMock.verify; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import com.google.common.collect.ImmutableMap; +import com.google.gcloud.RetryParams; +import com.google.gcloud.spi.StorageRpc; + +import org.easymock.EasyMock; +import org.junit.Test; +import org.junit.Before; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Map; +import java.util.Random; +import org.junit.After; + +public class BlobReadChannelImplTest { + + private static final String BUCKET_NAME = "b"; + private static final String BLOB_NAME = "n"; + private static final BlobInfo BLOB_INFO = BlobInfo.of(BUCKET_NAME, BLOB_NAME); + private static final Map EMPTY_RPC_OPTIONS = ImmutableMap.of(); + private static final int DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024; + private static final int CUSTOM_CHUNK_SIZE = 2 * 1024 * 1024; + private static final Random RANDOM = new Random(); + + private StorageOptions optionsMock; + private StorageRpc storageRpcMock; + private BlobReadChannelImpl reader; + + @Before + public void setUp() throws IOException, InterruptedException { + optionsMock = EasyMock.createMock(StorageOptions.class); + storageRpcMock = EasyMock.createMock(StorageRpc.class); + } + + @After + public void tearDown() throws Exception { + verify(optionsMock); + verify(storageRpcMock); + } + + @Test + public void testCreate() { + EasyMock.expect(optionsMock.storageRpc()).andReturn(storageRpcMock); + EasyMock.replay(optionsMock); + EasyMock.replay(storageRpcMock); + reader = new BlobReadChannelImpl(optionsMock, BLOB_INFO, EMPTY_RPC_OPTIONS); + assertTrue(reader.isOpen()); + } + + @Test + public void testReadBuffered() throws IOException { + EasyMock.expect(optionsMock.storageRpc()).andReturn(storageRpcMock); + EasyMock.expect(optionsMock.retryParams()).andReturn(RetryParams.noRetries()); + EasyMock.replay(optionsMock); + reader = new BlobReadChannelImpl(optionsMock, BLOB_INFO, EMPTY_RPC_OPTIONS); + byte[] result = randomByteArray(DEFAULT_CHUNK_SIZE); + ByteBuffer firstReadBuffer = ByteBuffer.allocate(42); + ByteBuffer secondReadBuffer = ByteBuffer.allocate(42); + EasyMock + .expect(storageRpcMock.read(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE)) + .andReturn(result); + EasyMock.replay(storageRpcMock); + reader.read(firstReadBuffer); + reader.read(secondReadBuffer); + assertArrayEquals(Arrays.copyOf(result, firstReadBuffer.capacity()), firstReadBuffer.array()); + assertArrayEquals( + Arrays.copyOfRange(result, firstReadBuffer.capacity(), firstReadBuffer.capacity() + + secondReadBuffer.capacity()), + secondReadBuffer.array()); + } + + @Test + public void testReadBig() throws IOException { + EasyMock.expect(optionsMock.storageRpc()).andReturn(storageRpcMock); + EasyMock.expect(optionsMock.retryParams()).andReturn(RetryParams.noRetries()).times(2); + EasyMock.replay(optionsMock); + reader = new BlobReadChannelImpl(optionsMock, BLOB_INFO, EMPTY_RPC_OPTIONS); + reader.chunkSize(CUSTOM_CHUNK_SIZE); + byte[] firstResult = randomByteArray(DEFAULT_CHUNK_SIZE); + byte[] secondResult = randomByteArray(DEFAULT_CHUNK_SIZE); + ByteBuffer firstReadBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE); + ByteBuffer secondReadBuffer = ByteBuffer.allocate(42); + EasyMock + .expect(storageRpcMock.read(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE)) + .andReturn(firstResult); + EasyMock + .expect( + storageRpcMock.read(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS, DEFAULT_CHUNK_SIZE, + CUSTOM_CHUNK_SIZE)) + .andReturn(secondResult); + EasyMock.replay(storageRpcMock); + reader.read(firstReadBuffer); + reader.read(secondReadBuffer); + assertArrayEquals(firstResult, firstReadBuffer.array()); + assertArrayEquals(Arrays.copyOf(secondResult, secondReadBuffer.capacity()), + secondReadBuffer.array()); + } + + @Test + public void testReadFinish() throws IOException { + EasyMock.expect(optionsMock.storageRpc()).andReturn(storageRpcMock); + EasyMock.expect(optionsMock.retryParams()).andReturn(RetryParams.noRetries()); + EasyMock.replay(optionsMock); + reader = new BlobReadChannelImpl(optionsMock, BLOB_INFO, EMPTY_RPC_OPTIONS); + byte[] result = {}; + ByteBuffer readBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE); + EasyMock + .expect(storageRpcMock.read(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE)) + .andReturn(result); + EasyMock.replay(storageRpcMock); + assertEquals(-1, reader.read(readBuffer)); + } + + @Test + public void testSeek() throws IOException { + EasyMock.expect(optionsMock.storageRpc()).andReturn(storageRpcMock); + EasyMock.expect(optionsMock.retryParams()).andReturn(RetryParams.noRetries()); + EasyMock.replay(optionsMock); + reader = new BlobReadChannelImpl(optionsMock, BLOB_INFO, EMPTY_RPC_OPTIONS); + reader.seek(42); + byte[] result = randomByteArray(DEFAULT_CHUNK_SIZE); + ByteBuffer readBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE); + EasyMock + .expect(storageRpcMock.read(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS, 42, DEFAULT_CHUNK_SIZE)) + .andReturn(result); + EasyMock.replay(storageRpcMock); + reader.read(readBuffer); + assertArrayEquals(result, readBuffer.array()); + } + + @Test + public void testClose() throws IOException { + EasyMock.expect(optionsMock.storageRpc()).andReturn(storageRpcMock); + EasyMock.replay(optionsMock); + EasyMock.replay(storageRpcMock); + reader = new BlobReadChannelImpl(optionsMock, BLOB_INFO, EMPTY_RPC_OPTIONS); + assertTrue(reader.isOpen()); + reader.close(); + assertTrue(!reader.isOpen()); + } + + @Test + public void testReadClosed() { + EasyMock.expect(optionsMock.storageRpc()).andReturn(storageRpcMock); + EasyMock.replay(optionsMock); + EasyMock.replay(storageRpcMock); + reader = new BlobReadChannelImpl(optionsMock, BLOB_INFO, EMPTY_RPC_OPTIONS); + reader.close(); + try { + ByteBuffer readBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE); + reader.read(readBuffer); + fail("Expected BlobReadChannel read to throw IOException"); + } catch (IOException ex) { + // expected + } + } + + private static byte[] randomByteArray(int size) { + byte[] byteArray = new byte[size]; + RANDOM.nextBytes(byteArray); + return byteArray; + } +} diff --git a/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobWriteChannelImplTest.java b/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobWriteChannelImplTest.java new file mode 100644 index 000000000000..cc43f8f6e1f7 --- /dev/null +++ b/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobWriteChannelImplTest.java @@ -0,0 +1,200 @@ +/* + * Copyright 2015 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.gcloud.storage; + +import static org.easymock.EasyMock.verify; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import com.google.common.collect.ImmutableMap; +import com.google.gcloud.RetryParams; +import com.google.gcloud.spi.StorageRpc; + +import org.easymock.Capture; +import org.easymock.EasyMock; +import org.junit.Test; +import org.junit.Before; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Map; +import java.util.Random; +import org.junit.After; + +public class BlobWriteChannelImplTest { + + private static final String BUCKET_NAME = "b"; + private static final String BLOB_NAME = "n"; + private static final String UPLOAD_ID = "uploadid"; + private static final BlobInfo BLOB_INFO = BlobInfo.of(BUCKET_NAME, BLOB_NAME); + private static final Map EMPTY_RPC_OPTIONS = ImmutableMap.of(); + private static final int MIN_CHUNK_SIZE = 256 * 1024; + private static final int DEFAULT_CHUNK_SIZE = 8 * MIN_CHUNK_SIZE; + private static final int CUSTOM_CHUNK_SIZE = 4 * MIN_CHUNK_SIZE; + private static final Random RANDOM = new Random(); + + private StorageOptions optionsMock; + private StorageRpc storageRpcMock; + private BlobWriteChannelImpl writer; + + @Before + public void setUp() throws IOException, InterruptedException { + optionsMock = EasyMock.createMock(StorageOptions.class); + storageRpcMock = EasyMock.createMock(StorageRpc.class); + } + + @After + public void tearDown() throws Exception { + verify(optionsMock); + verify(storageRpcMock); + } + + @Test + public void testCreate() { + EasyMock.expect(optionsMock.storageRpc()).andReturn(storageRpcMock); + EasyMock.replay(optionsMock); + EasyMock.expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); + EasyMock.replay(storageRpcMock); + writer = new BlobWriteChannelImpl(optionsMock, BLOB_INFO, EMPTY_RPC_OPTIONS); + assertTrue(writer.isOpen()); + } + + @Test + public void testWriteWithoutFlush() throws IOException { + EasyMock.expect(optionsMock.storageRpc()).andReturn(storageRpcMock); + EasyMock.replay(optionsMock); + EasyMock.expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); + EasyMock.replay(storageRpcMock); + writer = new BlobWriteChannelImpl(optionsMock, BLOB_INFO, EMPTY_RPC_OPTIONS); + assertEquals(MIN_CHUNK_SIZE, writer.write(ByteBuffer.allocate(MIN_CHUNK_SIZE))); + } + + @Test + public void testWriteWithFlush() throws IOException { + EasyMock.expect(optionsMock.storageRpc()).andReturn(storageRpcMock); + EasyMock.expect(optionsMock.retryParams()).andReturn(RetryParams.noRetries()); + EasyMock.replay(optionsMock); + EasyMock.expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); + Capture capturedBuffer = Capture.newInstance(); + storageRpcMock.write(EasyMock.eq(UPLOAD_ID), EasyMock.capture(capturedBuffer), EasyMock.eq(0), + EasyMock.eq(BLOB_INFO.toPb()), EasyMock.eq(0L), EasyMock.eq(CUSTOM_CHUNK_SIZE), + EasyMock.eq(false)); + EasyMock.expectLastCall(); + EasyMock.replay(storageRpcMock); + writer = new BlobWriteChannelImpl(optionsMock, BLOB_INFO, EMPTY_RPC_OPTIONS); + writer.chunkSize(CUSTOM_CHUNK_SIZE); + ByteBuffer buffer = randomBuffer(CUSTOM_CHUNK_SIZE); + assertEquals(CUSTOM_CHUNK_SIZE, writer.write(buffer)); + assertArrayEquals(buffer.array(), capturedBuffer.getValue()); + } + + @Test + public void testWritesAndFlush() throws IOException { + EasyMock.expect(optionsMock.storageRpc()).andReturn(storageRpcMock); + EasyMock.expect(optionsMock.retryParams()).andReturn(RetryParams.noRetries()); + EasyMock.replay(optionsMock); + EasyMock.expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); + Capture capturedBuffer = Capture.newInstance(); + storageRpcMock.write(EasyMock.eq(UPLOAD_ID), EasyMock.capture(capturedBuffer), EasyMock.eq(0), + EasyMock.eq(BLOB_INFO.toPb()), EasyMock.eq(0L), EasyMock.eq(DEFAULT_CHUNK_SIZE), + EasyMock.eq(false)); + EasyMock.expectLastCall(); + EasyMock.replay(storageRpcMock); + writer = new BlobWriteChannelImpl(optionsMock, BLOB_INFO, EMPTY_RPC_OPTIONS); + ByteBuffer[] buffers = new ByteBuffer[DEFAULT_CHUNK_SIZE / MIN_CHUNK_SIZE]; + for (int i = 0; i < buffers.length; i++) { + buffers[i] = randomBuffer(MIN_CHUNK_SIZE); + assertEquals(MIN_CHUNK_SIZE, writer.write(buffers[i])); + } + for (int i = 0; i < buffers.length; i++) { + assertArrayEquals( + buffers[i].array(), + Arrays.copyOfRange( + capturedBuffer.getValue(), MIN_CHUNK_SIZE * i, MIN_CHUNK_SIZE * (i + 1))); + } + } + + @Test + public void testCloseWithoutFlush() throws IOException { + EasyMock.expect(optionsMock.storageRpc()).andReturn(storageRpcMock); + EasyMock.expect(optionsMock.retryParams()).andReturn(RetryParams.noRetries()); + EasyMock.replay(optionsMock); + EasyMock.expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); + Capture capturedBuffer = Capture.newInstance(); + storageRpcMock.write(EasyMock.eq(UPLOAD_ID), EasyMock.capture(capturedBuffer), EasyMock.eq(0), + EasyMock.eq(BLOB_INFO.toPb()), EasyMock.eq(0L), EasyMock.eq(0), EasyMock.eq(true)); + EasyMock.expectLastCall(); + EasyMock.replay(storageRpcMock); + writer = new BlobWriteChannelImpl(optionsMock, BLOB_INFO, EMPTY_RPC_OPTIONS); + assertTrue(writer.isOpen()); + writer.close(); + assertArrayEquals(new byte[0], capturedBuffer.getValue()); + assertTrue(!writer.isOpen()); + } + + @Test + public void testCloseWithFlush() throws IOException { + EasyMock.expect(optionsMock.storageRpc()).andReturn(storageRpcMock); + EasyMock.expect(optionsMock.retryParams()).andReturn(RetryParams.noRetries()); + EasyMock.replay(optionsMock); + EasyMock.expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); + Capture capturedBuffer = Capture.newInstance(); + ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE); + storageRpcMock.write(EasyMock.eq(UPLOAD_ID), EasyMock.capture(capturedBuffer), EasyMock.eq(0), + EasyMock.eq(BLOB_INFO.toPb()), EasyMock.eq(0L), EasyMock.eq(MIN_CHUNK_SIZE), + EasyMock.eq(true)); + EasyMock.expectLastCall(); + EasyMock.replay(storageRpcMock); + writer = new BlobWriteChannelImpl(optionsMock, BLOB_INFO, EMPTY_RPC_OPTIONS); + assertTrue(writer.isOpen()); + writer.write(buffer); + writer.close(); + assertEquals(DEFAULT_CHUNK_SIZE, capturedBuffer.getValue().length); + assertArrayEquals(buffer.array(), Arrays.copyOf(capturedBuffer.getValue(), MIN_CHUNK_SIZE)); + assertTrue(!writer.isOpen()); + } + + @Test + public void testWriteClosed() throws IOException { + EasyMock.expect(optionsMock.storageRpc()).andReturn(storageRpcMock); + EasyMock.expect(optionsMock.retryParams()).andReturn(RetryParams.noRetries()); + EasyMock.replay(optionsMock); + EasyMock.expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); + Capture capturedBuffer = Capture.newInstance(); + storageRpcMock.write(EasyMock.eq(UPLOAD_ID), EasyMock.capture(capturedBuffer), EasyMock.eq(0), + EasyMock.eq(BLOB_INFO.toPb()), EasyMock.eq(0L), EasyMock.eq(0), EasyMock.eq(true)); + EasyMock.expectLastCall(); + EasyMock.replay(storageRpcMock); + writer = new BlobWriteChannelImpl(optionsMock, BLOB_INFO, EMPTY_RPC_OPTIONS); + writer.close(); + try { + writer.write(ByteBuffer.allocate(MIN_CHUNK_SIZE)); + fail("Expected BlobWriteChannel write to throw IOException"); + } catch (IOException ex) { + // expected + } + } + + private static ByteBuffer randomBuffer(int size) { + byte[] byteArray = new byte[size]; + RANDOM.nextBytes(byteArray); + return ByteBuffer.wrap(byteArray); + } +}