Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Add a database connection pool for maintenance jobs
Browse files Browse the repository at this point in the history
Creates a separate pool of database connections for the telemetry
and vacuum engines. We don't want maintenance jobs stealing from
the other pools, especially the writer pool.

The maintenance pool defaults to 5 connections.

The logic for calculating database pool sizes was revamped now that
there are 3 pools. Tests were added.
  • Loading branch information
jgpruitt committed Sep 23, 2022
1 parent 36ec07f commit f228c92
Show file tree
Hide file tree
Showing 13 changed files with 828 additions and 61 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ We use the following categories for changes:
- Enable tracing.async-acks by default [#1633].
- Sizes of maintenance worker backlogs exposed as database metrics on the Promscale dashboard [#1634]
- Added a vacuum engine that detects and vacuums/freezes compressed chunks [#1648]
- Add pool of database connections for maintenance jobs e.g. telemetry [#1657]

### Changed
- Log throughput in the same line for samples, spans and metric metadata [#1643]
Expand Down
1 change: 1 addition & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ The following subsections cover all CLI flags which promscale supports. You can
| db.connections.reader-pool.size | integer | 30% of possible connections db | Maximum size of the reader pool of database connections. This defaults to 30% of max_connections allowed by the database. |
| db.connections.writer-pool.size | integer | 50% of possible connections db | Maximum size of the writer pool of database connections. This defaults to 50% of max_connections allowed by the database. |
| db.connections.writer-pool.synchronous-commit | boolean | false | Enable/disable synchronous_commit on database connections in the writer pool. |
| db.connections.maint-pool.size | integer | 5 | Maximum size of the maintenance pool of database connections. This defaults to 5 |
| db.host | string | localhost | Host for TimescaleDB/Vanilla Postgres. |
| db.name | string | timescale | Database name. |
| db.password | string | | Password for connecting to TimescaleDB/Vanilla Postgres. |
Expand Down
69 changes: 45 additions & 24 deletions pkg/pgclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type LockFunc = func(ctx context.Context, conn *pgx.Conn) error
type Client struct {
readerPool pgxconn.PgxConn
writerPool pgxconn.PgxConn
maintPool pgxconn.PgxConn
ingestor ingestor.DBInserter
querier querier.Querier
promqlEngine *promql.Engine
Expand All @@ -63,21 +64,35 @@ type Client struct {
func NewClient(r prometheus.Registerer, cfg *Config, mt tenancy.Authorizer, schemaLocker LockFunc, readOnly bool) (*Client, error) {
var (
err error
maxConns int
readerPoolSize int
writerPoolSize int
maintPoolSize int
numCopiers int
totalConns int
writerPool *pgxpool.Pool
readerFraction = defaultConnFraction
writerFraction = defaultConnFraction
maintPool *pgxpool.Pool
)
if !readOnly {
readerFraction = defaultReaderFraction // Since readerFraction + writerFraction should be 0.8 or 80% of allowed database connections.
writerPoolSize, err = cfg.GetPoolSize("writer", writerFraction, cfg.WriterPoolSize)
if err != nil {
return nil, fmt.Errorf("get writer pool size: %w", err)
}
totalConns += writerPoolSize

maxConns, err = cfg.maxConn()
if err != nil {
return nil, fmt.Errorf("max connections: %w", err)
}

readerPoolSize, writerPoolSize, maintPoolSize, err = cfg.GetPoolSizes(readOnly, maxConns)
if err != nil {
return nil, err
}

maintPgConfig, err := cfg.getPgConfig(maintPoolSize)
if err != nil {
return nil, fmt.Errorf("get maint pg-config: %w", err)
}
maintPool, err = pgxpool.ConnectConfig(context.Background(), maintPgConfig)
if err != nil {
return nil, fmt.Errorf("err creating maintenance connection pool: %w", err)
}

if !readOnly {
numCopiers, err = cfg.GetNumCopiers()
if err != nil {
return nil, fmt.Errorf("get num copiers: %w", err)
Expand All @@ -99,16 +114,6 @@ func NewClient(r prometheus.Registerer, cfg *Config, mt tenancy.Authorizer, sche
}
}

readerPoolSize, err := cfg.GetPoolSize("reader", readerFraction, cfg.ReaderPoolSize)
if err != nil {
return nil, fmt.Errorf("get reader pool size: %w", err)
}
totalConns += readerPoolSize

if cfg.MaxConnections != defaultMaxConns && totalConns > cfg.MaxConnections {
return nil, fmt.Errorf("reader-pool (size=%d) + writer-pool (size=%d) more than db.connections-max (%d). Increase the db.connections-max or decrease the pool-sizes", readerPoolSize, writerPoolSize, cfg.MaxConnections)
}

readerPgConfig, err := cfg.getPgConfig(readerPoolSize)
if err != nil {
return nil, fmt.Errorf("get reader pg-config: %w", err)
Expand All @@ -122,6 +127,7 @@ func NewClient(r prometheus.Registerer, cfg *Config, mt tenancy.Authorizer, sche
log.Info("msg", "runtime",
"writer-pool.size", writerPoolSize,
"reader-pool.size", readerPoolSize,
"maint-pool.size", maintPoolSize,
"min-pool.connections", MinPoolSize,
"num-copiers", numCopiers,
"statement-cache", statementCacheLog)
Expand All @@ -131,7 +137,7 @@ func NewClient(r prometheus.Registerer, cfg *Config, mt tenancy.Authorizer, sche
if err != nil {
return nil, fmt.Errorf("err creating reader connection pool: %w", err)
}
client, err := NewClientWithPool(r, cfg, numCopiers, writerPool, readerPool, mt, readOnly)
client, err := NewClientWithPool(r, cfg, numCopiers, writerPool, readerPool, maintPool, mt, readOnly)
if err != nil {
return client, err
}
Expand Down Expand Up @@ -191,7 +197,7 @@ func getRedactedConnStr(s string) string {
}

// NewClientWithPool creates a new PostgreSQL client with an existing connection pool.
func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, writerPool, readerPool *pgxpool.Pool, mt tenancy.Authorizer, readOnly bool) (*Client, error) {
func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, writerPool, readerPool, maintPool *pgxpool.Pool, mt tenancy.Authorizer, readOnly bool) (*Client, error) {
sigClose := make(chan struct{})
metricsCache := cache.NewMetricCache(cfg.CacheConfig)
labelsCache := cache.NewLabelsCache(cfg.CacheConfig)
Expand All @@ -207,7 +213,11 @@ func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, wri
TracesBatchWorkers: cfg.TracesBatchWorkers,
}

var writerConn pgxconn.PgxConn
var (
writerConn pgxconn.PgxConn
maintConn pgxconn.PgxConn
)

readerConn := pgxconn.NewQueryLoggingPgxConn(readerPool)

exemplarKeyPosCache := cache.NewExemplarLabelsPosCache(cfg.CacheConfig)
Expand All @@ -226,10 +236,14 @@ func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, wri
return nil, err
}
}
if maintPool != nil {
maintConn = pgxconn.NewPgxConn(maintPool)
}

client := &Client{
readerPool: readerConn,
writerPool: writerConn,
maintPool: maintConn,
ingestor: dbIngestor,
querier: dbQuerier,
healthCheck: health.NewHealthChecker(readerConn),
Expand All @@ -240,14 +254,18 @@ func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, wri
sigClose: sigClose,
}

initMetrics(r, writerPool, readerPool)
initMetrics(r, writerPool, readerPool, maintPool)
return client, nil
}

func (c *Client) ReadOnlyConnection() pgxconn.PgxConn {
return c.readerPool
}

func (c *Client) MaintenanceConnection() pgxconn.PgxConn {
return c.maintPool
}

func (c *Client) InitPromQLEngine(cfg *query.Config) error {
engine, err := query.NewEngine(log.GetLogger(), cfg.MaxQueryTimeout, cfg.LookBackDelta, cfg.SubQueryStepInterval, cfg.MaxSamples, cfg.EnabledFeatureMap)
if err != nil {
Expand All @@ -269,6 +287,9 @@ func (c *Client) Close() {
}
close(c.sigClose)
if c.closePool {
if c.maintPool != nil {
c.maintPool.Close()
}
if c.writerPool != nil {
c.writerPool.Close()
}
Expand Down
95 changes: 73 additions & 22 deletions pkg/pgclient/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/jackc/pgx/v4"
"github.com/timescale/promscale/pkg/limits"
"github.com/timescale/promscale/pkg/log"
"github.com/timescale/promscale/pkg/pgmodel/cache"
"github.com/timescale/promscale/pkg/pgmodel/ingestor/trace"
"github.com/timescale/promscale/pkg/version"
Expand All @@ -38,6 +39,7 @@ type Config struct {
WriterPoolSize int
WriterSynchronousCommit bool
ReaderPoolSize int
MaintPoolSize int
MaxConnections int
UsesHA bool
DbUri string
Expand All @@ -59,6 +61,7 @@ const (
defaultDbStatementsCache = true
MinPoolSize = 2
defaultPoolSize = -1
defaultMaintPoolSize = 5
defaultMaxConns = -1
defaultWriterSynchronousCommit = false
)
Expand Down Expand Up @@ -89,8 +92,9 @@ func ParseFlags(fs *flag.FlagSet, cfg *Config) *Config {
fs.IntVar(&cfg.WriterPoolSize, "db.connections.writer-pool.size", defaultPoolSize, "Maximum size of the writer pool of database connections. This defaults to 50% of max_connections "+
"allowed by the database.")
fs.BoolVar(&cfg.WriterSynchronousCommit, "db.connections.writer-pool.synchronous-commit", defaultWriterSynchronousCommit, "Enable/disable synchronous_commit on database connections in the writer pool.")
fs.IntVar(&cfg.ReaderPoolSize, "db.connections.reader-pool.size", defaultPoolSize, "Maximum size of the reader pool of database connections. This defaults to 30% of max_connections "+
"allowed by the database.")
fs.IntVar(&cfg.ReaderPoolSize, "db.connections.reader-pool.size", defaultPoolSize, "Maximum size of the reader pool of database connections. This defaults to roughly 30% of max_connections "+
"allowed by the database. 50% in read-only mode")
fs.IntVar(&cfg.MaintPoolSize, "db.connections.maint-pool.size", defaultMaintPoolSize, "Maximum size of the maintenance pool of database connections. This defaults to 5")
fs.IntVar(&cfg.MaxConnections, "db.connections-max", defaultMaxConns, "Maximum number of connections to the database that should be opened at once. "+
"It defaults to 80% of the maximum connections that the database can handle. ")
fs.StringVar(&cfg.DbUri, "db.uri", defaultDBUri, "TimescaleDB/Vanilla Postgres DB URI. "+
Expand Down Expand Up @@ -154,32 +158,79 @@ func (cfg *Config) GetConnectionStr() string {
return cfg.DbUri
}

// GetPoolSize returns the max pool size based on the max_connections allowed by database and the defaultFraction.
// Arg inputPoolSize is the pool size provided in the CLI flag.
func (cfg *Config) GetPoolSize(poolName string, defaultFraction float64, inputPoolSize int) (int, error) {
maxConns, err := cfg.maxConn()
if err != nil {
return 0, fmt.Errorf("max connections: %w", err)
// GetPoolSizes returns the pool sizes adjusted according to defaults, read only settings, and max connections available
func (cfg *Config) GetPoolSizes(readOnly bool, dbMaxConns int) (readerPoolSize, writerPoolSize, maintPoolSize int, err error) {
cfgMaxConns := cfg.MaxConnections
if cfgMaxConns == defaultMaxConns {
cfgMaxConns = int(float64(dbMaxConns) * 0.8)
}
if cfgMaxConns > dbMaxConns {
log.Warn("msg", "configured db.connections-max is greater then postgres max_connections",
"db.connections-max", cfgMaxConns,
"max_connections", dbMaxConns,
)
}

switch {
case inputPoolSize == defaultPoolSize:
// For the default case, we need to take up defaultFraction of allowed connections.
poolSize := float64(maxConns) * defaultFraction
calc := func(inputPoolSize int, fraction float64) int {
if inputPoolSize == defaultPoolSize {
poolSize := float64(dbMaxConns) * fraction
if cfg.UsesHA {
poolSize /= 2
}
return int(poolSize)
}
if cfg.UsesHA {
poolSize /= 2
inputPoolSize /= 2
}
return int(poolSize), nil
case inputPoolSize < MinPoolSize:
return 0, fmt.Errorf("%s pool size canot be less than %d: received %d", poolName, MinPoolSize, inputPoolSize)
case inputPoolSize > maxConns:
return 0, fmt.Errorf("%s pool size canot be greater than the 'max_connections' allowed by the database", poolName)
default:
return inputPoolSize
}

// calc maint pool size
maintPoolSize = cfg.MaintPoolSize
if maintPoolSize == defaultPoolSize {
maintPoolSize = defaultMaintPoolSize
}

// calc writer pool size
if !readOnly {
writerPoolSize = calc(cfg.WriterPoolSize, defaultConnFraction)
}

// calc reader pool size
readerFraction := defaultReaderFraction
if readOnly {
readerFraction = defaultConnFraction
}
readerPoolSize = calc(cfg.ReaderPoolSize, readerFraction)
if cfg.ReaderPoolSize == defaultPoolSize && readerPoolSize+writerPoolSize+maintPoolSize > cfgMaxConns {
readerPoolSize = readerPoolSize - maintPoolSize
}

// check pool sizes against min pool size
if maintPoolSize < MinPoolSize {
return 0, 0, 0,
fmt.Errorf("maint pool size canot be less than %d: received %d", MinPoolSize, maintPoolSize)
}
if !readOnly && writerPoolSize < MinPoolSize {
return 0, 0, 0,
fmt.Errorf("writer pool size canot be less than %d: received %d", MinPoolSize, writerPoolSize)
}
if readerPoolSize < MinPoolSize {
return 0, 0, 0,
fmt.Errorf("reader pool size canot be less than %d: received %d", MinPoolSize, readerPoolSize)
}

// check sum of pool sizes against database max_connections
if readerPoolSize+writerPoolSize+maintPoolSize > dbMaxConns {
return 0, 0, 0,
fmt.Errorf("reader-pool (size=%d) + writer-pool (size=%d) + maint-pool (size=%d) more than postgres max_connections (%d). Decrease the pool-sizes", readerPoolSize, writerPoolSize, maintPoolSize, dbMaxConns)
}
if cfg.UsesHA {
inputPoolSize /= 2
// check sum of pool sizes against config db.connections-max
if readerPoolSize+writerPoolSize+maintPoolSize > cfgMaxConns {
return 0, 0, 0,
fmt.Errorf("reader-pool (size=%d) + writer-pool (size=%d) + maint-pool (size=%d) more than db.connections-max (%d). Increase the db.connections-max or decrease the pool-sizes", readerPoolSize, writerPoolSize, maintPoolSize, cfgMaxConns)
}
return inputPoolSize, nil
return
}

func (cfg *Config) maxConn() (int, error) {
Expand Down
Loading

0 comments on commit f228c92

Please sign in to comment.