Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Reuse ingestion temp tables across batches
Browse files Browse the repository at this point in the history
Currently, the temp tables used for ingestion are named using a
transient value. This PR substitutes the transient value for the
metric_id. This change creates a temp table that is specific to
the metric and database session which allows it to be reused.

The temp table is currently created with ON COMMIT DROP, which
requires that the temp table be recreated with each transaction.
This incurs significant pg_catalog churn/bloat. A corresponding
PR will be made in the extension to change this to
ON COMMIT DELETE ROWS.
  • Loading branch information
jgpruitt committed Sep 30, 2022
1 parent 65eedbd commit 7e76605
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ We use the following categories for changes:
- TimescaleDB is now mandatory [#1660].
- When querying for Jaeger tags with binary values the binary data will be
returned instead of the base64 representation of the string [#1649].
- Reuse ingestion temp tables across batches [#1679]

### Fixed
- Do not collect telemetry if `timescaledb.telemetry_level=off` [#1612]
Expand Down
4 changes: 2 additions & 2 deletions pkg/pgmodel/ingestor/copier.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,11 +424,11 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, re

copyFromFunc := func(tableName, schemaName string, isExemplar bool) error {
columns := schema.PromDataColumns
tempTablePrefix := fmt.Sprintf("s%d_", r)
tempTablePrefix := fmt.Sprintf("s%d_", req.info.MetricID)
rows := sampleRows
if isExemplar {
columns = schema.PromExemplarColumns
tempTablePrefix = fmt.Sprintf("e%d_", r)
tempTablePrefix = fmt.Sprintf("e%d_", req.info.MetricID)
rows = exemplarRows
}
table := pgx.Identifier{schemaName, tableName}
Expand Down
22 changes: 11 additions & 11 deletions pkg/pgmodel/ingestor/ingestor_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,13 +688,13 @@ func TestPGXInserterInsertData(t *testing.T) {
{
// on epoch error we go for on conflict insert approach
Sql: "SELECT _prom_catalog.create_ingest_temp_table($1, $2, $3)",
Args: []interface{}{"metric_0", "prom_data", "s0_"},
Results: model.RowResults{{"s0_metric_0"}},
Args: []interface{}{"metric_0", "prom_data", "s1_"},
Results: model.RowResults{{"s1_metric_0"}},
Err: error(nil),
},
{
Copy: &model.Copy{
Table: pgx.Identifier{"s0_metric_0"},
Table: pgx.Identifier{"s1_metric_0"},
Data: [][]interface{}{
{time.Unix(0, 0), float64(0), int64(1)},
},
Expand All @@ -704,7 +704,7 @@ func TestPGXInserterInsertData(t *testing.T) {
},
// insert from temp table using on conflict
{
Sql: "INSERT INTO prom_data.\"metric_0\"(time,value,series_id) SELECT time,value,series_id FROM \"s0_metric_0\" ON CONFLICT DO NOTHING",
Sql: "INSERT INTO prom_data.\"metric_0\"(time,value,series_id) SELECT time,value,series_id FROM \"s1_metric_0\" ON CONFLICT DO NOTHING",
Results: model.RowResults{{[]byte{}}},
Err: error(nil),
},
Expand Down Expand Up @@ -757,14 +757,14 @@ func TestPGXInserterInsertData(t *testing.T) {
{
// on epoch error we go for on conflict insert approach
Sql: "SELECT _prom_catalog.create_ingest_temp_table($1, $2, $3)",
Args: []interface{}{"metric_0", "prom_data", "s0_"},
Results: model.RowResults{{"s0_metric_0"}},
Args: []interface{}{"metric_0", "prom_data", "s1_"},
Results: model.RowResults{{"s1_metric_0"}},
Err: error(nil),
},
// retry of insert of individual copy request
{
Copy: &model.Copy{
Table: pgx.Identifier{"s0_metric_0"},
Table: pgx.Identifier{"s1_metric_0"},
Data: [][]interface{}{
{time.Unix(0, 0), float64(0), int64(1)},
{time.Unix(0, 0), float64(0), int64(1)},
Expand Down Expand Up @@ -876,14 +876,14 @@ func TestPGXInserterInsertData(t *testing.T) {
// retry by creating temp table
{
Sql: "SELECT _prom_catalog.create_ingest_temp_table($1, $2, $3)",
Args: []interface{}{"metric_0", "prom_data", "s0_"},
Results: model.RowResults{{"s0_metric_0"}},
Args: []interface{}{"metric_0", "prom_data", "s1_"},
Results: model.RowResults{{"s1_metric_0"}},
Err: error(nil),
},
// copy into created temp table
{
Copy: &model.Copy{
Table: pgx.Identifier{"s0_metric_0"},
Table: pgx.Identifier{"s1_metric_0"},
Data: [][]interface{}{
{time.Unix(0, 0), float64(0), int64(1)},
{time.Unix(0, 0), float64(0), int64(1)},
Expand All @@ -894,7 +894,7 @@ func TestPGXInserterInsertData(t *testing.T) {
},
// insert from temp table using on conflict
{
Sql: "INSERT INTO prom_data.\"metric_0\"(time,value,series_id) SELECT time,value,series_id FROM \"s0_metric_0\" ON CONFLICT DO NOTHING",
Sql: "INSERT INTO prom_data.\"metric_0\"(time,value,series_id) SELECT time,value,series_id FROM \"s1_metric_0\" ON CONFLICT DO NOTHING",
Results: model.RowResults{{[]byte{}}},
Err: error(nil),
},
Expand Down

0 comments on commit 7e76605

Please sign in to comment.