Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Ingest metric data with COPY to get better performance #1462

Merged
merged 1 commit into from
Jul 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/go-e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ jobs:
extension_version=$(cat EXTENSION_VERSION | tr -d '[:space:]')
stable_branch_tag=$(echo ${extension_version}-ts2)
image_base="ghcr.io/timescale/dev_promscale_extension"
docker_image_12=$(./scripts/fallback-docker.sh ${image_base}:${possible_branch_tag}-pg12 ${image_base}:${stable_branch_tag}-pg12)
docker_image_13=$(./scripts/fallback-docker.sh ${image_base}:${possible_branch_tag}-pg13 ${image_base}:${stable_branch_tag}-pg13)
docker_image_14=$(./scripts/fallback-docker.sh ${image_base}:${possible_branch_tag}-pg14 ${image_base}:${stable_branch_tag}-pg14)
docker_image_12=${image_base}:${stable_branch_tag}-pg12
docker_image_13=${image_base}:${stable_branch_tag}-pg13
docker_image_14=${image_base}:${stable_branch_tag}-pg14
fi;
echo "::set-output name=docker_image_12::${docker_image_12}"
echo "::set-output name=docker_image_13::${docker_image_13}"
Expand Down
2 changes: 1 addition & 1 deletion EXTENSION_VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.5.2
niksa-new-arg-for-create-ingest-table
6 changes: 6 additions & 0 deletions pkg/pgmodel/common/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,10 @@ const (
LockID = 0x4D829C732AAFCEDE // Chosen randomly.

PromDataSeries = "prom_data_series"
PsTrace = "_ps_trace"
)

var (
	// PromDataColumns lists the sample columns in the order used when
	// copying metric samples: timestamp, sample value, series identifier.
	PromDataColumns = []string{
		"time",
		"value",
		"series_id",
	}

	// PromExemplarColumns lists the exemplar columns in the order used when
	// copying exemplars: timestamp, series identifier, label values, value.
	PromExemplarColumns = []string{
		"time",
		"series_id",
		"exemplar_label_values",
		"value",
	}
)
183 changes: 117 additions & 66 deletions pkg/pgmodel/ingestor/copier.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,24 @@ import (
"time"

"github.com/jackc/pgconn"
"github.com/jackc/pgx/v4"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/timescale/promscale/pkg/log"
"github.com/timescale/promscale/pkg/pgmodel/common/schema"
"github.com/timescale/promscale/pkg/pgmodel/metrics"
pgmodel "github.com/timescale/promscale/pkg/pgmodel/model"
"github.com/timescale/promscale/pkg/pgxconn"
"github.com/timescale/promscale/pkg/tracer"
)

// sqlInsertIntoFrom is the fmt template for moving rows from a staging table
// into a target table while silently skipping rows that conflict.
// Positional verbs: %[1]s target schema, %[2]s target table, %[3]s
// comma-separated column list (used for both the INSERT and the SELECT),
// %[4]s source table.
const sqlInsertIntoFrom = "INSERT INTO %[1]s.%[2]s(%[3]s) SELECT %[3]s FROM %[4]s ON CONFLICT DO NOTHING"

