
Commit fad439f

Fix/too many open files in download (#276)
* Limit the number of open files when downloading, to fit within the OS's configured limit. Not needed when uploading, because there the number of open files is (more or less) limited by the number of goroutines used to initiate transfers. ("More or less" because when we do retries for 503s we will tend to open additional file handles to re-read the retried data.)
* Reduce MaxIdleConnsPerHost, to accommodate Linux systems with a low open-files-per-user limit (since each open connection counts as a file).
* Log concurrency parameters at the start of the job. Useful for debugging things like the max-concurrent-files issue and its limits.
* Take network connections into account when setting the max number of concurrently open files.
* Use one HTTP client for the whole job, instead of one for each job part. Previously, with one client per job part, we had an issue when the files in the job parts were very small and there were a lot of files: we got through each job part so quickly that its lingering pool of open connections became a problem, especially on resource-constrained systems. While the connections are eventually cleaned up, they were not cleaned up fast enough to compensate for our fast progression through the job parts. Now we use just one HTTP client, so there is just one connection pool for the whole job.
* Add a comment to explain max idle conns per host.
* Fix the test suite to match the change in connection handling.
1 parent 4c3f86c commit fad439f
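
The last two bullets above are easiest to see in miniature. Below is a hedged sketch, not azcopy's actual NewAzcopyHTTPClient (whose exact settings live in the ste package): build one http.Client per job, and cap its per-host idle-connection pool so lingering keep-alive sockets do not consume the process's file-handle budget. The constructor name and numeric values are illustrative assumptions.

```go
package main

import (
	"net"
	"net/http"
	"time"
)

// newSharedHTTPClient is an illustrative stand-in for ste.NewAzcopyHTTPClient:
// one client for the whole job, whose transport keeps a bounded per-host pool
// of idle keep-alive connections, so idle sockets don't eat into the OS's
// open-file limit. The timeout and pool values here are assumptions, not
// azcopy's real settings.
func newSharedHTTPClient(maxIdleConnsPerHost int) *http.Client {
	return &http.Client{
		Transport: &http.Transport{
			DialContext: (&net.Dialer{
				Timeout:   30 * time.Second,
				KeepAlive: 30 * time.Second,
			}).DialContext,
			MaxIdleConnsPerHost: maxIdleConnsPerHost, // each idle connection holds an open socket (a "file" on Linux)
			IdleConnTimeout:     90 * time.Second,    // reclaim idle sockets reasonably promptly
		},
	}
}

func main() {
	client := newSharedHTTPClient(500) // 500 is an arbitrary example cap
	_ = client                         // in azcopy, every job part shares one such client
}
```

Because Go's http.Transport pools idle connections per host, sharing one client across all job parts lets small, quickly-completed parts reuse the same sockets instead of each leaving behind its own lingering idle pool.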

15 files changed: +528 -404 lines

.gitignore

+338 -337
Large diffs are not rendered by default.

cmd/credentialUtil.go

+2 -1

@@ -277,7 +277,8 @@ func createBlobPipeline(ctx context.Context, credInfo common.CredentialInfo) (pi
         RetryDelay:    ste.UploadRetryDelay,
         MaxRetryDelay: ste.UploadMaxRetryDelay,
     },
-    nil), nil
+    nil,
+    ste.NewAzcopyHTTPClient()), nil
 }

 func createBlobFSPipeline(ctx context.Context, credInfo common.CredentialInfo) (pipeline.Pipeline, error) {

common/cacheLimiter.go

+18 -10

@@ -29,14 +29,16 @@ import (

 type Predicate func() bool

-// Used to limit the amount of in-flight data in RAM, to keep it an an acceptable level.
-// For downloads, network is producer and disk is consumer, while for uploads the roles are reversed.
+// Used to limit the amounts of things. E.g. amount of in-flight data in RAM, to keep it an an acceptable level.
+// Also used for number of open files (since that's limited on Linux).
+// In the case of RAM usage, for downloads, network is producer and disk is consumer, while for uploads the roles are reversed.
 // In either case, if the producer is faster than the consumer, this CacheLimiter is necessary
-// prevent unbounded RAM usage
+// prevent unbounded RAM usage.
 type CacheLimiter interface {
-    TryAddBytes(count int64, useRelaxedLimit bool) (added bool)
-    WaitUntilAddBytes(ctx context.Context, count int64, useRelaxedLimit Predicate) error
-    RemoveBytes(count int64)
+    TryAdd(count int64, useRelaxedLimit bool) (added bool)
+    WaitUntilAdd(ctx context.Context, count int64, useRelaxedLimit Predicate) error
+    Remove(count int64)
+    Limit() int64
 }

 type cacheLimiter struct {

@@ -49,7 +51,7 @@ func NewCacheLimiter(limit int64) CacheLimiter {
 }

 // TryAddBytes tries to add a memory allocation within the limit. Returns true if it could be (and was) added
-func (c *cacheLimiter) TryAddBytes(count int64, useRelaxedLimit bool) (added bool) {
+func (c *cacheLimiter) TryAdd(count int64, useRelaxedLimit bool) (added bool) {
     lim := c.limit

     // Above the "strict" limit, there's a bit of extra room, which we use

@@ -63,6 +65,8 @@ func (c *cacheLimiter) TryAddBytes(count int64, useRelaxedLimit bool) (added boo
         // no backlogging of new chunks behind slow ones (i.e. these "good" cases are allowed to proceed without
         // interruption) and for uploads its used for re-doing the prefetches when we do retries (i.e. so these are
         // not blocked by other chunks using up RAM).
+        // TODO: now that cacheLimiter is used for multiple purposes, the hard-coding of the distinction between
+        // relaxed and strict limits is less appropriate. Refactor to make it a configuration param of the instance?
     }

     if atomic.AddInt64(&c.value, count) <= lim {

@@ -74,10 +78,10 @@ func (c *cacheLimiter) TryAddBytes(count int64, useRelaxedLimit bool) (added boo
 }

 /// WaitUntilAddBytes blocks until it completes a successful call to TryAddBytes
-func (c *cacheLimiter) WaitUntilAddBytes(ctx context.Context, count int64, useRelaxedLimit Predicate) error {
+func (c *cacheLimiter) WaitUntilAdd(ctx context.Context, count int64, useRelaxedLimit Predicate) error {
     for {
         // Proceed if there's room in the cache
-        if c.TryAddBytes(count, useRelaxedLimit()) {
+        if c.TryAdd(count, useRelaxedLimit()) {
             return nil
         }


@@ -97,7 +101,11 @@ func (c *cacheLimiter) WaitUntilAddBytes(ctx context.Context, count int64, useRe
     }
 }

-func (c *cacheLimiter) RemoveBytes(count int64) {
+func (c *cacheLimiter) Remove(count int64) {
     negativeDelta := -count
     atomic.AddInt64(&c.value, negativeDelta)
 }
+
+func (c *cacheLimiter) Limit() int64 {
+    return c.limit
+}
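
Because the limiter now counts generic units rather than bytes, the same type can meter open file handles: each open file costs one unit against a limit equal to the computed concurrent-files cap. The helper below is only a sketch of that acquire/release discipline; the function name, the import path, and the choice to never use the relaxed allowance are assumptions for illustration, not azcopy's actual download code.

```go
package example

import (
	"context"
	"os"

	"github.com/Azure/azure-storage-azcopy/common" // repo's common package (path assumed)
)

// openWithinLimit is a hypothetical caller of the renamed interface, using it
// as a file-count limiter instead of a RAM limiter. Here "1" means one file
// handle, not one byte.
func openWithinLimit(ctx context.Context, limiter common.CacheLimiter, path string) (*os.File, error) {
	// Block until we are allowed one more open file.
	if err := limiter.WaitUntilAdd(ctx, 1, func() bool { return false }); err != nil {
		return nil, err
	}
	f, err := os.Open(path)
	if err != nil {
		limiter.Remove(1) // give the slot back if the open failed
		return nil, err
	}
	// The caller must call limiter.Remove(1) once it has closed f.
	return f, nil
}
```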

common/chunkedFileWriter.go

+4 -4

@@ -131,7 +131,7 @@ const maxDesirableActiveChunks = 20 // TODO: can we find a sensible way to remov
 // from the cache limiter, which is also in this struct.
 func (w *chunkedFileWriter) WaitToScheduleChunk(ctx context.Context, id ChunkID, chunkSize int64) error {
     w.chunkLogger.LogChunkStatus(id, EWaitReason.RAMToSchedule())
-    err := w.cacheLimiter.WaitUntilAddBytes(ctx, chunkSize, w.shouldUseRelaxedRamThreshold)
+    err := w.cacheLimiter.WaitUntilAdd(ctx, chunkSize, w.shouldUseRelaxedRamThreshold)
     if err == nil {
         atomic.AddInt32(&w.activeChunkCount, 1)
     }

@@ -286,7 +286,7 @@ func (w *chunkedFileWriter) setStatusForContiguousAvailableChunks(unsavedChunksB
 // Saves one chunk to its destination
 func (w *chunkedFileWriter) saveOneChunk(chunk fileChunk) error {
     defer func() {
-        w.cacheLimiter.RemoveBytes(int64(len(chunk.data))) // remove this from the tally of scheduled-but-unsaved bytes
+        w.cacheLimiter.Remove(int64(len(chunk.data))) // remove this from the tally of scheduled-but-unsaved bytes
         atomic.AddInt32(&w.activeChunkCount, -1)
         w.slicePool.ReturnSlice(chunk.data)
         w.chunkLogger.LogChunkStatus(chunk.id, EWaitReason.ChunkDone()) // this chunk is all finished

@@ -311,9 +311,9 @@ func (w *chunkedFileWriter) shouldUseRelaxedRamThreshold() bool {

 // Are we currently in a memory-constrained situation?
 func (w *chunkedFileWriter) haveMemoryPressure(chunkSize int64) bool {
-    didAdd := w.cacheLimiter.TryAddBytes(chunkSize, w.shouldUseRelaxedRamThreshold())
+    didAdd := w.cacheLimiter.TryAdd(chunkSize, w.shouldUseRelaxedRamThreshold())
     if didAdd {
-        w.cacheLimiter.RemoveBytes(chunkSize) // remove immediately, since this was only a test
+        w.cacheLimiter.Remove(chunkSize) // remove immediately, since this was only a test
     }
     return !didAdd
 }

common/singleChunkReader.go

+2 -2

@@ -194,7 +194,7 @@ func (cr *singleChunkReader) blockingPrefetch(fileReader io.ReaderAt, isRetry bo
     // here doing retries, but no RAM _will_ become available because its
     // all used by queued chunkfuncs (that can't be processed because all goroutines are active).
     cr.chunkLogger.LogChunkStatus(cr.chunkId, EWaitReason.RAMToSchedule())
-    err := cr.cacheLimiter.WaitUntilAddBytes(cr.ctx, cr.length, func() bool { return isRetry })
+    err := cr.cacheLimiter.WaitUntilAdd(cr.ctx, cr.length, func() bool { return isRetry })
     if err != nil {
         return err
     }

@@ -318,7 +318,7 @@ func (cr *singleChunkReader) returnBuffer() {
         return
     }
     cr.slicePool.ReturnSlice(cr.buffer)
-    cr.cacheLimiter.RemoveBytes(int64(len(cr.buffer)))
+    cr.cacheLimiter.Remove(int64(len(cr.buffer)))
     cr.buffer = nil
 }


main.go

+29 -3

@@ -47,15 +47,18 @@ func main() {
         return
     }

+    configureGC()
+
     // Perform os specific initialization
-    _, err := ProcessOSSpecificInitialization()
+    maxFileAndSocketHandles, err := ProcessOSSpecificInitialization()
     if err != nil {
         log.Fatalf("initialization failed: %v", err)
     }

-    configureGC()
+    concurrentConnections := common.ComputeConcurrencyValue(runtime.NumCPU())
+    concurrentFilesLimit := computeConcurrentFilesLimit(maxFileAndSocketHandles, concurrentConnections)

-    err = ste.MainSTE(common.ComputeConcurrencyValue(runtime.NumCPU()), 2400, azcopyAppPathFolder, azcopyLogPathFolder)
+    err = ste.MainSTE(concurrentConnections, concurrentFilesLimit, 2400, azcopyAppPathFolder, azcopyLogPathFolder)
     common.PanicIfErr(err)

     cmd.Execute(azcopyAppPathFolder, azcopyLogPathFolder)

@@ -71,3 +74,26 @@ func configureGC() {
         debug.SetGCPercent(20) // activate more aggressive/frequent GC than the default
     }()
 }
+
+// ComputeConcurrentFilesLimit finds a number of concurrently-openable files
+// such that we'll have enough handles left, after using some as network handles
+// TODO: add environment var to optionally allow bringing concurrentFiles down lower
+// (and, when we do, actually USE it for uploads, since currently we're only using it on downloads)
+// (update logging
+func computeConcurrentFilesLimit(maxFileAndSocketHandles int, concurrentConnections int) int {
+
+    allowanceForOnGoingEnumeration := 1 // might still be scanning while we are transferring. Make this bigger if we ever do parallel scanning
+
+    // Compute a very conservative estimate for total number of connections that we may have
+    // To get a conservative estimate we pessimistically assume that the pool of idle conns is full,
+    // but all the ones we are actually using are (by some fluke of timing) not in the pool.
+    // TODO: consider actually SETTING AzCopyMaxIdleConnsPerHost to say, max(0.3 * FileAndSocketHandles, 1000), instead of using the hard-coded value we currently have
+    possibleMaxTotalConcurrentHttpConnections := concurrentConnections + ste.AzCopyMaxIdleConnsPerHost + allowanceForOnGoingEnumeration
+
+    concurrentFilesLimit := maxFileAndSocketHandles - possibleMaxTotalConcurrentHttpConnections
+
+    if concurrentFilesLimit < ste.NumTransferInitiationRoutines {
+        concurrentFilesLimit = ste.NumTransferInitiationRoutines // Set sensible floor, so we don't get negative or zero values if maxFileAndSocketHandles is low
+    }
+    return concurrentFilesLimit
+}
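
To make the handle budget concrete, here is a worked example of the arithmetic in computeConcurrentFilesLimit, using illustrative numbers; the real AzCopyMaxIdleConnsPerHost and NumTransferInitiationRoutines constants live in the ste package and may differ from the stand-ins below.

```go
package main

import "fmt"

func main() {
	// All values below are assumptions chosen for the example.
	maxFileAndSocketHandles := 4096  // e.g. the soft RLIMIT_NOFILE reported by ProcessOSSpecificInitialization
	concurrentConnections := 32      // e.g. common.ComputeConcurrencyValue on a small machine
	maxIdleConnsPerHost := 500       // stand-in for ste.AzCopyMaxIdleConnsPerHost
	transferInitiationRoutines := 64 // matches the NumTransferInitiationRoutines constant added in ste/JobsAdmin.go
	allowanceForOnGoingEnumeration := 1

	// Pessimistic connection estimate: active conns + a full idle pool + the enumeration allowance.
	possibleConns := concurrentConnections + maxIdleConnsPerHost + allowanceForOnGoingEnumeration // 533

	limit := maxFileAndSocketHandles - possibleConns // 4096 - 533 = 3563 files may be open concurrently
	if limit < transferInitiationRoutines {
		limit = transferInitiationRoutines // floor, in case the OS limit is very low
	}
	fmt.Println("concurrent files limit:", limit)
}
```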

main_unix.go

+11 -6

@@ -23,17 +23,19 @@
 package main

 import (
+    "math"
     "os"
     "path"
     "syscall"
 )

 // ProcessOSSpecificInitialization changes the soft limit for file descriptor for process
-// and returns the file descriptor limit for process. If the function fails with some error
-// it returns the error.
+// and returns the new file descriptor limit for process.
+// We need to do this because the default limits are low on Linux, and we concurrently open lots of files
+// and sockets (both of which count towards this limit).
 // Api gets the hard limit for process file descriptor
-// and sets the soft limit for process file descriptor to above hard limit
-func ProcessOSSpecificInitialization() (uint64, error) {
+// and sets the soft limit for process file descriptor to (hard limit - 1)
+func ProcessOSSpecificInitialization() (int, error) {
     var rlimit, zero syscall.Rlimit
     // get the hard limit
     err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rlimit)

@@ -47,12 +49,15 @@ func ProcessOSSpecificInitialization() (uint64, error) {
     set := rlimit
     // set the current limit to one less than max of the rlimit
     set.Cur = set.Max - 1
-    // set the soft limit to above rlimit
     err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, &set)
     if err != nil {
         return 0, err
     }
-    return set.Max, nil
+    if set.Cur > math.MaxInt32 {
+        return math.MaxInt32, nil
+    } else {
+        return int(set.Cur), nil
+    }
 }

 // GetAzCopyAppPath returns the path of Azcopy folder in local appdata.

main_windows.go

+8 -2

@@ -21,6 +21,7 @@
 package main

 import (
+    "math"
     "os"
     "os/exec"
     "path"

@@ -39,9 +40,14 @@ func osModifyProcessCommand(cmd *exec.Cmd) *exec.Cmd {
 // ProcessOSSpecificInitialization chnages the soft limit for filedescriptor for process
 // return the filedescriptor limit for process. If the function fails with some, it returns
 // the error
-// TODO: this api is implemented for windows as well but not required.
+// TODO: this api is implemented for windows as well but not required because Windows
+// does not default to a precise low limit like Linux does
 func ProcessOSSpecificInitialization() (int, error) {
-    return 0, nil
+
+    // this exaggerates what's possible, but is accurate enough for our purposes, in which our goal is simply to apply no specific limit on Windows
+    const effectivelyUnlimited = math.MaxInt32
+
+    return effectivelyUnlimited, nil
 }

 // GetAzCopyAppPath returns the path of Azcopy in local appdata.

ste/JobsAdmin.go

+14 -10

@@ -96,7 +96,7 @@ var JobsAdmin interface {
     common.ILoggerCloser
 }

-func initJobsAdmin(appCtx context.Context, concurrentConnections int, targetRateInMBps int64, azcopyAppPathFolder string, azcopyLogPathFolder string) {
+func initJobsAdmin(appCtx context.Context, concurrentConnections int, concurrentFilesLimit int, targetRateInMBps int64, azcopyAppPathFolder string, azcopyLogPathFolder string) {
     if JobsAdmin != nil {
         panic("initJobsAdmin was already called once")
     }

@@ -140,14 +140,15 @@ func initJobsAdmin(appCtx context.Context, concurrentConnections int, targetRate
     maxRamBytesToUse := int64(gbToUse * 1024 * 1024 * 1024)

     ja := &jobsAdmin{
-        logger:        common.NewAppLogger(pipeline.LogInfo, azcopyLogPathFolder),
-        jobIDToJobMgr: newJobIDToJobMgr(),
-        logDir:        azcopyLogPathFolder,
-        planDir:       planDir,
-        pacer:         newPacer(targetRateInMBps * 1024 * 1024),
-        slicePool:     common.NewMultiSizeSlicePool(common.MaxBlockBlobBlockSize),
-        cacheLimiter:  common.NewCacheLimiter(maxRamBytesToUse),
-        appCtx:        appCtx,
+        logger:           common.NewAppLogger(pipeline.LogInfo, azcopyLogPathFolder),
+        jobIDToJobMgr:    newJobIDToJobMgr(),
+        logDir:           azcopyLogPathFolder,
+        planDir:          planDir,
+        pacer:            newPacer(targetRateInMBps * 1024 * 1024),
+        slicePool:        common.NewMultiSizeSlicePool(common.MaxBlockBlobBlockSize),
+        cacheLimiter:     common.NewCacheLimiter(maxRamBytesToUse),
+        fileCountLimiter: common.NewCacheLimiter(int64(concurrentFilesLimit)),
+        appCtx:           appCtx,
         coordinatorChannels: CoordinatorChannels{
             partsChannel:     partsCh,
             normalTransferCh: normalTransferCh,

@@ -181,11 +182,13 @@ func initJobsAdmin(appCtx context.Context, concurrentConnections int, targetRate
     // out progress on already-scheduled chunks. (Not sure whether that can really happen, but this protects against it
     // anyway.)
     // Perhaps MORE importantly, doing this separately gives us more CONTROL over how we interact with the file system.
-    for cc := 0; cc < 64; cc++ {
+    for cc := 0; cc < NumTransferInitiationRoutines; cc++ {
         go ja.transferProcessor(cc)
     }
 }

+const NumTransferInitiationRoutines = 64 // TODO make this configurable
+
 // QueueJobParts puts the given JobPartManager into the partChannel
 // from where this JobPartMgr will be picked by a routine and
 // its transfers will be scheduled

@@ -289,6 +292,7 @@ type jobsAdmin struct {
     pacer            *pacer
     slicePool        common.ByteSlicePooler
     cacheLimiter     common.CacheLimiter
+    fileCountLimiter common.CacheLimiter
 }

 type CoordinatorChannels struct {

ste/init.go

+2 -2

@@ -49,9 +49,9 @@ func ToFixed(num float64, precision int) float64 {
 }

 // MainSTE initializes the Storage Transfer Engine
-func MainSTE(concurrentConnections int, targetRateInMBps int64, azcopyAppPathFolder, azcopyLogPathFolder string) error {
+func MainSTE(concurrentConnections int, concurrentFilesLimit int, targetRateInMBps int64, azcopyAppPathFolder, azcopyLogPathFolder string) error {
     // Initialize the JobsAdmin, resurrect Job plan files
-    initJobsAdmin(steCtx, concurrentConnections, targetRateInMBps, azcopyAppPathFolder, azcopyLogPathFolder)
+    initJobsAdmin(steCtx, concurrentConnections, concurrentFilesLimit, targetRateInMBps, azcopyAppPathFolder, azcopyLogPathFolder)
     // No need to read the existing JobPartPlan files since Azcopy is running in process
     //JobsAdmin.ResurrectJobParts()
     // TODO: We may want to list listen first and terminate if there is already an instance listening

ste/mgr-JobMgr.go

+24 -2

@@ -23,6 +23,8 @@ package ste
 import (
     "context"
     "fmt"
+    "net/http"
+    "runtime"
     "strings"
     "sync"
     "sync/atomic"

@@ -71,6 +73,7 @@ type IJobMgr interface {
     getInMemoryTransitJobState() InMemoryTransitJobState // get in memory transit job state saved in this job.
     setInMemoryTransitJobState(state InMemoryTransitJobState) // set in memory transit job state saved in this job.
     LogChunkStatus(id common.ChunkID, reason common.WaitReason)
+    HttpClient() *http.Client

     common.ILoggerCloser
 }

@@ -81,6 +84,7 @@ func newJobMgr(appLogger common.ILogger, jobID common.JobID, appCtx context.Cont
     // atomicAllTransfersScheduled is set to 1 since this api is also called when new job part is ordered.
     enableChunkLogOutput := level.ToPipelineLogLevel() == pipeline.LogDebug
     jm := jobMgr{jobID: jobID, jobPartMgrs: newJobPartToJobPartMgr(), include: map[string]int{}, exclude: map[string]int{},
+        httpClient:        NewAzcopyHTTPClient(),
         logger:            common.NewJobLogger(jobID, level, appLogger, logFileFolder),
         chunkStatusLogger: common.NewChunkStatusLogger(jobID, logFileFolder, enableChunkLogOutput),
         /*Other fields remain zero-value until this job is scheduled */}

@@ -96,13 +100,22 @@ func (jm *jobMgr) reset(appCtx context.Context, commandString string) IJobMgr {
     if len(commandString) > 0 {
         jm.logger.Log(pipeline.LogInfo, fmt.Sprintf("Job-Command %s", commandString))
     }
+    jm.logConcurrencyParameters()
     jm.ctx, jm.cancel = context.WithCancel(appCtx)
     atomic.StoreUint64(&jm.atomicNumberOfBytesCovered, 0)
     atomic.StoreUint64(&jm.atomicTotalBytesToXfer, 0)
     jm.partsDone = 0
     return jm
 }

+func (jm *jobMgr) logConcurrencyParameters() {
+    jm.logger.Log(pipeline.LogInfo, fmt.Sprintf("Number of CPUs: %d", runtime.NumCPU()))
+    jm.logger.Log(pipeline.LogInfo, fmt.Sprintf("Max file buffer RAM %.3f GB", float32(JobsAdmin.(*jobsAdmin).cacheLimiter.Limit())/(1024*1024*1024)))
+    jm.logger.Log(pipeline.LogInfo, fmt.Sprintf("Max open files when downloading: %d", JobsAdmin.(*jobsAdmin).fileCountLimiter.Limit()))
+    jm.logger.Log(pipeline.LogInfo, fmt.Sprintf("Max concurrent transfer initiation routines: %d", NumTransferInitiationRoutines))
+    // TODO: find a way to add concurrency value here (i.e. number of chunk func worker go routines)
+}
+
 // jobMgr represents the runtime information for a Job
 type jobMgr struct {
     logger common.ILoggerResetable

@@ -111,6 +124,10 @@ type jobMgr struct {
     ctx    context.Context
     cancel context.CancelFunc

+    // Share the same HTTP Client across all job parts, so that the we maximize re-use of
+    // its internal connection pool
+    httpClient *http.Client
+
     jobPartMgrs jobPartToJobPartMgr // The map of part #s to JobPartMgrs
     // partsDone keep the count of completed part of the Job.
     partsDone uint32

@@ -212,8 +229,9 @@ func (jm *jobMgr) AddJobPart(partNum PartNumber, planFile JobPartPlanFileName, s
     destinationSAS string, scheduleTransfers bool) IJobPartMgr {
     jpm := &jobPartMgr{jobMgr: jm, filename: planFile, sourceSAS: sourceSAS,
         destinationSAS: destinationSAS, pacer: JobsAdmin.(*jobsAdmin).pacer,
-        slicePool:    JobsAdmin.(*jobsAdmin).slicePool,
-        cacheLimiter: JobsAdmin.(*jobsAdmin).cacheLimiter}
+        slicePool:        JobsAdmin.(*jobsAdmin).slicePool,
+        cacheLimiter:     JobsAdmin.(*jobsAdmin).cacheLimiter,
+        fileCountLimiter: JobsAdmin.(*jobsAdmin).fileCountLimiter}
     jpm.planMMF = jpm.filename.Map()
     jm.jobPartMgrs.Set(partNum, jpm)
     jm.finalPartOrdered = jpm.planMMF.Plan().IsFinalPart

@@ -253,6 +271,10 @@ func (jm *jobMgr) setDirection(fromTo common.FromTo) {
     }
 }

+func (jm *jobMgr) HttpClient() *http.Client {
+    return jm.httpClient
+}
+
 // SetIncludeExclude sets the include / exclude list of transfers
 // supplied with resume command to include or exclude mentioned transfers
 func (jm *jobMgr) SetIncludeExclude(include, exclude map[string]int) {
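
For reference, the new logConcurrencyParameters calls would produce job-log lines shaped like the following; the formats come from the Sprintf calls above, while the numbers are purely illustrative:

```
Number of CPUs: 8
Max file buffer RAM 0.500 GB
Max open files when downloading: 3563
Max concurrent transfer initiation routines: 64
```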
