Skip to content

Commit

Permalink
Merge pull request #66 from coinbase/patrick/inactive-reconciler-impr…
Browse files Browse the repository at this point in the history
…ovements

Inactive Reconciler Performance when --lookup-balance-by-block=false
  • Loading branch information
patrick-ogrady authored Jul 15, 2020
2 parents 6fcf086 + 745b911 commit 5b24300
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 66 deletions.
19 changes: 18 additions & 1 deletion reconciler/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"

"github.com/coinbase/rosetta-sdk-go/parser"
"github.com/coinbase/rosetta-sdk-go/types"
)

// Option is used to overwrite default values in
Expand Down Expand Up @@ -59,7 +60,7 @@ func WithSeenAccounts(seen []*AccountCurrency) Option {
r.inactiveQueue = append(r.inactiveQueue, &InactiveEntry{
Entry: acct,
})
r.seenAccounts = append(r.seenAccounts, acct)
r.seenAccounts[types.Hash(acct)] = struct{}{}
}

fmt.Printf(
Expand All @@ -85,3 +86,19 @@ func WithLookupBalanceByBlock(lookup bool) Option {
r.lookupBalanceByBlock = lookup
}
}

// WithInactiveFrequency is how many blocks the reconciler
// should wait between inactive reconciliations on each account.
func WithInactiveFrequency(blocks int64) Option {
return func(r *Reconciler) {
r.inactiveFrequency = blocks
}
}

