Skip to content

Commit

Permalink
make stepsRangeInDBAsStr method of at (#14030)
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov authored Mar 2, 2025
1 parent afb69b5 commit fd189b6
Show file tree
Hide file tree
Showing 11 changed files with 38 additions and 51 deletions.
18 changes: 10 additions & 8 deletions erigon-lib/state/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -914,7 +914,7 @@ func (ac *AggregatorRoTx) PruneSmallBatches(ctx context.Context, timeout time.Du
"until commit", time.Until(started.Add(timeout)).String(),
"pruneLimit", pruneLimit,
"aggregatedStep", ac.StepsInFiles(kv.StateDomains...),
"stepsRangeInDB", ac.a.StepsRangeInDBAsStr(tx),
"stepsRangeInDB", ac.stepsRangeInDBAsStr(tx),
"pruned", fullStat.String(),
)
case <-ctx.Done():
Expand All @@ -924,13 +924,15 @@ func (ac *AggregatorRoTx) PruneSmallBatches(ctx context.Context, timeout time.Du
}
}

func (a *Aggregator) StepsRangeInDBAsStr(tx kv.Tx) string {
steps := make([]string, 0, kv.DomainLen+4)
for _, d := range a.d {
steps = append(steps, d.stepsRangeInDBAsStr(tx))
func (at *AggregatorRoTx) stepsRangeInDBAsStr(tx kv.Tx) string {
steps := make([]string, 0, len(at.d)+len(at.iis))
for _, dt := range at.d {
a1, a2 := dt.stepsRangeInDB(tx)
steps = append(steps, fmt.Sprintf("%s:%.1f", dt.d.filenameBase, a2-a1))
}
for _, ii := range a.iis {
steps = append(steps, ii.stepsRangeInDBAsStr(tx))
for _, iit := range at.iis {
a1, a2 := iit.stepsRangeInDB(tx)
steps = append(steps, fmt.Sprintf("%s:%.1f", iit.ii.filenameBase, a2-a1))
}
return strings.Join(steps, ", ")
}
Expand Down Expand Up @@ -1065,7 +1067,7 @@ func (ac *AggregatorRoTx) Prune(ctx context.Context, tx kv.RwTx, limit uint64, l
}
//ac.a.logger.Info("aggregator prune", "step", step,
// "txn_range", fmt.Sprintf("[%d,%d)", txFrom, txTo), "limit", limit,
// /*"stepsLimit", limit/ac.a.aggregationStep,*/ "stepsRangeInDB", ac.a.StepsRangeInDBAsStr(tx))
// /*"stepsLimit", limit/ac.a.aggregationStep,*/ "stepsRangeInDB", ac.a.stepsRangeInDBAsStr(tx))
aggStat := newAggregatorPruneStat()
for id, d := range ac.d {
var err error
Expand Down
10 changes: 4 additions & 6 deletions erigon-lib/state/aggregator_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,18 @@ import (
"flag"
"fmt"
"os"
"path"
"path/filepath"
"testing"
"time"

"github.com/erigontech/erigon-lib/common/datadir"
"github.com/erigontech/erigon-lib/log/v3"

"github.com/stretchr/testify/require"

"github.com/erigontech/erigon-lib/common"
"github.com/erigontech/erigon-lib/common/datadir"
"github.com/erigontech/erigon-lib/common/length"
"github.com/erigontech/erigon-lib/kv"
"github.com/erigontech/erigon-lib/kv/mdbx"
"github.com/erigontech/erigon-lib/log/v3"
"github.com/erigontech/erigon-lib/recsplit"
"github.com/erigontech/erigon-lib/seg"
)
Expand Down Expand Up @@ -142,7 +140,7 @@ func Benchmark_BtreeIndex_Search(b *testing.B) {
defer os.RemoveAll(tmp)
dataPath := "../../data/storage.256-288.kv"

indexPath := path.Join(tmp, filepath.Base(dataPath)+".bti")
indexPath := filepath.Join(tmp, filepath.Base(dataPath)+".bti")
comp := seg.CompressKeys | seg.CompressVals
buildBtreeIndex(b, dataPath, indexPath, comp, 1, logger, true)

Expand Down Expand Up @@ -174,7 +172,7 @@ func benchInitBtreeIndex(b *testing.B, M uint64, compression seg.FileCompression
b.Cleanup(func() { os.RemoveAll(tmp) })

dataPath := generateKV(b, tmp, 52, 10, 1000000, logger, 0)
indexPath := path.Join(tmp, filepath.Base(dataPath)+".bt")
indexPath := filepath.Join(tmp, filepath.Base(dataPath)+".bt")

buildBtreeIndex(b, dataPath, indexPath, compression, 1, logger, true)

Expand Down
7 changes: 3 additions & 4 deletions erigon-lib/state/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@ import (
"encoding/binary"
"encoding/hex"
"fmt"
"github.com/erigontech/erigon-lib/types/accounts"
"math"
"math/rand"
"os"
"path"
"path/filepath"
"strings"
"sync/atomic"
Expand All @@ -49,6 +47,7 @@ import (
"github.com/erigontech/erigon-lib/kv/stream"
"github.com/erigontech/erigon-lib/log/v3"
"github.com/erigontech/erigon-lib/seg"
"github.com/erigontech/erigon-lib/types/accounts"
"github.com/holiman/uint256"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -1069,7 +1068,7 @@ func generateKV(tb testing.TB, tmp string, keySize, valueSize, keyCount int, log
rnd := newRnd(0)
values := make([]byte, valueSize)

dataPath := path.Join(tmp, fmt.Sprintf("%dk.kv", keyCount/1000))
dataPath := filepath.Join(tmp, fmt.Sprintf("%dk.kv", keyCount/1000))
comp, err := seg.NewCompressor(context.Background(), "cmp", dataPath, tmp, seg.DefaultCfg, log.LvlDebug, logger)
require.NoError(tb, err)

Expand Down Expand Up @@ -1118,7 +1117,7 @@ func generateKV(tb testing.TB, tmp string, keySize, valueSize, keyCount int, log
compPath := decomp.FilePath()
ps := background.NewProgressSet()

IndexFile := path.Join(tmp, fmt.Sprintf("%dk.bt", keyCount/1000))
IndexFile := filepath.Join(tmp, fmt.Sprintf("%dk.bt", keyCount/1000))
err = BuildBtreeIndexWithDecompressor(IndexFile, decomp, compressFlags, ps, tb.TempDir(), 777, logger, true)
require.NoError(tb, err)

Expand Down
9 changes: 4 additions & 5 deletions erigon-lib/state/archive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package state
import (
"bytes"
"context"
"path"
"path/filepath"
"sort"
"testing"
Expand Down Expand Up @@ -87,7 +86,7 @@ func TestArchiveWriter(t *testing.T) {
writeLatest(t, w, td)
w.Close()

decomp, err := seg.NewDecompressor(path.Join(tmp, "uncompressed"))
decomp, err := seg.NewDecompressor(filepath.Join(tmp, "uncompressed"))
require.NoError(t, err)
defer decomp.Close()

Expand All @@ -102,7 +101,7 @@ func TestArchiveWriter(t *testing.T) {
writeLatest(t, w, td)
w.Close()

decomp, err := seg.NewDecompressor(path.Join(tmp, "compressed"))
decomp, err := seg.NewDecompressor(filepath.Join(tmp, "compressed"))
require.NoError(t, err)
defer decomp.Close()
ds := (datasize.B * datasize.ByteSize(decomp.Size())).HR()
Expand All @@ -117,7 +116,7 @@ func TestArchiveWriter(t *testing.T) {
writeLatest(t, w, td)
w.Close()

decomp, err := seg.NewDecompressor(path.Join(tmp, "compressed-keys"))
decomp, err := seg.NewDecompressor(filepath.Join(tmp, "compressed-keys"))
require.NoError(t, err)
defer decomp.Close()
ds := (datasize.B * datasize.ByteSize(decomp.Size())).HR()
Expand All @@ -132,7 +131,7 @@ func TestArchiveWriter(t *testing.T) {
writeLatest(t, w, td)
w.Close()

decomp, err := seg.NewDecompressor(path.Join(tmp, "compressed-vals"))
decomp, err := seg.NewDecompressor(filepath.Join(tmp, "compressed-vals"))
require.NoError(t, err)
defer decomp.Close()
ds := (datasize.B * datasize.ByteSize(decomp.Size())).HR()
Expand Down
3 changes: 1 addition & 2 deletions erigon-lib/state/bpstree_bench_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package state

import (
"path"
"path/filepath"
"testing"

Expand All @@ -20,7 +19,7 @@ func BenchmarkBpsTreeSeek(t *testing.B) {

dataPath := generateKV(t, tmp, 52, 180, keyCount, logger, 0)

indexPath := path.Join(tmp, filepath.Base(dataPath)+".bti")
indexPath := filepath.Join(tmp, filepath.Base(dataPath)+".bti")
buildBtreeIndex(t, dataPath, indexPath, compressFlags, 1, logger, true)

kv, bt, err := OpenBtreeIndexAndDataFile(indexPath, dataPath, uint64(M), compressFlags, false)
Expand Down
9 changes: 4 additions & 5 deletions erigon-lib/state/btree_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package state
import (
"bytes"
"fmt"
"path"
"path/filepath"
"testing"

Expand Down Expand Up @@ -83,7 +82,7 @@ func Test_BtreeIndex_Seek(t *testing.T) {

t.Run("empty index", func(t *testing.T) {
dataPath := generateKV(t, tmp, 52, 180, 0, logger, 0)
indexPath := path.Join(tmp, filepath.Base(dataPath)+".bti")
indexPath := filepath.Join(tmp, filepath.Base(dataPath)+".bti")
buildBtreeIndex(t, dataPath, indexPath, compressFlags, 1, logger, true)

kv, bt, err := OpenBtreeIndexAndDataFile(indexPath, dataPath, uint64(M), compressFlags, false)
Expand All @@ -94,7 +93,7 @@ func Test_BtreeIndex_Seek(t *testing.T) {
})
dataPath := generateKV(t, tmp, 52, 180, keyCount, logger, 0)

indexPath := path.Join(tmp, filepath.Base(dataPath)+".bti")
indexPath := filepath.Join(tmp, filepath.Base(dataPath)+".bti")
buildBtreeIndex(t, dataPath, indexPath, compressFlags, 1, logger, true)

kv, bt, err := OpenBtreeIndexAndDataFile(indexPath, dataPath, uint64(M), compressFlags, false)
Expand Down Expand Up @@ -169,7 +168,7 @@ func Test_BtreeIndex_Build(t *testing.T) {
keys, err := pivotKeysFromKV(dataPath)
require.NoError(t, err)

indexPath := path.Join(tmp, filepath.Base(dataPath)+".bti")
indexPath := filepath.Join(tmp, filepath.Base(dataPath)+".bti")
buildBtreeIndex(t, dataPath, indexPath, compressFlags, 1, logger, true)
require.NoError(t, err)

Expand Down Expand Up @@ -222,7 +221,7 @@ func Test_BtreeIndex_Seek2(t *testing.T) {
compressFlags := seg.CompressKeys | seg.CompressVals
dataPath := generateKV(t, tmp, 52, 48, keyCount, logger, compressFlags)

indexPath := path.Join(tmp, filepath.Base(dataPath)+".bti")
indexPath := filepath.Join(tmp, filepath.Base(dataPath)+".bti")
buildBtreeIndex(t, dataPath, indexPath, compressFlags, 1, logger, true)

kv, bt, err := OpenBtreeIndexAndDataFile(indexPath, dataPath, uint64(M), compressFlags, false)
Expand Down
9 changes: 2 additions & 7 deletions erigon-lib/state/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2017,13 +2017,8 @@ func (sr *SegStreamReader) Next() (k, v []byte, err error) {
return k, v, nil
}

func (d *Domain) stepsRangeInDBAsStr(tx kv.Tx) string {
a1, a2 := d.History.InvertedIndex.stepsRangeInDB(tx)
//ad1, ad2 := d.stepsRangeInDB(tx)
//if ad2-ad1 < 0 {
// fmt.Printf("aaa: %f, %f\n", ad1, ad2)
//}
return fmt.Sprintf("%s:%.1f", d.filenameBase, a2-a1)
func (dt *DomainRoTx) stepsRangeInDB(tx kv.Tx) (from, to float64) {
return dt.ht.iit.stepsRangeInDB(tx)
}

func (dt *DomainRoTx) Files() (res []string) {
Expand Down
4 changes: 2 additions & 2 deletions erigon-lib/state/domain_shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,8 @@ func (sd *SharedDomains) Unwind(ctx context.Context, rwTx kv.RwTx, blockUnwindTo
logEvery := time.NewTicker(30 * time.Second)
defer logEvery.Stop()
sd.logger.Info("aggregator unwind", "step", step,
"txUnwindTo", txUnwindTo, "stepsRangeInDB", sd.aggTx.a.StepsRangeInDBAsStr(rwTx))
//fmt.Printf("aggregator unwind step %d txUnwindTo %d stepsRangeInDB %s\n", step, txUnwindTo, sd.aggTx.a.StepsRangeInDBAsStr(rwTx))
"txUnwindTo", txUnwindTo)
//fmt.Printf("aggregator unwind step %d txUnwindTo %d\n", step, txUnwindTo)
sf := time.Now()
defer mxUnwindSharedTook.ObserveDuration(sf)

Expand Down
2 changes: 1 addition & 1 deletion erigon-lib/state/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -919,7 +919,7 @@ func TestDomain_PruneOnWrite(t *testing.T) {
require.EqualValues(t, v[:], storedV, label)
}

from, to := d.stepsRangeInDB(tx)
from, to := dc.stepsRangeInDB(tx)
require.Equal(t, 3, int(from))
require.Equal(t, 4, int(to))

Expand Down
14 changes: 5 additions & 9 deletions erigon-lib/state/inverted_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1222,18 +1222,14 @@ func (ii *InvertedIndex) integrateDirtyFiles(sf InvertedFiles, txNumFrom, txNumT
ii.dirtyFiles.Set(fi)
}

func (ii *InvertedIndex) stepsRangeInDBAsStr(tx kv.Tx) string {
a1, a2 := ii.stepsRangeInDB(tx)
return fmt.Sprintf("%s: %.1f", ii.filenameBase, a2-a1)
}
func (ii *InvertedIndex) stepsRangeInDB(tx kv.Tx) (from, to float64) {
fst, _ := kv.FirstKey(tx, ii.keysTable)
func (iit *InvertedIndexRoTx) stepsRangeInDB(tx kv.Tx) (from, to float64) {
fst, _ := kv.FirstKey(tx, iit.ii.keysTable)
if len(fst) > 0 {
from = float64(binary.BigEndian.Uint64(fst)) / float64(ii.aggregationStep)
from = float64(binary.BigEndian.Uint64(fst)) / float64(iit.ii.aggregationStep)
}
lst, _ := kv.LastKey(tx, ii.keysTable)
lst, _ := kv.LastKey(tx, iit.ii.keysTable)
if len(lst) > 0 {
to = float64(binary.BigEndian.Uint64(lst)) / float64(ii.aggregationStep)
to = float64(binary.BigEndian.Uint64(lst)) / float64(iit.ii.aggregationStep)
}
if to == 0 {
to = from
Expand Down
4 changes: 2 additions & 2 deletions erigon-lib/state/inverted_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func TestInvIndexAfterPrune(t *testing.T) {

ic.Close()
err = db.Update(ctx, func(tx kv.RwTx) error {
from, to := ii.stepsRangeInDB(tx)
from, to := ic.stepsRangeInDB(tx)
require.Equal(t, "0.1", fmt.Sprintf("%.1f", from))
require.Equal(t, "0.4", fmt.Sprintf("%.1f", to))

Expand Down Expand Up @@ -315,7 +315,7 @@ func TestInvIndexAfterPrune(t *testing.T) {
require.Nil(t, k, table)
}

from, to := ii.stepsRangeInDB(tx)
from, to := ic.stepsRangeInDB(tx)
require.Equal(t, float64(0), from)
require.Equal(t, float64(0), to)
}
Expand Down

0 comments on commit fd189b6

Please sign in to comment.