
Commit

Tweaks for local go bench
Allow loading the whole test data set into memory, which can be useful when
benchmarking memory allocations.
Don't block on sending ingest requests while creating batches, which improves
data load speed.
niksajakovljevic committed Jun 10, 2022
1 parent dd8f633 commit e5e02e3
Showing 2 changed files with 66 additions and 23 deletions.
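The batching change works by handing completed write requests to a buffered channel that a pool of ingest goroutines drains, so building the next batch no longer waits for the previous ingest call to return. Below is a minimal, self-contained sketch of that producer/consumer pattern; the request type, the ingest function, and the batch and worker counts are illustrative placeholders, not the prompb.WriteRequest and IngestFunc used in the actual change.

package main

import (
	"fmt"
	"sync"
)

// request stands in for prompb.WriteRequest in this sketch.
type request struct {
	series []int
}

// ingest stands in for the real IngestFunc; it just reports the batch size.
func ingest(req *request) {
	fmt.Println("ingested batch of", len(req.series), "series")
}

func main() {
	const batchSize = 3
	// A buffered channel decouples batch building from ingestion.
	reqCh := make(chan request, 100)

	// A pool of ingest workers drains the channel concurrently.
	var ingestWg sync.WaitGroup
	for i := 0; i < 2; i++ {
		ingestWg.Add(1)
		go func() {
			defer ingestWg.Done()
			for req := range reqCh {
				ingest(&req)
			}
		}()
	}

	// The producer keeps building batches without waiting for ingestion.
	var req request
	for sample := 0; sample < 10; sample++ {
		req.series = append(req.series, sample)
		if len(req.series) == batchSize {
			reqCh <- req
			req = request{}
		}
	}
	if len(req.series) > 0 {
		reqCh <- req // flush leftovers
	}

	close(reqCh)
	ingestWg.Wait()
}

The channel buffer bounds how far batch construction can run ahead of ingestion; the change below uses a buffer of 100 requests and one ingest goroutine per shard.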
4 changes: 2 additions & 2 deletions pkg/tests/end_to_end_tests/metric_ingest_bench_test.go
@@ -21,7 +21,7 @@ var prometheusDataGzip = "../testdata/prometheus-data.tar.gz"
func TestPromLoader(t *testing.T) {
data, err := extractPrometheusData(prometheusDataGzip, t.TempDir())
require.NoError(t, err, "failed to extract prometheus data")
loader, err := testsupport.NewPromLoader(data)
loader, err := testsupport.NewPromLoader(data, false)
if err != nil {
t.Fatal(err)
}
@@ -48,7 +48,7 @@ func BenchmarkMetricIngest(b *testing.B) {
if err != nil {
b.Fatalf("failed to extract prometheus data: %v", err)
}
loader, err := testsupport.NewPromLoader(data)
loader, err := testsupport.NewPromLoader(data, true) // load the whole dataset into memory so we can better track allocations during ingest
require.NoError(b, err)
defer func() {
if err := loader.Close(); err != nil {
85 changes: 64 additions & 21 deletions pkg/tests/testsupport/metric_loader.go
@@ -21,8 +21,9 @@ type PromLoader interface {
}

type promLoader struct {
db *tsdb.DBReadOnly
blocks []tsdb.BlockReader
db *tsdb.DBReadOnly
blocks []tsdb.BlockReader
inMemory bool
}

// PromIterator allows us to iterate over Prometheus data
@@ -31,6 +32,27 @@ type PromIterator interface {
Get() TimeSeries
}

type inMemoryIterator struct {
data []TimeSeries
curIdx int
}

func (s *inMemoryIterator) Next() bool {
if s.curIdx == len(s.data)-1 {
return false
}
s.curIdx++
return true
}

func (s *inMemoryIterator) Get() TimeSeries {
return s.data[s.curIdx]
}

func (s *inMemoryIterator) append(ts TimeSeries) {
s.data = append(s.data, ts)
}

type TimeSeries struct {
seriesHash uint64
Val prompb.TimeSeries
@@ -47,7 +69,7 @@ type promIterator struct {
blocks []tsdb.BlockReader
curBlockIdx int
labelsCache map[uint64]labels.Labels
blockSamples []*BlockSample
blockSamples []BlockSample
curSampleIdx int
}

@@ -81,7 +103,7 @@ func (i *promIterator) Next() bool {
func (it *promIterator) loadBlockSamples() error {
log.Info("msg", "loading blocks", "total samples", it.blocks[it.curBlockIdx].Meta().Stats.NumSamples,
"series", it.blocks[it.curBlockIdx].Meta().Stats.NumSeries)
it.blockSamples = make([]*BlockSample, it.blocks[it.curBlockIdx].Meta().Stats.NumSamples)
it.blockSamples = make([]BlockSample, it.blocks[it.curBlockIdx].Meta().Stats.NumSamples)
it.labelsCache = make(map[uint64]labels.Labels, it.blocks[it.curBlockIdx].Meta().Stats.NumSeries)
querier, err := tsdb.NewBlockQuerier(it.blocks[it.curBlockIdx], math.MinInt64, math.MaxInt64)
if err != nil {
@@ -103,8 +125,7 @@ func (it *promIterator) loadBlockSamples() error {
}
for seriesIt.Next() {
ts, val := seriesIt.At()
sample := &BlockSample{ts, val, lblsHash}
it.blockSamples[sampleCounter] = sample
it.blockSamples[sampleCounter] = BlockSample{ts, val, lblsHash}
sampleCounter++
}
}
@@ -130,11 +151,13 @@ func (i *promIterator) Get() TimeSeries {
Labels: protoLabels,
Samples: []prompb.Sample{sample},
}

return TimeSeries{blockSample.lblsHash, ts}
}

func NewPromLoader(dataDir string) (PromLoader, error) {
// PromLoader can preload the whole dataset into memory, which can be useful for
// getting accurate memory allocation measurements when benchmarking. However, this means
// the benchmark needs more memory to run, so make sure the test dataset fits into memory.
func NewPromLoader(dataDir string, inMemory bool) (PromLoader, error) {
db, err := tsdb.OpenDBReadOnly(dataDir, nil)
if err != nil {
return nil, fmt.Errorf("error starting Prometheus TSDB in read-only: %v", err)
@@ -143,11 +166,20 @@ func NewPromLoader(dataDir string) (PromLoader, error) {
if err != nil {
return nil, fmt.Errorf("error loading data blocks: %v", err)
}
return &promLoader{db: db, blocks: blocks}, nil
return &promLoader{db: db, blocks: blocks, inMemory: inMemory}, nil
}

func (loader *promLoader) Iterator() PromIterator {
return &promIterator{blocks: loader.blocks, curSampleIdx: -1, curBlockIdx: -1}
it := &promIterator{blocks: loader.blocks, curSampleIdx: -1, curBlockIdx: -1}
if loader.inMemory {
store := &inMemoryIterator{data: make([]TimeSeries, 0), curIdx: -1}
for it.Next() {
ts := it.Get()
store.append(ts)
}
return store
}
return it
}

func (loader *promLoader) Close() error {
@@ -200,29 +232,40 @@ func (si *sampleIngestor) shardSamples() {
si.shards[shardIdx] <- sample.Val
if si.rate != nil {
if err := si.rate.Wait(context.Background()); err != nil {
log.Error(err)
log.Error("msg", err)
}
}
}
}()
}

func (si *sampleIngestor) ingestSamples(ingest IngestFunc) {
var wg sync.WaitGroup
var shardWg sync.WaitGroup
var ingestWg sync.WaitGroup
reqCh := make(chan prompb.WriteRequest, 100)
for i := 0; i < len(si.shards); i++ {
wg.Add(1)
ingestWg.Add(1)
go func() {
defer func() {
ingestWg.Done()
}()
for req := range reqCh {
if _, _, err := ingest(context.Background(), &req); err != nil {
log.Error("msg", err)
}
}
}()
shardWg.Add(1)
go func(shard int) {
defer func() {
wg.Done()
shardWg.Done()
}()
var req prompb.WriteRequest = prompb.WriteRequest{Timeseries: make([]prompb.TimeSeries, si.batchSize)}
counter := 0
for ts := range si.shards[shard] {
if counter == si.batchSize {
if _, _, err := ingest(context.Background(), &req); err != nil {
log.Error(err)
}
req = prompb.WriteRequest{Timeseries: make([]prompb.TimeSeries, si.batchSize)}
reqCh <- req
counter = 0
} else {
req.Timeseries[counter] = ts
@@ -231,11 +274,11 @@ func (si *sampleIngestor) ingestSamples(ingest IngestFunc) {
}
if len(req.Timeseries) > 0 {
// flush leftovers
if _, _, err := ingest(context.Background(), &req); err != nil {
log.Error(err)
}
reqCh <- req
}
}(i)
}
wg.Wait()
shardWg.Wait()
close(reqCh)
ingestWg.Wait()
}
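For a local run that exercises the new in-memory mode, something along these lines should work; this assumes the standard Go test tooling and that the end-to-end test prerequisites (the test database and the test data archive referenced by the benchmark) are already in place:

go test -bench=BenchmarkMetricIngest -benchmem ./pkg/tests/end_to_end_tests/

The -benchmem flag reports allocations per operation, which is what loading the whole dataset into memory is meant to make easier to attribute to the ingest path itself.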
