@@ -84,7 +84,7 @@ type chunkedFileWriter struct {
84
84
85
85
// used for completion
86
86
successMd5 chan []byte
87
- failureError chan error
87
+ chunkWriterDone chan bool
88
88
89
89
// controls body-read retries. Public so value can be shared with retryReader
90
90
maxRetryPerDownloadBody int
@@ -93,6 +93,10 @@ type chunkedFileWriter struct {
93
93
md5ValidationOption HashValidationOption
94
94
95
95
sourceMd5Exists bool
96
+
97
+ currentReservedCapacity int64
98
+
99
+ err error //This field should be set only by workerRoutine
96
100
}
97
101
98
102
type fileChunk struct {
@@ -112,11 +116,12 @@ func NewChunkedFileWriter(ctx context.Context, slicePool ByteSlicePooler, cacheL
112
116
cacheLimiter : cacheLimiter ,
113
117
chunkLogger : chunkLogger ,
114
118
successMd5 : make (chan []byte ),
115
- failureError : make (chan error , 1 ),
119
+ chunkWriterDone : make (chan bool , 1 ),
116
120
newUnorderedChunks : make (chan fileChunk , chanBufferSize ),
117
121
maxRetryPerDownloadBody : maxBodyRetries ,
118
122
md5ValidationOption : md5ValidationOption ,
119
123
sourceMd5Exists : sourceMd5Exists ,
124
+ currentReservedCapacity : 0 ,
120
125
}
121
126
go w .workerRoutine (ctx )
122
127
return w
@@ -137,14 +142,15 @@ func (w *chunkedFileWriter) WaitToScheduleChunk(ctx context.Context, id ChunkID,
137
142
w .chunkLogger .LogChunkStatus (id , EWaitReason .RAMToSchedule ())
138
143
err := w .cacheLimiter .WaitUntilAdd (ctx , chunkSize , w .shouldUseRelaxedRamThreshold )
139
144
if err == nil {
145
+ atomic .AddInt64 (& w .currentReservedCapacity , chunkSize )
140
146
atomic .AddInt32 (& w .activeChunkCount , 1 )
141
147
}
142
148
return err
149
+ //At this point, the book-keeping of this memory is chunkedFileWriter's responsibility
143
150
}
144
151
145
152
// Threadsafe method to enqueue a new chunk for processing
146
- func (w * chunkedFileWriter ) EnqueueChunk (ctx context.Context , id ChunkID , chunkSize int64 , chunkContents io.Reader , retryable bool ) error {
147
-
153
+ func (w * chunkedFileWriter ) EnqueueChunk (ctx context.Context , id ChunkID , chunkSize int64 , chunkContents io.Reader , retryable bool ) (err error ) {
148
154
readDone := make (chan struct {})
149
155
if retryable {
150
156
// if retryable == true, that tells us that closing the reader
@@ -158,8 +164,21 @@ func (w *chunkedFileWriter) EnqueueChunk(ctx context.Context, id ChunkID, chunkS
158
164
159
165
// read into a buffer
160
166
buffer := w .slicePool .RentSlice (chunkSize )
167
+
168
+ defer func () {
169
+ //cleanup stuff if we abruptly quit
170
+ if err == nil {
171
+ return //We've successfully queued, the worker will now takeover
172
+ }
173
+ w .cacheLimiter .Remove (chunkSize ) // remove this from the tally of scheduled-but-unsaved bytes
174
+ atomic .AddInt64 (& w .currentReservedCapacity , - chunkSize )
175
+ w .slicePool .ReturnSlice (buffer )
176
+ atomic .AddInt32 (& w .activeChunkCount , - 1 )
177
+ w .chunkLogger .LogChunkStatus (id , EWaitReason .ChunkDone ()) // this chunk is all finished
178
+ }()
179
+
161
180
readStart := time .Now ()
162
- _ , err : = io .ReadFull (chunkContents , buffer )
181
+ _ , err = io .ReadFull (chunkContents , buffer )
163
182
close (readDone )
164
183
if err != nil {
165
184
return err
@@ -172,15 +191,14 @@ func (w *chunkedFileWriter) EnqueueChunk(ctx context.Context, id ChunkID, chunkS
172
191
// enqueue it
173
192
w .chunkLogger .LogChunkStatus (id , EWaitReason .Sorting ())
174
193
select {
175
- case err = <- w .failureError :
194
+ case <- w .chunkWriterDone :
195
+ err = w .err
176
196
if err != nil {
177
197
return err
178
198
}
179
199
return ChunkWriterAlreadyFailed // channel returned nil because it was closed and empty
180
- case <- ctx .Done ():
181
- return ctx .Err ()
182
200
case w .newUnorderedChunks <- fileChunk {id : id , data : buffer }:
183
- return nil
201
+ return
184
202
}
185
203
}
186
204
@@ -189,15 +207,30 @@ func (w *chunkedFileWriter) Flush(ctx context.Context) ([]byte, error) {
189
207
// let worker know that no more will be coming
190
208
close (w .newUnorderedChunks )
191
209
210
+ /*
211
+ * We clear accounted but unused memory, i.e capacity, here. This capacity was
212
+ * requested from cacheLimiter when we were waiting to schedule this chunk.
213
+ * The below statement needs to happen after we've waited for all the chunks.
214
+ *
215
+ * Why should we do this?
216
+ * Ideally, the capacity should be zero here, because workerRoutine() would return
217
+ * the slice after saving the chunk. However, transferProcessor() is designed such that
218
+ * it has to schedule all chunks of jptm even if it has detected a failure in between.
219
+ * In such a case, we'd have added to the capacity of the fileWriter, while the
220
+ * workerRoutine() has already exited. We release that capacity here. When Flush() finds
221
+ * active chunks here, it is only those which have not rented a slice.
222
+ */
223
+ defer func () {
224
+ w .cacheLimiter .Remove (atomic .LoadInt64 (& w .currentReservedCapacity ))
225
+ }()
226
+
192
227
// wait until all written to disk
193
228
select {
194
- case err := <- w .failureError :
195
- if err != nil {
196
- return nil , err
229
+ case <- w .chunkWriterDone :
230
+ if w . err != nil {
231
+ return nil , w . err
197
232
}
198
233
return nil , ChunkWriterAlreadyFailed // channel returned nil because it was closed and empty
199
- case <- ctx .Done ():
200
- return nil , ctx .Err ()
201
234
case md5AtCompletion := <- w .successMd5 :
202
235
return md5AtCompletion , nil
203
236
}
@@ -221,6 +254,19 @@ func (w *chunkedFileWriter) workerRoutine(ctx context.Context) {
221
254
md5Hasher = & nullHasher {}
222
255
}
223
256
257
+ defer func () {
258
+ //cleanup stuff if we abruptly quit
259
+ for _ , chunk := range unsavedChunksByFileOffset {
260
+ w .cacheLimiter .Remove (int64 (chunk .id .length )) // remove this from the tally of scheduled-but-unsaved bytes
261
+ atomic .AddInt64 (& w .currentReservedCapacity , - chunk .id .length )
262
+ w .slicePool .ReturnSlice (chunk .data )
263
+ atomic .AddInt32 (& w .activeChunkCount , - 1 )
264
+ w .chunkLogger .LogChunkStatus (chunk .id , EWaitReason .ChunkDone ()) // this chunk is all finished
265
+ }
266
+ close (w .chunkWriterDone ) // must close because many goroutines may be calling the public methods, and all need to be able to tell there's been an error, even tho only one will get the actual error
267
+ unsavedChunksByFileOffset = nil
268
+ }()
269
+
224
270
for {
225
271
var newChunk fileChunk
226
272
var channelIsOpen bool
@@ -236,7 +282,7 @@ func (w *chunkedFileWriter) workerRoutine(ctx context.Context) {
236
282
return
237
283
}
238
284
case <- ctx .Done (): // If cancelled out in the middle of enqueuing chunks OR processing chunks, they will both cleanly cancel out and we'll get back to here.
239
- w .failureError <- ctx .Err ()
285
+ w .err = ctx .Err ()
240
286
return
241
287
}
242
288
@@ -248,8 +294,7 @@ func (w *chunkedFileWriter) workerRoutine(ctx context.Context) {
248
294
w .setStatusForContiguousAvailableChunks (unsavedChunksByFileOffset , nextOffsetToSave , ctx ) // update states of those that have all their prior ones already here
249
295
err := w .sequentiallyProcessAvailableChunks (unsavedChunksByFileOffset , & nextOffsetToSave , md5Hasher , ctx )
250
296
if err != nil {
251
- w .failureError <- err
252
- close (w .failureError ) // must close because many goroutines may be calling the public methods, and all need to be able to tell there's been an error, even tho only one will get the actual error
297
+ w .err = err
253
298
return // no point in processing any more after a failure
254
299
}
255
300
}
@@ -305,8 +350,9 @@ func (w *chunkedFileWriter) setStatusForContiguousAvailableChunks(unsavedChunksB
305
350
func (w * chunkedFileWriter ) saveOneChunk (chunk fileChunk , md5Hasher hash.Hash ) error {
306
351
defer func () {
307
352
w .cacheLimiter .Remove (int64 (len (chunk .data ))) // remove this from the tally of scheduled-but-unsaved bytes
308
- atomic .AddInt32 (& w .activeChunkCount , - 1 )
309
353
w .slicePool .ReturnSlice (chunk .data )
354
+ atomic .AddInt32 (& w .activeChunkCount , - 1 )
355
+ atomic .AddInt64 (& w .currentReservedCapacity , - chunk .id .length )
310
356
w .chunkLogger .LogChunkStatus (chunk .id , EWaitReason .ChunkDone ()) // this chunk is all finished
311
357
}()
312
358
0 commit comments