// WithDebugLogging determines if verbose logs should
// be printed.
func WithDebugLogging(debug bool) Option {
return func(r *Reconciler) {
r.debugLogging = debug
}
}
171 changes: 116 additions & 55 deletions reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const (
// that can be enqueued to reconcile before new
// requests are dropped.
// TODO: Make configurable
backlogThreshold = 1000
backlogThreshold = 50000

// waitToCheckDiff is the syncing difference (live-head)
// to retry instead of exiting. In other words, if the
Expand All @@ -62,13 +62,12 @@ const (

// inactiveReconciliationSleep is used as the time.Duration
// to sleep when there are no seen accounts to reconcile.
inactiveReconciliationSleep = 5 * time.Second
inactiveReconciliationSleep = 1 * time.Second

// inactiveReconciliationRequiredDepth is the minimum
// defaultInactiveFrequency is the minimum
// number of blocks the reconciler should wait between
// inactive reconciliations.
// TODO: make configurable
inactiveReconciliationRequiredDepth = 500
// inactive reconciliations for each account.
defaultInactiveFrequency = 200

// defaultLookupBalanceByBlock is the default setting
// for how to perform balance queries. It is preferable
Expand Down Expand Up @@ -175,6 +174,8 @@ type Reconciler struct {
lookupBalanceByBlock bool
interestingAccounts []*AccountCurrency
changeQueue chan *parser.BalanceChange
inactiveFrequency int64
debugLogging bool

// Reconciler concurrency is separated between
// active and inactive concurrency to allow for
Expand All @@ -198,7 +199,7 @@ type Reconciler struct {
// queue. If this is not done, it is possible a goroutine
// could be processing an account (not in the queue) when
// we do a lookup to determine if we should add to the queue.
seenAccounts []*AccountCurrency
seenAccounts map[string]struct{}
inactiveQueue []*InactiveEntry

// inactiveQueueMutex needed because we can't peek at the tip
Expand All @@ -219,10 +220,11 @@ func New(
helper: helper,
handler: handler,
fetcher: fetcher,
inactiveFrequency: defaultInactiveFrequency,
activeConcurrency: defaultReconcilerConcurrency,
inactiveConcurrency: defaultReconcilerConcurrency,
highWaterMark: -1,
seenAccounts: []*AccountCurrency{},
seenAccounts: map[string]struct{}{},
inactiveQueue: []*InactiveEntry{},

// When lookupBalanceByBlock is enabled, we check
Expand Down Expand Up @@ -269,23 +271,37 @@ func (r *Reconciler) QueueChanges(
})
}

if !r.lookupBalanceByBlock {
// All changes will have the same block. Return
// if we are too far behind to start reconciling.
if block.Index < r.highWaterMark {
return nil
for _, change := range balanceChanges {
// Add all seen accounts to inactive reconciler queue.
//
// Note: accounts are only added if they have not been seen before.
err := r.inactiveAccountQueue(false, &AccountCurrency{
Account: change.Account,
Currency: change.Currency,
}, block)
if err != nil {
return err
}

for _, change := range balanceChanges {
if !r.lookupBalanceByBlock {
// All changes will have the same block. Continue
// if we are too far behind to start reconciling.
//
// Note: we don't return here so that we can ensure
// all seen accounts are added to the inactiveAccountQueue.
if block.Index < r.highWaterMark {
continue
}

select {
case r.changeQueue <- change:
default:
log.Println("skipping active enqueue because backlog")
if r.debugLogging {
log.Println("skipping active enqueue because backlog")
}
}
}
} else {
// Block until all checked for a block or context is Done
for _, change := range balanceChanges {
} else {
// Block until all checked for a block or context is Done
select {
case r.changeQueue <- change:
case <-ctx.Done():
Expand Down Expand Up @@ -443,11 +459,13 @@ func (r *Reconciler) accountReconciliation(
}

// Don't wait to check if we are very far behind
log.Printf(
"Skipping reconciliation for %s: %d blocks behind\n",
types.PrettyPrintStruct(accountCurrency),
diff,
)
if r.debugLogging {
log.Printf(
"Skipping reconciliation for %s: %d blocks behind\n",
types.PrettyPrintStruct(accountCurrency),
diff,
)
}

// Set a highWaterMark to not accept any new
// reconciliation requests unless they happened
Expand Down Expand Up @@ -486,18 +504,13 @@ func (r *Reconciler) accountReconciliation(
liveAmount,
liveBlock,
)
if err != nil {
if err != nil { // error only returned if we should exit on failure
return err
}

return nil
}

err = r.inactiveAccountQueue(inactive, accountCurrency, liveBlock)
if err != nil {
return err
}

return r.handler.ReconciliationSucceeded(
ctx,
reconciliationType,
Expand All @@ -508,6 +521,7 @@ func (r *Reconciler) accountReconciliation(
)
}

// We return here if we gave up trying to reconcile an account.
return nil
}

Expand All @@ -516,22 +530,24 @@ func (r *Reconciler) inactiveAccountQueue(
accountCurrency *AccountCurrency,
liveBlock *types.BlockIdentifier,
) error {
r.inactiveQueueMutex.Lock()

// Only enqueue the first time we see an account on an active reconciliation.
shouldEnqueueInactive := false
if !inactive && !ContainsAccountCurrency(r.seenAccounts, accountCurrency) {
r.seenAccounts = append(r.seenAccounts, accountCurrency)
r.seenAccounts[types.Hash(accountCurrency)] = struct{}{}
shouldEnqueueInactive = true
}

if inactive || shouldEnqueueInactive {
r.inactiveQueueMutex.Lock()
r.inactiveQueue = append(r.inactiveQueue, &InactiveEntry{
Entry: accountCurrency,
LastCheck: liveBlock,
})
r.inactiveQueueMutex.Unlock()
}

r.inactiveQueueMutex.Unlock()

return nil
}

Expand Down Expand Up @@ -577,6 +593,35 @@ func (r *Reconciler) reconcileActiveAccounts(
}
}

// shouldAttemptInactiveReconciliation returns a boolean indicating whether
// inactive reconciliation should be attempted based on syncing status.
func (r *Reconciler) shouldAttemptInactiveReconciliation(
ctx context.Context,
) (bool, *types.BlockIdentifier) {
head, err := r.helper.CurrentBlock(ctx)
// When first start syncing, this loop may run before the genesis block is synced.
// If this is the case, we should sleep and try again later instead of exiting.
if err != nil {
if r.debugLogging {
log.Println("waiting to start intactive reconciliation until a block is synced...")
}

return false, nil
}

if head.Index < r.highWaterMark {
if r.debugLogging {
log.Println(
"waiting to continue intactive reconciliation until reaching high water mark...",
)
}

return false, nil
}

return true, head
}

// reconcileInactiveAccounts selects a random account
// from all previously seen accounts and reconciles
// the balance. This is useful for detecting balance
Expand All @@ -589,27 +634,25 @@ func (r *Reconciler) reconcileInactiveAccounts(
return ctx.Err()
}

head, err := r.helper.CurrentBlock(ctx)
// When first start syncing, this loop may run before the genesis block is synced.
// If this is the case, we should sleep and try again later instead of exiting.
if err != nil {
log.Println("waiting to start intactive reconciliation until a block is synced...")
shouldAttempt, head := r.shouldAttemptInactiveReconciliation(ctx)
if !shouldAttempt {
time.Sleep(inactiveReconciliationSleep)
continue
}

r.inactiveQueueMutex.Lock()
nextValidIndex := r.inactiveQueue[0].LastCheck.Index + r.inactiveFrequency
if len(r.inactiveQueue) > 0 &&
(r.inactiveQueue[0].LastCheck == nil || // block is set to nil when loaded from previous run
r.inactiveQueue[0].LastCheck.Index+inactiveReconciliationRequiredDepth < head.Index) {
randAcct := r.inactiveQueue[0]
nextValidIndex <= head.Index) {
nextAcct := r.inactiveQueue[0]
r.inactiveQueue = r.inactiveQueue[1:]
r.inactiveQueueMutex.Unlock()

block, amount, err := r.bestBalance(
ctx,
randAcct.Entry.Account,
randAcct.Entry.Currency,
nextAcct.Entry.Account,
nextAcct.Entry.Currency,
types.ConstructPartialBlockIdentifier(head),
)
if err != nil {
Expand All @@ -618,17 +661,32 @@ func (r *Reconciler) reconcileInactiveAccounts(

err = r.accountReconciliation(
ctx,
randAcct.Entry.Account,
randAcct.Entry.Currency,
nextAcct.Entry.Account,
nextAcct.Entry.Currency,
amount,
block,
true,
)
if err != nil {
return err
}

// Always re-enqueue accounts after they have been inactively
// reconciled. If we don't re-enqueue, we will never check
// these accounts again.
err = r.inactiveAccountQueue(true, nextAcct.Entry, block)
if err != nil {
return err
}
} else {
r.inactiveQueueMutex.Unlock()
if r.debugLogging {
log.Printf(
"no accounts ready for inactive reconciliation (%d accounts in queue, will reconcile next account at index %d)\n",
len(r.inactiveQueue),
nextValidIndex,
)
}
time.Sleep(inactiveReconciliationSleep)
}
}
Expand Down Expand Up @@ -671,22 +729,20 @@ func ExtractAmount(
return b, nil
}

return nil, fmt.Errorf("could not extract amount for %+v", currency)
return nil, fmt.Errorf(
"account balance response does could not contain currency %s",
types.PrettyPrintStruct(currency),
)
}

// ContainsAccountCurrency returns a boolean indicating if a
// AccountCurrency slice already contains an Account and Currency combination.
// AccountCurrency set already contains an Account and Currency combination.
func ContainsAccountCurrency(
arr []*AccountCurrency,
m map[string]struct{},
change *AccountCurrency,
) bool {
for _, a := range arr {
if types.Hash(a) == types.Hash(change) {
return true
}
}

return false
_, exists := m[types.Hash(change)]
return exists
}

// GetCurrencyBalance fetches the balance of a *types.AccountIdentifier
Expand All @@ -711,7 +767,12 @@ func GetCurrencyBalance(

liveAmount, err := ExtractAmount(liveBalances, currency)
if err != nil {
return nil, "", err
return nil, "", fmt.Errorf(
"%w: could not get %s currency balance for %s",
err,
types.PrettyPrintStruct(currency),
types.PrettyPrintStruct(account),
)
}

return liveBlock, liveAmount.Value, nil
Expand Down
Loading

0 comments on commit 5b24300

Please sign in to comment.