Skip to content

Commit b44962e

Browse files
azblob: Return io.ErrUnexpectedEOF as error in UploadStream (#22109)
1 parent 423e02b commit b44962e

File tree

4 files changed

+155
-2
lines changed

4 files changed

+155
-2
lines changed

sdk/storage/azblob/CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
* Fixed an issue that would cause metadata keys with empty values to be omitted when enumerating blobs.
1616
* Fixed an issue where passing empty map to set blob tags API was causing panic. Fixes [#21869](https://github.com/Azure/azure-sdk-for-go/issues/21869).
1717
* Fixed an issue where downloaded file has incorrect size when not a multiple of block size. Fixes [#21995](https://github.com/Azure/azure-sdk-for-go/issues/21995).
18+
* Fixed case where `io.ErrUnexpectedEOF` was treated as expected error in `UploadStream`. Fixes [#21837](https://github.com/Azure/azure-sdk-for-go/issues/21837).
1819

1920
### Other Changes
2021

sdk/storage/azblob/blockblob/chunkwriting.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func copyFromReader[T ~[]byte](ctx context.Context, src io.Reader, dst blockWrit
7575
}
7676

7777
var n int
78-
n, err = io.ReadFull(src, buffer)
78+
n, err = shared.ReadAtLeast(src, buffer, len(buffer))
7979

8080
if n > 0 {
8181
// some data was read, upload it
@@ -108,7 +108,7 @@ func copyFromReader[T ~[]byte](ctx context.Context, src io.Reader, dst blockWrit
108108
}
109109

110110
if err != nil { // The reader is done, no more outgoing buffers
111-
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
111+
if errors.Is(err, io.EOF) {
112112
// these are expected errors, we don't surface those
113113
err = nil
114114
} else {

sdk/storage/azblob/blockblob/client_test.go

+128
Original file line numberDiff line numberDiff line change
@@ -5033,6 +5033,134 @@ func (s *BlockBlobUnrecordedTestsSuite) TestUploadStreamToBlobProperties() {
50335033
_require.EqualValues(actualBlobData, blobData)
50345034
}
50355035

5036+
func (s *BlockBlobUnrecordedTestsSuite) TestBlobUploadDownloadStream() {
5037+
_require := require.New(s.T())
5038+
testName := s.T().Name()
5039+
svcClient, err := testcommon.GetServiceClient(s.T(), testcommon.TestAccountDefault, nil)
5040+
_require.NoError(err)
5041+
5042+
blobSize := 11 * 1024 * 1024
5043+
bufferSize := 2 * 1024 * 1024
5044+
maxBuffers := 2
5045+
5046+
containerName := testcommon.GenerateContainerName(testName)
5047+
containerClient := testcommon.CreateNewContainer(context.Background(), _require, containerName, svcClient)
5048+
defer testcommon.DeleteContainer(context.Background(), _require, containerClient)
5049+
5050+
// Set up test blob
5051+
blobName := testcommon.GenerateBlobName(testName)
5052+
bbClient := testcommon.GetBlockBlobClient(blobName, containerClient)
5053+
blobContentReader, blobData := testcommon.GenerateData(blobSize)
5054+
5055+
_, err = bbClient.UploadStream(context.Background(), blobContentReader, &blockblob.UploadStreamOptions{
5056+
BlockSize: int64(bufferSize),
5057+
Concurrency: maxBuffers,
5058+
Metadata: testcommon.BasicMetadata,
5059+
Tags: testcommon.BasicBlobTagsMap,
5060+
HTTPHeaders: &testcommon.BasicHeaders,
5061+
})
5062+
_require.NoError(err)
5063+
5064+
downloadResponse, err := bbClient.DownloadStream(context.Background(), nil)
5065+
_require.NoError(err)
5066+
5067+
bbClient2 := testcommon.GetBlockBlobClient("blobName2", containerClient)
5068+
5069+
// UploadStream using http.Response.Body as the reader
5070+
_, err = bbClient2.UploadStream(context.Background(), downloadResponse.Body, &blockblob.UploadStreamOptions{
5071+
BlockSize: int64(bufferSize),
5072+
Concurrency: maxBuffers,
5073+
})
5074+
_require.NoError(err)
5075+
5076+
downloadResp2, err := bbClient2.DownloadStream(context.Background(), nil)
5077+
_require.NoError(err)
5078+
5079+
// Assert that the content is correct
5080+
actualBlobData, err := io.ReadAll(downloadResp2.Body)
5081+
_require.NoError(err)
5082+
_require.Equal(len(actualBlobData), len(blobData))
5083+
_require.EqualValues(actualBlobData, blobData)
5084+
}
5085+
5086+
// This test simulates UploadStream and DownloadBuffer methods,
5087+
// and verifies length and content of file
5088+
func (s *BlockBlobUnrecordedTestsSuite) TestBlobUploadStreamDownloadBuffer() {
5089+
_require := require.New(s.T())
5090+
testName := s.T().Name()
5091+
svcClient, err := testcommon.GetServiceClient(s.T(), testcommon.TestAccountDefault, nil)
5092+
_require.NoError(err)
5093+
5094+
containerName := testcommon.GenerateContainerName(testName)
5095+
containerClient := testcommon.CreateNewContainer(context.Background(), _require, containerName, svcClient)
5096+
defer testcommon.DeleteContainer(context.Background(), _require, containerClient)
5097+
5098+
const MiB = 1024 * 1024
5099+
testUploadDownload := func(contentSize int) {
5100+
content := make([]byte, contentSize)
5101+
_, _ = rand.Read(content)
5102+
contentMD5 := md5.Sum(content)
5103+
body := streaming.NopCloser(bytes.NewReader(content))
5104+
5105+
srcBlob := containerClient.NewBlockBlobClient("srcblob")
5106+
5107+
// Prepare source bbClient for copy.
5108+
_, err = srcBlob.UploadStream(context.Background(), body, &blockblob.UploadStreamOptions{
5109+
BlockSize: 4 * MiB,
5110+
Concurrency: 5,
5111+
})
5112+
_require.NoError(err)
5113+
5114+
// Download to a buffer and verify contents
5115+
buff := make([]byte, contentSize)
5116+
b := blob.DownloadBufferOptions{
5117+
BlockSize: 5 * MiB,
5118+
Concurrency: 4,
5119+
}
5120+
n, err := srcBlob.DownloadBuffer(context.Background(), buff, &b)
5121+
_require.NoError(err)
5122+
_require.Equal(int64(contentSize), n)
5123+
_require.Equal(contentMD5, md5.Sum(buff[:]))
5124+
}
5125+
5126+
testUploadDownload(0) // zero byte blob
5127+
testUploadDownload(5 * MiB)
5128+
testUploadDownload(20 * MiB)
5129+
testUploadDownload(199 * MiB)
5130+
}
5131+
5132+
type fakeReader struct {
5133+
cnt int
5134+
}
5135+
5136+
func (a *fakeReader) Read(bytes []byte) (count int, err error) {
5137+
if a.cnt < 5 {
5138+
_, buf := testcommon.GenerateData(1024)
5139+
n := copy(bytes, buf)
5140+
a.cnt++
5141+
return n, nil
5142+
}
5143+
return 0, io.ErrUnexpectedEOF
5144+
}
5145+
5146+
func (s *BlockBlobUnrecordedTestsSuite) TestBlobUploadStreamUsingCustomReader() {
5147+
_require := require.New(s.T())
5148+
testName := s.T().Name()
5149+
svcClient, err := testcommon.GetServiceClient(s.T(), testcommon.TestAccountDefault, nil)
5150+
_require.NoError(err)
5151+
5152+
containerName := testcommon.GenerateContainerName(testName)
5153+
containerClient := testcommon.CreateNewContainer(context.Background(), _require, containerName, svcClient)
5154+
defer testcommon.DeleteContainer(context.Background(), _require, containerClient)
5155+
5156+
bbClient := testcommon.GetBlockBlobClient(testcommon.GenerateBlobName(testName), containerClient)
5157+
5158+
r := &fakeReader{}
5159+
_, err = bbClient.UploadStream(context.Background(), r, nil)
5160+
_require.Error(err)
5161+
_require.Equal(err, io.ErrUnexpectedEOF)
5162+
}
5163+
50365164
func (s *BlockBlobRecordedTestsSuite) TestBlockBlobSetTierOnVersions() {
50375165
_require := require.New(s.T())
50385166
testName := s.T().Name()

sdk/storage/azblob/internal/shared/shared.go

+24
Original file line numberDiff line numberDiff line change
@@ -254,3 +254,27 @@ func IsIPEndpointStyle(host string) bool {
254254
}
255255
return net.ParseIP(host) != nil
256256
}
257+
258+
// ReadAtLeast reads from r into buf until it has read at least min bytes.
259+
// It returns the number of bytes copied and an error.
260+
// The EOF error is returned if no bytes were read or
261+
// EOF happened after reading fewer than min bytes.
262+
// If min is greater than the length of buf, ReadAtLeast returns ErrShortBuffer.
263+
// On return, n >= min if and only if err == nil.
264+
// If r returns an error having read at least min bytes, the error is dropped.
265+
// This method is same as io.ReadAtLeast except that it does not
266+
// return io.ErrUnexpectedEOF when fewer than min bytes are read.
267+
func ReadAtLeast(r io.Reader, buf []byte, min int) (n int, err error) {
268+
if len(buf) < min {
269+
return 0, io.ErrShortBuffer
270+
}
271+
for n < min && err == nil {
272+
var nn int
273+
nn, err = r.Read(buf[n:])
274+
n += nn
275+
}
276+
if n >= min {
277+
err = nil
278+
}
279+
return
280+
}

0 commit comments

Comments
 (0)