-
Notifications
You must be signed in to change notification settings - Fork 185
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Change migration progress log to be more verbose #5122
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -71,13 +71,19 @@ func MigrateByAccount(migrator AccountMigrator, allPayloads []ledger.Payload, nW | |
log.Info().Msgf("start grouping for a total of %v payloads", len(allPayloads)) | ||
|
||
var err error | ||
logGrouping := util.LogProgress("grouping payload", len(allPayloads), log.Logger) | ||
for i, payload := range allPayloads { | ||
logGrouping := util.LogProgress( | ||
log.Logger, | ||
util.DefaultLogProgressConfig( | ||
"grouping payload", | ||
len(allPayloads), | ||
), | ||
) | ||
for _, payload := range allPayloads { | ||
groups, err = PayloadGrouping(groups, payload) | ||
if err != nil { | ||
return nil, err | ||
} | ||
logGrouping(i) | ||
logGrouping(1) | ||
} | ||
|
||
log.Info().Msgf("finish grouping for payloads by account: %v groups in total, %v NonAccountPayloads", | ||
|
@@ -108,9 +114,14 @@ func MigrateGroupSequentially( | |
) ( | ||
[]ledger.Payload, error) { | ||
|
||
logAccount := util.LogProgress("processing account group", len(payloadsByAccount), log.Logger) | ||
logAccount := util.LogProgress( | ||
log.Logger, | ||
util.DefaultLogProgressConfig( | ||
"processing account group", | ||
len(payloadsByAccount), | ||
), | ||
) | ||
|
||
i := 0 | ||
migrated := make([]ledger.Payload, 0) | ||
for address, payloads := range payloadsByAccount { | ||
accountMigrated, err := migrator.MigratePayloads(address, payloads) | ||
|
@@ -119,8 +130,7 @@ func MigrateGroupSequentially( | |
} | ||
|
||
migrated = append(migrated, accountMigrated...) | ||
logAccount(i) | ||
i++ | ||
logAccount(1) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe optimize by calling |
||
} | ||
|
||
return migrated, nil | ||
|
@@ -171,7 +181,13 @@ func MigrateGroupConcurrently( | |
} | ||
|
||
// read job results | ||
logAccount := util.LogProgress("processing account group", len(payloadsByAccount), log.Logger) | ||
logAccount := util.LogProgress( | ||
log.Logger, | ||
util.DefaultLogProgressConfig( | ||
"processing account group", | ||
len(payloadsByAccount), | ||
), | ||
) | ||
|
||
migrated := make([]ledger.Payload, 0) | ||
|
||
|
@@ -183,7 +199,7 @@ func MigrateGroupConcurrently( | |
|
||
accountMigrated := result.Migrated | ||
migrated = append(migrated, accountMigrated...) | ||
logAccount(i) | ||
logAccount(1) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as above. |
||
} | ||
|
||
return migrated, nil | ||
|
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -517,9 +517,15 @@ func StoreCheckpointV5(dir string, fileName string, logger zerolog.Logger, tries | |
} | ||
|
||
func logProgress(msg string, estimatedSubtrieNodeCount int, logger zerolog.Logger) func(nodeCounter uint64) { | ||
lg := util.LogProgress(msg, estimatedSubtrieNodeCount, logger) | ||
lg := util.LogProgress( | ||
logger, | ||
util.DefaultLogProgressConfig( | ||
msg, | ||
estimatedSubtrieNodeCount, | ||
), | ||
) | ||
return func(index uint64) { | ||
lg(int(index)) | ||
lg(1) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems this function is called for reading and writing every node during checkpointing. Maybe check performance impact with logging and locking overhead here and optimize if needed. |
||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -1,25 +1,149 @@ | ||||||||||||||
package util | ||||||||||||||
|
||||||||||||||
import ( | ||||||||||||||
"sync" | ||||||||||||||
"sync/atomic" | ||||||||||||||
"time" | ||||||||||||||
|
||||||||||||||
"github.com/rs/zerolog" | ||||||||||||||
) | ||||||||||||||
|
||||||||||||||
// LogProgress takes a total and return function such that when called with a 0-based index | ||||||||||||||
// it prints the progress from 0% to 100% to indicate the index from 0 to (total - 1) has been | ||||||||||||||
// processed. | ||||||||||||||
// useful to report the progress of processing the index from 0 to (total - 1) | ||||||||||||||
func LogProgress(msg string, total int, logger zerolog.Logger) func(currentIndex int) { | ||||||||||||||
logThreshold := float64(0) | ||||||||||||||
return func(currentIndex int) { | ||||||||||||||
// LogProgressFunc is a function that can be called to add to the progress | ||||||||||||||
type LogProgressFunc func(addProgress int) | ||||||||||||||
|
||||||||||||||
type LogProgressConfig struct { | ||||||||||||||
Message string | ||||||||||||||
Total int | ||||||||||||||
Sampler zerolog.Sampler | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
func DefaultLogProgressConfig( | ||||||||||||||
message string, | ||||||||||||||
total int, | ||||||||||||||
) LogProgressConfig { | ||||||||||||||
nth := uint32(total / 10) // sample every 10% by default | ||||||||||||||
if nth == 0 { | ||||||||||||||
nth = 1 | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
sampler := newProgressLogsSampler(nth, 60*time.Second) | ||||||||||||||
return NewLogProgressConfig( | ||||||||||||||
message, | ||||||||||||||
total, | ||||||||||||||
sampler, | ||||||||||||||
) | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
func NewLogProgressConfig( | ||||||||||||||
message string, | ||||||||||||||
total int, | ||||||||||||||
sampler zerolog.Sampler) LogProgressConfig { | ||||||||||||||
return LogProgressConfig{ | ||||||||||||||
Message: message, | ||||||||||||||
Total: total, | ||||||||||||||
Sampler: sampler, | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
} | ||||||||||||||
|
||||||||||||||
type LogProgressOption func(config *LogProgressConfig) | ||||||||||||||
|
||||||||||||||
// LogProgress takes a total and return function such that when called adds the given | ||||||||||||||
// number to the progress and logs the progress every 10% or every 60 seconds whichever | ||||||||||||||
// comes first. | ||||||||||||||
Comment on lines
+51
to
+53
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||
// The returned function can be called concurrently. | ||||||||||||||
// An eta is also logged, but it assumes that the progress is linear. | ||||||||||||||
func LogProgress( | ||||||||||||||
log zerolog.Logger, | ||||||||||||||
config LogProgressConfig, | ||||||||||||||
) LogProgressFunc { | ||||||||||||||
sampler := log.Sample(config.Sampler) | ||||||||||||||
|
||||||||||||||
start := time.Now() | ||||||||||||||
currentIndex := uint64(0) | ||||||||||||||
return func(add int) { | ||||||||||||||
current := atomic.AddUint64(¤tIndex, uint64(add)) | ||||||||||||||
|
||||||||||||||
percentage := float64(100) | ||||||||||||||
if total > 0 { | ||||||||||||||
percentage = (float64(currentIndex+1) / float64(total)) * 100. // currentIndex+1 assuming zero based indexing | ||||||||||||||
if config.Total > 0 { | ||||||||||||||
percentage = (float64(current) / float64(config.Total)) * 100. | ||||||||||||||
} | ||||||||||||||
elapsed := time.Since(start) | ||||||||||||||
elapsedString := elapsed.Round(1 * time.Second).String() | ||||||||||||||
|
||||||||||||||
// report every 10 percent | ||||||||||||||
if percentage >= logThreshold { | ||||||||||||||
logger.Info().Msgf("%s progress: %v percent", msg, logThreshold) | ||||||||||||||
logThreshold += 10 | ||||||||||||||
etaString := "unknown" | ||||||||||||||
if percentage > 0 { | ||||||||||||||
eta := time.Duration(float64(elapsed) / percentage * (100 - percentage)) | ||||||||||||||
if eta < 0 { | ||||||||||||||
eta = 0 | ||||||||||||||
} | ||||||||||||||
etaString = eta.Round(1 * time.Second).String() | ||||||||||||||
|
||||||||||||||
} | ||||||||||||||
|
||||||||||||||
if current != uint64(config.Total) { | ||||||||||||||
sampler.Info().Msgf("%s progress %d/%d (%.1f%%) elapsed: %s, eta %s", config.Message, current, config.Total, percentage, elapsedString, etaString) | ||||||||||||||
} else { | ||||||||||||||
log.Info().Msgf("%s progress %d/%d (%.1f%%) total time %s", config.Message, current, config.Total, percentage, elapsedString) | ||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
type TimedSampler struct { | ||||||||||||||
start time.Time | ||||||||||||||
Duration time.Duration | ||||||||||||||
mu sync.Mutex | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
var _ zerolog.Sampler = (*TimedSampler)(nil) | ||||||||||||||
|
||||||||||||||
func NewTimedSampler(duration time.Duration) *TimedSampler { | ||||||||||||||
return &TimedSampler{ | ||||||||||||||
start: time.Now(), | ||||||||||||||
Duration: duration, | ||||||||||||||
mu: sync.Mutex{}, | ||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
func (s *TimedSampler) Sample(_ zerolog.Level) bool { | ||||||||||||||
s.mu.Lock() | ||||||||||||||
defer s.mu.Unlock() | ||||||||||||||
|
||||||||||||||
if time.Since(s.start) > s.Duration { | ||||||||||||||
s.start = time.Now() | ||||||||||||||
return true | ||||||||||||||
} | ||||||||||||||
return false | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
func (s *TimedSampler) Reset() { | ||||||||||||||
s.mu.Lock() | ||||||||||||||
defer s.mu.Unlock() | ||||||||||||||
|
||||||||||||||
s.start = time.Now() | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
type progressLogsSampler struct { | ||||||||||||||
basicSampler *zerolog.BasicSampler | ||||||||||||||
timedSampler *TimedSampler | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
var _ zerolog.Sampler = (*progressLogsSampler)(nil) | ||||||||||||||
|
||||||||||||||
// newProgressLogsSampler returns a sampler that samples every nth log | ||||||||||||||
// and also samples a log if the last log was more than duration ago | ||||||||||||||
func newProgressLogsSampler(nth uint32, duration time.Duration) zerolog.Sampler { | ||||||||||||||
return &progressLogsSampler{ | ||||||||||||||
basicSampler: &zerolog.BasicSampler{N: nth}, | ||||||||||||||
timedSampler: NewTimedSampler(duration), | ||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
func (s *progressLogsSampler) Sample(lvl zerolog.Level) bool { | ||||||||||||||
sample := s.basicSampler.Sample(lvl) | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: maybe rename
Suggested change
|
||||||||||||||
if sample { | ||||||||||||||
s.timedSampler.Reset() | ||||||||||||||
return true | ||||||||||||||
} | ||||||||||||||
return s.timedSampler.Sample(lvl) | ||||||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe optimize by calling
logGrouping()
every n payloads (not every payload) becauselogGrouping()
has locking overhead.