Fix memleak in chunkedFileWriter #1695

Merged: 3 commits, Mar 30, 2022
82 changes: 64 additions & 18 deletions common/chunkedFileWriter.go
@@ -84,7 +84,7 @@ type chunkedFileWriter struct {

 	// used for completion
 	successMd5 chan []byte
-	failureError chan error
+	chunkWriterDone chan bool

 	// controls body-read retries. Public so value can be shared with retryReader
 	maxRetryPerDownloadBody int
@@ -93,6 +93,10 @@ type chunkedFileWriter struct {
 	md5ValidationOption HashValidationOption

 	sourceMd5Exists bool
+
+	currentReservedCapacity int64
+
+	err error // This field should be set only by workerRoutine
 }

 type fileChunk struct {
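
Aside: the heart of this change is swapping the one-shot failureError channel for a chunkWriterDone channel plus a stored err field. Closing a channel is observable by every goroutine that receives on it, whereas a buffered error channel delivers its value to exactly one receiver and leaves the rest blocked. The standalone sketch below (hypothetical names, not part of the PR) shows why the close-based broadcast is safe: the write to err happens before close(done), which happens before every receive from the closed channel.

package main

import (
	"fmt"
	"sync"
)

// doneBroadcast mimics the chunkWriterDone pattern: the worker records its
// error in a plain field, then closes the channel. Per the Go memory model,
// the write to err happens before close(done), which happens before every
// receive from the closed channel, so all waiters read err safely.
type doneBroadcast struct {
	done chan bool
	err  error // set only by the worker, before close(done)
}

func main() {
	b := &doneBroadcast{done: make(chan bool, 1)}

	// The worker fails once and signals everyone by closing the channel.
	b.err = fmt.Errorf("simulated disk failure")
	close(b.done)

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-b.done // receiving from a closed channel never blocks
			fmt.Printf("waiter %d observed: %v\n", id, b.err)
		}(i)
	}
	wg.Wait()
	// With the old one-shot failureError channel, only a single waiter would
	// have received the error; the other two would block forever.
}
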
@@ -112,11 +116,12 @@ func NewChunkedFileWriter(ctx context.Context, slicePool ByteSlicePooler, cacheL
 		cacheLimiter: cacheLimiter,
 		chunkLogger: chunkLogger,
 		successMd5: make(chan []byte),
-		failureError: make(chan error, 1),
+		chunkWriterDone: make(chan bool, 1),
 		newUnorderedChunks: make(chan fileChunk, chanBufferSize),
 		maxRetryPerDownloadBody: maxBodyRetries,
 		md5ValidationOption: md5ValidationOption,
 		sourceMd5Exists: sourceMd5Exists,
+		currentReservedCapacity: 0,
 	}
 	go w.workerRoutine(ctx)
 	return w
@@ -137,14 +142,15 @@ func (w *chunkedFileWriter) WaitToScheduleChunk(ctx context.Context, id ChunkID,
 	w.chunkLogger.LogChunkStatus(id, EWaitReason.RAMToSchedule())
 	err := w.cacheLimiter.WaitUntilAdd(ctx, chunkSize, w.shouldUseRelaxedRamThreshold)
 	if err == nil {
+		atomic.AddInt64(&w.currentReservedCapacity, chunkSize)
 		atomic.AddInt32(&w.activeChunkCount, 1)
 	}
 	return err
+	// At this point, the book-keeping of this memory is chunkedFileWriter's responsibility
 }

 // Threadsafe method to enqueue a new chunk for processing
-func (w *chunkedFileWriter) EnqueueChunk(ctx context.Context, id ChunkID, chunkSize int64, chunkContents io.Reader, retryable bool) error {
-
+func (w *chunkedFileWriter) EnqueueChunk(ctx context.Context, id ChunkID, chunkSize int64, chunkContents io.Reader, retryable bool) (err error) {
 	readDone := make(chan struct{})
 	if retryable {
 		// if retryable == true, that tells us that closing the reader
@@ -158,8 +164,21 @@ func (w *chunkedFileWriter) EnqueueChunk(ctx context.Context, id ChunkID, chunkS

 	// read into a buffer
 	buffer := w.slicePool.RentSlice(chunkSize)
+
+	defer func() {
+		// clean up if we abruptly quit
+		if err == nil {
+			return // We've successfully queued; the worker will now take over
+		}
+		w.cacheLimiter.Remove(chunkSize) // remove this from the tally of scheduled-but-unsaved bytes
+		atomic.AddInt64(&w.currentReservedCapacity, -chunkSize)
+		w.slicePool.ReturnSlice(buffer)
+		atomic.AddInt32(&w.activeChunkCount, -1)
+		w.chunkLogger.LogChunkStatus(id, EWaitReason.ChunkDone()) // this chunk is all finished
+	}()
+
 	readStart := time.Now()
-	_, err := io.ReadFull(chunkContents, buffer)
+	_, err = io.ReadFull(chunkContents, buffer)
 	close(readDone)
 	if err != nil {
 		return err
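
Aside: the signature change to (err error) is what makes the new deferred cleanup work: a deferred closure can read the named return value and decide whether EnqueueChunk failed before handing the buffer to the worker. A minimal sketch of the idiom, with illustrative identifiers standing in for the real pool and limiter:

package main

import (
	"errors"
	"fmt"
)

// enqueue sketches EnqueueChunk's shape: because the return value is named,
// the deferred closure can inspect it and undo the book-keeping on failure.
func enqueue(fail bool) (err error) {
	buffer := make([]byte, 4) // stands in for slicePool.RentSlice(chunkSize)

	defer func() {
		if err == nil {
			return // success: the worker now owns the buffer and its accounting
		}
		// failure before hand-off: release the reservation and the buffer here
		fmt.Println("cleaned up after error:", err)
		_ = buffer // a real pool would get the slice back at this point
	}()

	if fail {
		return errors.New("body read failed")
	}
	return nil // pretend the chunk was queued successfully
}

func main() {
	_ = enqueue(true)  // triggers the deferred cleanup
	_ = enqueue(false) // no cleanup; ownership transferred
}
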
@@ -172,15 +191,14 @@ func (w *chunkedFileWriter) EnqueueChunk(ctx context.Context, id ChunkID, chunkS
 	// enqueue it
 	w.chunkLogger.LogChunkStatus(id, EWaitReason.Sorting())
 	select {
-	case err = <-w.failureError:
+	case <-w.chunkWriterDone:
+		err = w.err
 		if err != nil {
 			return err
 		}
 		return ChunkWriterAlreadyFailed // channel returned nil because it was closed and empty
 	case <-ctx.Done():
 		return ctx.Err()
 	case w.newUnorderedChunks <- fileChunk{id: id, data: buffer}:
-		return nil
+		return
 	}
 }
@@ -189,15 +207,30 @@ func (w *chunkedFileWriter) Flush(ctx context.Context) ([]byte, error) {
 	// let worker know that no more will be coming
 	close(w.newUnorderedChunks)

+	/*
+	 * We clear accounted-but-unused memory, i.e. capacity, here. This capacity was
+	 * requested from cacheLimiter when we were waiting to schedule this chunk.
+	 * The statement below needs to happen after we've waited for all the chunks.
+	 *
+	 * Why should we do this?
+	 * Ideally, the capacity should be zero here, because workerRoutine() would return
+	 * the slice after saving the chunk. However, transferProcessor() is designed such that
+	 * it has to schedule all chunks of a jptm even if it has detected a failure in between.
+	 * In such a case, we'd have added to the capacity of the fileWriter, while the
+	 * workerRoutine() has already exited. We release that capacity here. When Flush() finds
+	 * active chunks here, they are only those that have not rented a slice.
+	 */
+	defer func() {
+		w.cacheLimiter.Remove(atomic.LoadInt64(&w.currentReservedCapacity))
+	}()
+
 	// wait until all written to disk
 	select {
-	case err := <-w.failureError:
-		if err != nil {
-			return nil, err
+	case <-w.chunkWriterDone:
+		if w.err != nil {
+			return nil, w.err
 		}
 		return nil, ChunkWriterAlreadyFailed // channel returned nil because it was closed and empty
 	case <-ctx.Done():
 		return nil, ctx.Err()
 	case md5AtCompletion := <-w.successMd5:
 		return md5AtCompletion, nil
 	}
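
Aside: the comment above describes the leak this PR fixes: capacity reserved in WaitToScheduleChunk was never released when the worker exited early. The toy model below (the limiter type is a stand-in for cacheLimiter, with only the operations the comment relies on) walks through that scenario and shows how Flush's deferred Remove hands back the dangling reservations:

package main

import (
	"fmt"
	"sync/atomic"
)

// limiter is a stand-in for cacheLimiter with just Add and Remove.
type limiter struct{ inUse int64 }

func (l *limiter) Add(n int64)    { atomic.AddInt64(&l.inUse, n) }
func (l *limiter) Remove(n int64) { atomic.AddInt64(&l.inUse, -n) }

func main() {
	lim := &limiter{}
	var reserved int64 // plays the role of currentReservedCapacity

	// Three 8 MiB chunks get scheduled; each reserves RAM up front.
	const chunkSize = 8 * 1024 * 1024
	for i := 0; i < 3; i++ {
		lim.Add(chunkSize)
		atomic.AddInt64(&reserved, chunkSize)
	}

	// The worker saves one chunk and then exits on an error: that chunk's
	// share is released, but two reservations are left dangling.
	lim.Remove(chunkSize)
	atomic.AddInt64(&reserved, -chunkSize)

	// Flush's deferred release hands back whatever is still reserved.
	lim.Remove(atomic.LoadInt64(&reserved))
	fmt.Println("bytes still held by limiter:", atomic.LoadInt64(&lim.inUse)) // 0
}
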
@@ -221,6 +254,19 @@ func (w *chunkedFileWriter) workerRoutine(ctx context.Context) {
 		md5Hasher = &nullHasher{}
 	}

+	defer func() {
+		// clean up if we abruptly quit
+		for _, chunk := range unsavedChunksByFileOffset {
+			w.cacheLimiter.Remove(int64(chunk.id.length)) // remove this from the tally of scheduled-but-unsaved bytes
+			atomic.AddInt64(&w.currentReservedCapacity, -chunk.id.length)
+			w.slicePool.ReturnSlice(chunk.data)
+			atomic.AddInt32(&w.activeChunkCount, -1)
+			w.chunkLogger.LogChunkStatus(chunk.id, EWaitReason.ChunkDone()) // this chunk is all finished
+		}
+		close(w.chunkWriterDone) // must close because many goroutines may be calling the public methods, and all need to be able to tell there's been an error, even though only one will get the actual error
+		unsavedChunksByFileOffset = nil
+	}()
+
 	for {
 		var newChunk fileChunk
 		var channelIsOpen bool
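
Aside: the deferred block above guarantees that whichever path workerRoutine exits through, every chunk still held in unsavedChunksByFileOffset gives back its pooled slice and its reserved capacity, and chunkWriterDone is closed exactly once. A compact sketch of the same drain-on-exit shape, with hypothetical types:

package main

import "fmt"

type chunk struct{ data []byte }

// worker drains every chunk it still holds on any exit path, then closes
// done exactly once so all callers can observe completion or failure.
func worker(pending map[int64]chunk, done chan bool, returnSlice func([]byte)) {
	defer func() {
		for offset, c := range pending {
			returnSlice(c.data) // give the pooled slice back
			delete(pending, offset)
		}
		close(done)
	}()
	// ... the normal path would write chunks and remove them from pending;
	// an early return on error or cancellation still runs the cleanup above.
}

func main() {
	done := make(chan bool, 1)
	pending := map[int64]chunk{0: {data: make([]byte, 4)}}
	worker(pending, done, func(b []byte) { fmt.Println("returned", len(b), "bytes") })
	<-done // closed by the deferred cleanup
}
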
@@ -236,7 +282,7 @@ func (w *chunkedFileWriter) workerRoutine(ctx context.Context) {
 				return
 			}
 		case <-ctx.Done(): // If cancelled out in the middle of enqueuing chunks OR processing chunks, they will both cleanly cancel out and we'll get back to here.
-			w.failureError <- ctx.Err()
+			w.err = ctx.Err()
 			return
 		}

@@ -248,8 +294,7 @@ func (w *chunkedFileWriter) workerRoutine(ctx context.Context) {
 		w.setStatusForContiguousAvailableChunks(unsavedChunksByFileOffset, nextOffsetToSave, ctx) // update states of those that have all their prior ones already here
 		err := w.sequentiallyProcessAvailableChunks(unsavedChunksByFileOffset, &nextOffsetToSave, md5Hasher, ctx)
 		if err != nil {
-			w.failureError <- err
-			close(w.failureError) // must close because many goroutines may be calling the public methods, and all need to be able to tell there's been an error, even tho only one will get the actual error
+			w.err = err
 			return // no point in processing any more after a failure
 		}
 	}
@@ -305,8 +350,9 @@ func (w *chunkedFileWriter) setStatusForContiguousAvailableChunks(unsavedChunksB
 func (w *chunkedFileWriter) saveOneChunk(chunk fileChunk, md5Hasher hash.Hash) error {
 	defer func() {
 		w.cacheLimiter.Remove(int64(len(chunk.data))) // remove this from the tally of scheduled-but-unsaved bytes
-		atomic.AddInt32(&w.activeChunkCount, -1)
 		w.slicePool.ReturnSlice(chunk.data)
+		atomic.AddInt32(&w.activeChunkCount, -1)
+		atomic.AddInt64(&w.currentReservedCapacity, -chunk.id.length)
 		w.chunkLogger.LogChunkStatus(chunk.id, EWaitReason.ChunkDone()) // this chunk is all finished
 	}()

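Taken together, the contract after this PR can be summarized as below. This interface is reconstructed from the signatures visible in the diff for illustration only; it is not a type defined in the repository, and ChunkID is a placeholder:

package main

import (
	"context"
	"io"
)

// ChunkID is a placeholder for AzCopy's real type.
type ChunkID struct{ offset, length int64 }

// chunkedFileWriterContract restates the three public methods' obligations.
type chunkedFileWriterContract interface {
	// Reserves chunkSize bytes of RAM; on success the writer owns that book-keeping.
	WaitToScheduleChunk(ctx context.Context, id ChunkID, chunkSize int64) error
	// Rents a pooled slice and hands the chunk to the worker; if it fails
	// before the hand-off, it releases the reservation it inherited.
	EnqueueChunk(ctx context.Context, id ChunkID, chunkSize int64, chunkContents io.Reader, retryable bool) error
	// Waits for the worker, then releases any capacity still reserved.
	Flush(ctx context.Context) ([]byte, error)
}

func main() {}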