Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Ingest metric data with COPY to get better performance
Browse files Browse the repository at this point in the history
We try COPY into metric table. If there is a conflict situation
(eg. duplicate key violation) we catch it on client and retry
by COPYing to temporary table and doing ON CONFLICT insert from there.
  • Loading branch information
niksajakovljevic committed Jul 6, 2022
1 parent cb72068 commit 7fe0b84
Show file tree
Hide file tree
Showing 10 changed files with 376 additions and 150 deletions.
2 changes: 1 addition & 1 deletion EXTENSION_VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.5.2
niksa-ingest-with-copy-ts2.7-pg14
6 changes: 6 additions & 0 deletions pkg/pgmodel/common/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,10 @@ const (
LockID = 0x4D829C732AAFCEDE // Chosen randomly.

PromDataSeries = "prom_data_series"
PsTrace = "_ps_trace"
)

var (
PromDataColumns = [3]string{"time", "value", "series_id"}
PromExemplarColumns = [4]string{"time", "series_id", "exemplar_label_values", "value"}
)
171 changes: 105 additions & 66 deletions pkg/pgmodel/ingestor/copier.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,24 @@ import (
"time"

"github.com/jackc/pgconn"
"github.com/jackc/pgx/v4"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/timescale/promscale/pkg/log"
"github.com/timescale/promscale/pkg/pgmodel/common/schema"
"github.com/timescale/promscale/pkg/pgmodel/metrics"
pgmodel "github.com/timescale/promscale/pkg/pgmodel/model"
"github.com/timescale/promscale/pkg/pgxconn"
"github.com/timescale/promscale/pkg/tracer"
)

const (
sqlInsertIntoFrom = "INSERT INTO %[1]s.%[2]s(%[3]s) SELECT %[3]s FROM %[4]s ON CONFLICT DO NOTHING"
)

