Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change migration progress log to be more verbose #5122

Merged
merged 2 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 25 additions & 9 deletions cmd/util/ledger/migrations/account_based_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,19 @@ func MigrateByAccount(migrator AccountMigrator, allPayloads []ledger.Payload, nW
log.Info().Msgf("start grouping for a total of %v payloads", len(allPayloads))

var err error
logGrouping := util.LogProgress("grouping payload", len(allPayloads), log.Logger)
for i, payload := range allPayloads {
logGrouping := util.LogProgress(
log.Logger,
util.DefaultLogProgressConfig(
"grouping payload",
len(allPayloads),
),
)
for _, payload := range allPayloads {
groups, err = PayloadGrouping(groups, payload)
if err != nil {
return nil, err
}
logGrouping(i)
logGrouping(1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe optimize by calling logGrouping() every n payloads (not every payload) because logGrouping() has locking overhead.

}

log.Info().Msgf("finish grouping for payloads by account: %v groups in total, %v NonAccountPayloads",
Expand Down Expand Up @@ -108,9 +114,14 @@ func MigrateGroupSequentially(
) (
[]ledger.Payload, error) {

logAccount := util.LogProgress("processing account group", len(payloadsByAccount), log.Logger)
logAccount := util.LogProgress(
log.Logger,
util.DefaultLogProgressConfig(
"processing account group",
len(payloadsByAccount),
),
)

i := 0
migrated := make([]ledger.Payload, 0)
for address, payloads := range payloadsByAccount {
accountMigrated, err := migrator.MigratePayloads(address, payloads)
Expand All @@ -119,8 +130,7 @@ func MigrateGroupSequentially(
}

migrated = append(migrated, accountMigrated...)
logAccount(i)
i++
logAccount(1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe optimize by calling logAccount() every n account migrations because logAccount() has locking overhead.

}

return migrated, nil
Expand Down Expand Up @@ -171,7 +181,13 @@ func MigrateGroupConcurrently(
}

// read job results
logAccount := util.LogProgress("processing account group", len(payloadsByAccount), log.Logger)
logAccount := util.LogProgress(
log.Logger,
util.DefaultLogProgressConfig(
"processing account group",
len(payloadsByAccount),
),
)

migrated := make([]ledger.Payload, 0)

Expand All @@ -183,7 +199,7 @@ func MigrateGroupConcurrently(

accountMigrated := result.Migrated
migrated = append(migrated, accountMigrated...)
logAccount(i)
logAccount(1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.

}

return migrated, nil
Expand Down
88 changes: 88 additions & 0 deletions engine/execution/mock/extendable_storage_snapshot.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 8 additions & 2 deletions ledger/complete/wal/checkpointer.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,9 +517,15 @@ func StoreCheckpointV5(dir string, fileName string, logger zerolog.Logger, tries
}

func logProgress(msg string, estimatedSubtrieNodeCount int, logger zerolog.Logger) func(nodeCounter uint64) {
lg := util.LogProgress(msg, estimatedSubtrieNodeCount, logger)
lg := util.LogProgress(
logger,
util.DefaultLogProgressConfig(
msg,
estimatedSubtrieNodeCount,
),
)
return func(index uint64) {
lg(int(index))
lg(1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems this function is called for reading and writing every node during checkpointing. Maybe check performance impact with logging and locking overhead here and optimize if needed.

}
}

Expand Down
150 changes: 137 additions & 13 deletions module/util/log.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,149 @@
package util

import (
"sync"
"sync/atomic"
"time"

"github.com/rs/zerolog"
)

// LogProgress takes a total and return function such that when called with a 0-based index
// it prints the progress from 0% to 100% to indicate the index from 0 to (total - 1) has been
// processed.
// useful to report the progress of processing the index from 0 to (total - 1)
func LogProgress(msg string, total int, logger zerolog.Logger) func(currentIndex int) {
logThreshold := float64(0)
return func(currentIndex int) {
// LogProgressFunc is a function that can be called to add to the progress.
// addProgress is the amount to add to the running progress count, not an
// absolute index.
type LogProgressFunc func(addProgress int)

// LogProgressConfig holds the settings LogProgress uses to report progress.
type LogProgressConfig struct {
// Message is the prefix of every progress log line.
Message string
// Total is the progress count that corresponds to 100%. When it is not
// positive, LogProgress reports the percentage as 100.
Total int
// Sampler decides which intermediate progress logs are emitted.
Sampler zerolog.Sampler
}

// DefaultLogProgressConfig returns a LogProgressConfig for the given message
// and total, using a sampler that lets through every (total / 10)-th progress
// log and also a log whenever more than 60 seconds have passed since the last
// sampled one.
//
// The sampling interval is clamped to the range [1, 2^32-1]. A total below 10
// (including zero and negative values) yields an interval of 1, and a total
// whose tenth does not fit in a uint32 yields the largest interval instead of
// being silently truncated by the integer conversion.
func DefaultLogProgressConfig(
	message string,
	total int,
) LogProgressConfig {
	// Largest interval representable by the uint32 the sampler expects.
	const maxNth = 1<<32 - 1

	// Sample every 10% by default. The computation is done in int64 so the
	// bounds can be checked before narrowing to uint32.
	nth := uint32(1)
	if step := int64(total) / 10; step > maxNth {
		nth = maxNth
	} else if step > 0 {
		nth = uint32(step)
	}

	sampler := newProgressLogsSampler(nth, 60*time.Second)
	return NewLogProgressConfig(
		message,
		total,
		sampler,
	)
}

// NewLogProgressConfig bundles the given message, total and sampler into a
// LogProgressConfig without modifying any of them.
func NewLogProgressConfig(
	message string,
	total int,
	sampler zerolog.Sampler,
) LogProgressConfig {
	var config LogProgressConfig
	config.Message = message
	config.Total = total
	config.Sampler = sampler
	return config
}

// LogProgressOption is a function that modifies a LogProgressConfig in place.
// NOTE(review): no use of this type is visible in this file section - confirm
// whether it is still needed.
type LogProgressOption func(config *LogProgressConfig)

// LogProgress takes a total and return function such that when called adds the given
// number to the progress and logs the progress every 10% or every 60 seconds whichever
// comes first.
Comment on lines +51 to +53
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// LogProgress takes a total and return function such that when called adds the given
// number to the progress and logs the progress every 10% or every 60 seconds whichever
// comes first.
// LogProgress takes a LogProgressConfig and return function such that when called adds the given
// number to the progress and logs the progress every specified seconds or % completed
// (whichever comes first) as specified in LogProgressConfig.

// The returned function can be called concurrently.
// An eta is also logged, but it assumes that the progress is linear.
func LogProgress(
log zerolog.Logger,
config LogProgressConfig,
) LogProgressFunc {
sampler := log.Sample(config.Sampler)

start := time.Now()
currentIndex := uint64(0)
return func(add int) {
current := atomic.AddUint64(&currentIndex, uint64(add))

percentage := float64(100)
if total > 0 {
percentage = (float64(currentIndex+1) / float64(total)) * 100. // currentIndex+1 assuming zero based indexing
if config.Total > 0 {
percentage = (float64(current) / float64(config.Total)) * 100.
}
elapsed := time.Since(start)
elapsedString := elapsed.Round(1 * time.Second).String()

// report every 10 percent
if percentage >= logThreshold {
logger.Info().Msgf("%s progress: %v percent", msg, logThreshold)
logThreshold += 10
etaString := "unknown"
if percentage > 0 {
eta := time.Duration(float64(elapsed) / percentage * (100 - percentage))
if eta < 0 {
eta = 0
}
etaString = eta.Round(1 * time.Second).String()

}

if current != uint64(config.Total) {
sampler.Info().Msgf("%s progress %d/%d (%.1f%%) elapsed: %s, eta %s", config.Message, current, config.Total, percentage, elapsedString, etaString)
} else {
log.Info().Msgf("%s progress %d/%d (%.1f%%) total time %s", config.Message, current, config.Total, percentage, elapsedString)
}
}
}

// TimedSampler is a zerolog.Sampler that samples a log only when more than
// Duration has elapsed since the last sampled log (or since the last Reset).
type TimedSampler struct {
// start is the time of the last sample or reset; guarded by mu.
start time.Time
// Duration is the time that must elapse before the next log is sampled.
Duration time.Duration
// mu guards start.
mu sync.Mutex
}

var _ zerolog.Sampler = (*TimedSampler)(nil)

// NewTimedSampler returns a TimedSampler whose timer starts now and that
// samples a log once more than duration has elapsed.
func NewTimedSampler(duration time.Duration) *TimedSampler {
	sampler := new(TimedSampler) // the zero-value mutex is ready to use
	sampler.start = time.Now()
	sampler.Duration = duration
	return sampler
}

// Sample reports whether more than Duration has passed since the last sampled
// log or reset. When it returns true the timer is restarted. The log level is
// ignored.
func (s *TimedSampler) Sample(_ zerolog.Level) bool {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Still inside the quiet period: drop the log.
	if time.Since(s.start) <= s.Duration {
		return false
	}

	s.start = time.Now()
	return true
}

// Reset restarts the timer, so the next sample happens only after a full
// Duration has elapsed from now.
func (s *TimedSampler) Reset() {
	s.mu.Lock()
	s.start = time.Now()
	s.mu.Unlock()
}

// progressLogsSampler combines a count-based sampler with a time-based one.
type progressLogsSampler struct {
// basicSampler samples every nth log.
basicSampler *zerolog.BasicSampler
// timedSampler samples a log if the last one was more than a set duration ago.
timedSampler *TimedSampler
}

var _ zerolog.Sampler = (*progressLogsSampler)(nil)

// newProgressLogsSampler builds a sampler that lets through every nth log,
// and additionally lets a log through when the previous sampled log is more
// than duration old.
func newProgressLogsSampler(nth uint32, duration time.Duration) zerolog.Sampler {
	everyNth := &zerolog.BasicSampler{N: nth}
	timed := NewTimedSampler(duration)
	return &progressLogsSampler{
		basicSampler: everyNth,
		timedSampler: timed,
	}
}

func (s *progressLogsSampler) Sample(lvl zerolog.Level) bool {
sample := s.basicSampler.Sample(lvl)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe rename sample to sampled.

Suggested change
sample := s.basicSampler.Sample(lvl)
sampled := s.basicSampler.Sample(lvl)

if sample {
s.timedSampler.Reset()
return true
}
return s.timedSampler.Sample(lvl)
}
Loading