type copyRequest struct {
data *pendingBuffer
info *pgmodel.MetricInfo
Expand Down Expand Up @@ -197,10 +203,16 @@ hot_gather:
func doInsertOrFallback(ctx context.Context, conn pgxconn.PgxConn, reqs ...copyRequest) {
ctx, span := tracer.Default().Start(ctx, "do-insert-or-fallback")
defer span.End()
err, _ := insertSeries(ctx, conn, reqs...)
err, _ := insertSeries(ctx, conn, false, reqs...)
if err != nil {
insertBatchErrorFallback(ctx, conn, reqs...)
return
if isPGUniqueViolation(err) {
err, _ = insertSeries(ctx, conn, true, reqs...)
}
if err != nil {
log.Error("msg", err)
insertBatchErrorFallback(ctx, conn, reqs...)
return
}
}

for i := range reqs {
Expand All @@ -209,11 +221,23 @@ func doInsertOrFallback(ctx context.Context, conn pgxconn.PgxConn, reqs ...copyR
}
}

// isPGUniqueViolation reports whether err is a *pgconn.PgError whose SQLSTATE
// code is "23505", i.e. the database rejected the statement because of
// duplicate rows. A nil err, or an error of any other concrete type, yields
// false; the check is a direct type assertion and does not unwrap err.
func isPGUniqueViolation(err error) bool {
	const uniqueViolationCode = "23505"

	// The comma-ok assertion simply fails on a nil interface value, so a
	// separate nil guard is unnecessary.
	pgErr, isPgErr := err.(*pgconn.PgError)
	return isPgErr && pgErr.Code == uniqueViolationCode
}

func insertBatchErrorFallback(ctx context.Context, conn pgxconn.PgxConn, reqs ...copyRequest) {
ctx, span := tracer.Default().Start(ctx, "insert-batch-error-fallback")
defer span.End()
for i := range reqs {
err, minTime := insertSeries(ctx, conn, reqs[i])
err, minTime := insertSeries(ctx, conn, true, reqs[i])
if err != nil {
err = tryRecovery(ctx, conn, err, reqs[i], minTime)
}
Expand Down Expand Up @@ -287,7 +311,10 @@ func retryAfterDecompression(ctx context.Context, conn pgxconn.PgxConn, req copy

metrics.IngestorDecompressCalls.With(prometheus.Labels{"type": "metric", "kind": "sample"}).Inc()
metrics.IngestorDecompressEarliest.With(prometheus.Labels{"type": "metric", "kind": "sample", "table": table}).Set(float64(minTime.UnixNano()) / 1e9)
err, _ := insertSeries(ctx, conn, req) // Attempt an insert again.
err, _ := insertSeries(ctx, conn, false, req) // Attempt an insert again.
if isPGUniqueViolation(err) {
err, _ = insertSeries(ctx, conn, true, req) // And again :)
}
return err
}

Expand All @@ -310,17 +337,31 @@ func debugInsert() {
var labelsCopier = prometheus.Labels{"type": "metric", "subsystem": "copier"}

// insertSeries performs the insertion of time-series into the DB.
func insertSeries(ctx context.Context, conn pgxconn.PgxConn, reqs ...copyRequest) (error, int64) {
func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, reqs ...copyRequest) (error, int64) {
_, span := tracer.Default().Start(ctx, "insert-series")
defer span.End()
batch := conn.NewBatch()

numRowsPerInsert := make([]int, 0, len(reqs))
insertedRows := make([]int, 0, len(reqs))
numRowsTotal := 0
totalSamples := 0
totalExemplars := 0
var sampleRows [][]interface{}
var exemplarRows [][]interface{}
insertStart := time.Now()
lowestEpoch := pgmodel.SeriesEpoch(math.MaxInt64)
lowestMinTime := int64(math.MaxInt64)
tx, err := conn.BeginTx(ctx)
if err != nil {
return fmt.Errorf("failed to start transaction for inserting metrics: %v", err), lowestMinTime
}
defer func() {
if tx != nil {
if err := tx.Rollback(ctx); err != nil {
log.Error(err)
}
}
}()

for r := range reqs {
req := &reqs[r]
// Since seriesId order is not guaranteed we need to sort it to avoid row deadlock when duplicates are sent (eg. Prometheus retry)
Expand All @@ -345,45 +386,24 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, reqs ...copyRequest
var (
hasSamples bool
hasExemplars bool

timeSamples []time.Time
timeExemplars []time.Time

valSamples []float64
valExemplars []float64

seriesIdSamples []int64
seriesIdExemplars []int64

exemplarLbls [][]string
)

if numSamples > 0 {
timeSamples = make([]time.Time, 0, numSamples)
valSamples = make([]float64, 0, numSamples)
seriesIdSamples = make([]int64, 0, numSamples)
sampleRows = make([][]interface{}, 0, numSamples)
}
if numExemplars > 0 {
timeExemplars = make([]time.Time, 0, numExemplars)
valExemplars = make([]float64, 0, numExemplars)
seriesIdExemplars = make([]int64, 0, numExemplars)
exemplarLbls = make([][]string, 0, numExemplars)
exemplarRows = make([][]interface{}, 0, numExemplars)
}

visitor := req.data.batch.Visitor()
err := visitor.Visit(
err = visitor.Visit(
func(t time.Time, v float64, seriesId int64) {
hasSamples = true
timeSamples = append(timeSamples, t)
valSamples = append(valSamples, v)
seriesIdSamples = append(seriesIdSamples, seriesId)
sampleRows = append(sampleRows, []interface{}{t, v, seriesId})
},
func(t time.Time, v float64, seriesId int64, lvalues []string) {
hasExemplars = true
timeExemplars = append(timeExemplars, t)
valExemplars = append(valExemplars, v)
seriesIdExemplars = append(seriesIdExemplars, seriesId)
exemplarLbls = append(exemplarLbls, lvalues)
exemplarRows = append(exemplarRows, []interface{}{t, seriesId, lvalues, v})
},
)
if err != nil {
Expand All @@ -401,65 +421,96 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, reqs ...copyRequest
numRowsTotal += numSamples + numExemplars
totalSamples += numSamples
totalExemplars += numExemplars

copyFromFunc := func(tableName, schemaName string, isExemplar bool) error {
columns := schema.PromDataColumns
tempTablePrefix := fmt.Sprintf("s%d_", r)
rows := sampleRows
if isExemplar {
columns = schema.PromExemplarColumns
tempTablePrefix = fmt.Sprintf("s%d_", r)
rows = exemplarRows
}
table := pgx.Identifier{schemaName, tableName}
if onConflict {
// we append table prefix to make sure that temp table name is unique
table, err = createTempIngestTable(ctx, tx, tableName, schemaName, tempTablePrefix)
if err != nil {
return err
}
}
inserted, err := tx.CopyFrom(ctx, table, columns, pgx.CopyFromRows(rows))
if err != nil {
return err
}
if onConflict {
res, err := tx.Exec(ctx,
fmt.Sprintf(sqlInsertIntoFrom, schemaName, pgx.Identifier{tableName}.Sanitize(),
strings.Join(columns[:], ","), table.Sanitize()))
if err != nil {
return err
}
inserted = res.RowsAffected()

}
insertedRows = append(insertedRows, int(inserted))
return nil
}

if hasSamples {
numRowsPerInsert = append(numRowsPerInsert, numSamples)
batch.Queue("SELECT _prom_catalog.insert_metric_row($1, $2::TIMESTAMPTZ[], $3::DOUBLE PRECISION[], $4::BIGINT[])", req.info.TableName, timeSamples, valSamples, seriesIdSamples)
if err = copyFromFunc(req.info.TableName, req.info.TableSchema, false); err != nil {
return err, lowestMinTime
}
}
if hasExemplars {
// We cannot send 2-D [][]TEXT to postgres via the pgx.encoder. For this and easier querying reasons, we create a
// new type in postgres by the name prom_api.label_value_array and use that type as array (which forms a 2D array of TEXT)
// which is then used to push using the unnest method approach.
labelValues := pgmodel.GetCustomType(pgmodel.LabelValueArray)
if err := labelValues.Set(exemplarLbls); err != nil {
return fmt.Errorf("setting prom_api.label_value_array[] value: %w", err), lowestMinTime
numRowsPerInsert = append(numRowsPerInsert, numSamples)
if err = copyFromFunc(req.info.TableName, schema.PromDataExemplar, true); err != nil {
return err, lowestMinTime
}
numRowsPerInsert = append(numRowsPerInsert, numExemplars)
batch.Queue("SELECT _prom_catalog.insert_exemplar_row($1::NAME, $2::TIMESTAMPTZ[], $3::BIGINT[], $4::prom_api.label_value_array[], $5::DOUBLE PRECISION[])", req.info.TableName, timeExemplars, seriesIdExemplars, labelValues, valExemplars)
}
}

//note the epoch increment takes an access exclusive on the table before incrementing.
//thus we don't need row locking here. Note by doing this check at the end we can
//have some wasted work for the inserts before this fails but this is rare.
//avoiding an additional loop or memoization to find the lowest epoch ahead of time seems worth it.
batch.Queue("SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", int64(lowestEpoch))
row := tx.QueryRow(ctx, "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", int64(lowestEpoch))
var val []byte
if err = row.Scan(&val); err != nil {
return err, lowestMinTime
}

metrics.IngestorRowsPerBatch.With(labelsCopier).Observe(float64(numRowsTotal))
metrics.IngestorInsertsPerBatch.With(labelsCopier).Observe(float64(len(reqs)))
start := time.Now()
results, err := conn.SendBatch(context.Background(), batch)
if err != nil {
if err = tx.Commit(ctx); err != nil {
return err, lowestMinTime
}
defer results.Close()
metrics.IngestorRowsPerBatch.With(labelsCopier).Observe(float64(numRowsTotal))
metrics.IngestorInsertsPerBatch.With(labelsCopier).Observe(float64(len(reqs)))

var affectedMetrics uint64
for _, numRows := range numRowsPerInsert {
var insertedRows int64
err := results.QueryRow().Scan(&insertedRows)
if err != nil {
return err, lowestMinTime
}
numRowsExpected := int64(numRows)
if numRowsExpected != insertedRows {
for idx, numRows := range numRowsPerInsert {
if numRows != insertedRows[idx] {
affectedMetrics++
registerDuplicates(numRowsExpected - insertedRows)
registerDuplicates(int64(numRows - insertedRows[idx]))
}
}
metrics.IngestorItems.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "kind": "sample"}).Add(float64(totalSamples))
metrics.IngestorItems.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "kind": "exemplar"}).Add(float64(totalExemplars))

var val []byte
row := results.QueryRow()
err = row.Scan(&val)
if err != nil {
return err, lowestMinTime
}
reportDuplicates(affectedMetrics)
metrics.IngestorInsertDuration.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "kind": "sample"}).Observe(time.Since(start).Seconds())
metrics.IngestorInsertDuration.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "kind": "sample"}).Observe(time.Since(insertStart).Seconds())
return nil, lowestMinTime
}

// createTempIngestTable runs _prom_catalog.create_ingest_temp_table(table,
// schema, prefix) on tx and returns the table name reported by that call as a
// single-element pgx.Identifier. If the query or the scan of its result
// fails, it returns a nil identifier together with the error.
func createTempIngestTable(ctx context.Context, tx pgx.Tx, table, schema, prefix string) (pgx.Identifier, error) {
	var tempName string
	err := tx.
		QueryRow(ctx, "SELECT _prom_catalog.create_ingest_temp_table($1, $2, $3)", table, schema, prefix).
		Scan(&tempName)
	if err != nil {
		return nil, err
	}
	return pgx.Identifier{tempName}, nil
}

func insertMetadata(conn pgxconn.PgxConn, reqs []pgmodel.Metadata) (insertedRows uint64, err error) {
numRows := len(reqs)
timeSlice := make([]time.Time, numRows)
Expand Down
Loading