type copyRequest struct {
data *pendingBuffer
info *pgmodel.MetricInfo
Expand Down Expand Up @@ -197,10 +203,19 @@ hot_gather:
func doInsertOrFallback(ctx context.Context, conn pgxconn.PgxConn, reqs ...copyRequest) {
ctx, span := tracer.Default().Start(ctx, "do-insert-or-fallback")
defer span.End()
err, _ := insertSeries(ctx, conn, reqs...)
err, _ := insertSeries(ctx, conn, false, reqs...)
if err != nil {
insertBatchErrorFallback(ctx, conn, reqs...)
return
pgErr, ok := err.(*pgconn.PgError)
if ok && pgErr.Code == "23505" {
// unique violation
// we retry with onConflict
err, _ = insertSeries(ctx, conn, true, reqs...)
}
if err != nil {
log.Error("msg", err)
insertBatchErrorFallback(ctx, conn, reqs...)
return
}
}

for i := range reqs {
Expand All @@ -213,7 +228,7 @@ func insertBatchErrorFallback(ctx context.Context, conn pgxconn.PgxConn, reqs ..
ctx, span := tracer.Default().Start(ctx, "insert-batch-error-fallback")
defer span.End()
for i := range reqs {
err, minTime := insertSeries(ctx, conn, reqs[i])
err, minTime := insertSeries(ctx, conn, false, reqs[i])
if err != nil {
err = tryRecovery(ctx, conn, err, reqs[i], minTime)
}
Expand Down Expand Up @@ -287,7 +302,7 @@ func retryAfterDecompression(ctx context.Context, conn pgxconn.PgxConn, req copy

metrics.IngestorDecompressCalls.With(prometheus.Labels{"type": "metric", "kind": "sample"}).Inc()
metrics.IngestorDecompressEarliest.With(prometheus.Labels{"type": "metric", "kind": "sample", "table": table}).Set(float64(minTime.UnixNano()) / 1e9)
err, _ := insertSeries(ctx, conn, req) // Attempt an insert again.
err, _ := insertSeries(ctx, conn, false, req) // Attempt an insert again.
return err
}

Expand All @@ -310,17 +325,31 @@ func debugInsert() {
var labelsCopier = prometheus.Labels{"type": "metric", "subsystem": "copier"}

// insertSeries performs the insertion of time-series into the DB.
func insertSeries(ctx context.Context, conn pgxconn.PgxConn, reqs ...copyRequest) (error, int64) {
func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, reqs ...copyRequest) (error, int64) {
_, span := tracer.Default().Start(ctx, "insert-series")
defer span.End()
batch := conn.NewBatch()

numRowsPerInsert := make([]int, 0, len(reqs))
insertedRows := make([]int, 0, len(reqs))
numRowsTotal := 0
totalSamples := 0
totalExemplars := 0
var sampleRows [][]interface{}
var exemplarRows [][]interface{}
insertStart := time.Now()
lowestEpoch := pgmodel.SeriesEpoch(math.MaxInt64)
lowestMinTime := int64(math.MaxInt64)
tx, err := conn.BeginTx(ctx)
if err != nil {
return fmt.Errorf("failed to start TX for inserting metrics: %v", err), lowestMinTime
}
defer func() {
if tx != nil {
if err := tx.Rollback(ctx); err != nil {
log.Error(err)
}
}
}()

for r := range reqs {
req := &reqs[r]
// Since seriesId order is not guaranteed we need to sort it to avoid row deadlock when duplicates are sent (eg. Prometheus retry)
Expand All @@ -345,45 +374,24 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, reqs ...copyRequest
var (
hasSamples bool
hasExemplars bool

timeSamples []time.Time
timeExemplars []time.Time

valSamples []float64
valExemplars []float64

seriesIdSamples []int64
seriesIdExemplars []int64

exemplarLbls [][]string
)

if numSamples > 0 {
timeSamples = make([]time.Time, 0, numSamples)
valSamples = make([]float64, 0, numSamples)
seriesIdSamples = make([]int64, 0, numSamples)
sampleRows = make([][]interface{}, 0, numSamples)
}
if numExemplars > 0 {
timeExemplars = make([]time.Time, 0, numExemplars)
valExemplars = make([]float64, 0, numExemplars)
seriesIdExemplars = make([]int64, 0, numExemplars)
exemplarLbls = make([][]string, 0, numExemplars)
exemplarRows = make([][]interface{}, 0, numExemplars)
}

visitor := req.data.batch.Visitor()
err := visitor.Visit(
err = visitor.Visit(
func(t time.Time, v float64, seriesId int64) {
hasSamples = true
timeSamples = append(timeSamples, t)
valSamples = append(valSamples, v)
seriesIdSamples = append(seriesIdSamples, seriesId)
sampleRows = append(sampleRows, []interface{}{t, v, seriesId})
},
func(t time.Time, v float64, seriesId int64, lvalues []string) {
hasExemplars = true
timeExemplars = append(timeExemplars, t)
valExemplars = append(valExemplars, v)
seriesIdExemplars = append(seriesIdExemplars, seriesId)
exemplarLbls = append(exemplarLbls, lvalues)
exemplarRows = append(exemplarRows, []interface{}{t, seriesId, lvalues, v})
},
)
if err != nil {
Expand All @@ -403,63 +411,94 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, reqs ...copyRequest
totalExemplars += numExemplars
if hasSamples {
numRowsPerInsert = append(numRowsPerInsert, numSamples)
batch.Queue("SELECT _prom_catalog.insert_metric_row($1, $2::TIMESTAMPTZ[], $3::DOUBLE PRECISION[], $4::BIGINT[])", req.info.TableName, timeSamples, valSamples, seriesIdSamples)
table := pgx.Identifier{req.info.TableSchema, req.info.TableName}
if onConflict {
table, err = createTempIngestTable(ctx, tx, req.info.TableName, req.info.TableSchema)
if err != nil {
return err, lowestMinTime
}
}
inserted, err := tx.CopyFrom(ctx, table, schema.PromDataColumns[:], pgx.CopyFromRows(sampleRows))
if err != nil {
return err, lowestMinTime
}
if onConflict {
res, err := tx.Exec(ctx, fmt.Sprintf(sqlInsertIntoFrom, schema.PromData, pgx.Identifier{req.info.TableName}.Sanitize(),
strings.Join(schema.PromDataColumns[:], ","), table.Sanitize()))
if err != nil {
return err, lowestMinTime
}
inserted = res.RowsAffected()

}
insertedRows = append(insertedRows, int(inserted))
}
if hasExemplars {
// We cannot send 2-D [][]TEXT to postgres via the pgx.encoder. For this and easier querying reasons, we create a
// new type in postgres by the name prom_api.label_value_array and use that type as array (which forms a 2D array of TEXT)
// which is then used to push using the unnest method apprach.
labelValues := pgmodel.GetCustomType(pgmodel.LabelValueArray)
if err := labelValues.Set(exemplarLbls); err != nil {
return fmt.Errorf("setting prom_api.label_value_array[] value: %w", err), lowestMinTime
}
numRowsPerInsert = append(numRowsPerInsert, numExemplars)
batch.Queue("SELECT _prom_catalog.insert_exemplar_row($1::NAME, $2::TIMESTAMPTZ[], $3::BIGINT[], $4::prom_api.label_value_array[], $5::DOUBLE PRECISION[])", req.info.TableName, timeExemplars, seriesIdExemplars, labelValues, valExemplars)
table := pgx.Identifier{schema.PromDataExemplar, req.info.TableName}
if onConflict {
table, err = createTempIngestTable(ctx, tx, req.info.TableName, schema.PromDataExemplar)
if err != nil {
return err, lowestMinTime
}
}
inserted, err := tx.CopyFrom(ctx, table, schema.PromExemplarColumns[:], pgx.CopyFromRows(exemplarRows))
if err != nil {
return err, lowestMinTime
}
if onConflict {
res, err := tx.Exec(ctx, fmt.Sprintf(sqlInsertIntoFrom, schema.PromDataExemplar, pgx.Identifier{req.info.TableName}.Sanitize(),
strings.Join(schema.PromExemplarColumns[:], ","), table.Sanitize()))
if err != nil {
return err, lowestMinTime
}
inserted = res.RowsAffected()

}
insertedRows = append(insertedRows, int(inserted))
}
}

//note the epoch increment takes an access exclusive on the table before incrementing.
//thus we don't need row locking here. Note by doing this check at the end we can
//have some wasted work for the inserts before this fails but this is rare.
//avoiding an additional loop or memoization to find the lowest epoch ahead of time seems worth it.
batch.Queue("SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", int64(lowestEpoch))
row := tx.QueryRow(ctx, "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", int64(lowestEpoch))
var val []byte
if err = row.Scan(&val); err != nil {
return err, lowestMinTime
}

metrics.IngestorRowsPerBatch.With(labelsCopier).Observe(float64(numRowsTotal))
metrics.IngestorInsertsPerBatch.With(labelsCopier).Observe(float64(len(reqs)))
start := time.Now()
results, err := conn.SendBatch(context.Background(), batch)
if err != nil {
if err = tx.Commit(ctx); err != nil {
return err, lowestMinTime
}
defer results.Close()
metrics.IngestorRowsPerBatch.With(labelsCopier).Observe(float64(numRowsTotal))
metrics.IngestorInsertsPerBatch.With(labelsCopier).Observe(float64(len(reqs)))

var affectedMetrics uint64
for _, numRows := range numRowsPerInsert {
var insertedRows int64
err := results.QueryRow().Scan(&insertedRows)
if err != nil {
return err, lowestMinTime
}
numRowsExpected := int64(numRows)
if numRowsExpected != insertedRows {
for idx, numRows := range numRowsPerInsert {
if numRows != insertedRows[idx] {
affectedMetrics++
registerDuplicates(numRowsExpected - insertedRows)
registerDuplicates(int64(numRows - insertedRows[idx]))
}
}
metrics.IngestorItems.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "kind": "sample"}).Add(float64(totalSamples))
metrics.IngestorItems.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "kind": "exemplar"}).Add(float64(totalExemplars))

var val []byte
row := results.QueryRow()
err = row.Scan(&val)
if err != nil {
return err, lowestMinTime
}
reportDuplicates(affectedMetrics)
metrics.IngestorInsertDuration.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "kind": "sample"}).Observe(time.Since(start).Seconds())
metrics.IngestorInsertDuration.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "kind": "sample"}).Observe(time.Since(insertStart).Seconds())
return nil, lowestMinTime
}

func createTempIngestTable(ctx context.Context, tx pgx.Tx, table, schema string) (pgx.Identifier, error) {
var tempTableNameRawString string
row := tx.QueryRow(ctx, "SELECT _prom_catalog.create_ingest_temp_table($1, $2)", table, schema)
if err := row.Scan(&tempTableNameRawString); err != nil {
return nil, err
}
return pgx.Identifier{tempTableNameRawString}, nil
}

func insertMetadata(conn pgxconn.PgxConn, reqs []pgmodel.Metadata) (insertedRows uint64, err error) {
numRows := len(reqs)
timeSlice := make([]time.Time, numRows)
Expand Down
Loading

0 comments on commit 7fe0b84

Please sign in to comment.