Skip to content

Commit 8ca1365

Browse files
authored
cmd,collector: optionally disable concurrent scrapes (#6)
Previously approved but merged into the wrong base: #2 ## Background The current behavior of the exporter is to open a new database connection on every scrape. First, here is where the default metrics collector and new-style collectors are registered: https://github.com/planetscale/postgres_exporter/blob/198454cc9e56141d5cc422149755fe8e80b3eeea/cmd/postgres_exporter/main.go#L126 https://github.com/planetscale/postgres_exporter/blob/198454cc9e56141d5cc422149755fe8e80b3eeea/cmd/postgres_exporter/main.go#L143 Prometheus may run these collectors in parallel. Additionally, the `collectors` package is set up to support concurrent scrapes: https://github.com/planetscale/postgres_exporter/blob/198454cc9e56141d5cc422149755fe8e80b3eeea/collector/collector.go#L171-L176 No doubt it's useful for some to have the default and new-style metrics be collected concurrently, and to be able to support concurrent scrapes. ## Changes But for PlanetScale, our metric collection system does not scrape the same endpoint concurrently, so concurrent scrapes aren't useful for us. We can also live without whatever time is gained by having default and new-style metrics be collected concurrently: our scrape timeout is 10s, and we expect metrics to be collected much faster than that. If not, then we probably have other problems we need to look at. Additionally we want to consume as few customer connections as possible. So having the option of using a single, shared connection between default and new-style metrics is good for us. Additionally, if customers have used up all their connections, having each scrape create a new db conn might mean we can't produce metrics. So, having the option to use a single, shared, _persistent_ connection is doubly useful. 
## Validation Create a role with a connection limit of 1: ```postgresql CREATE ROLE postgres_exporter WITH LOGIN CONNECTION LIMIT 1; GRANT pg_monitor TO postgres_exporter; GRANT CONNECT ON DATABASE postgres TO postgres_exporter; ``` ### With `--no-concurrent-scrape` Start the exporter with concurrent scraping disabled: ```bash DATA_SOURCE_NAME="postgresql://postgres_exporter@127.0.0.1:5432/postgres?sslmode=disable" ./postgres_exporter --no-concurrent-scrape --log.level=info ``` Scraping metrics shows no errors in output. ### With `--concurrent-scrape` Repeat with `--concurrent-scrape`, shows this error: > time=2025-09-06T20:56:44.568-04:00 level=ERROR source=collector.go:195 msg="Error opening connection to database" err="error querying postgresql version: pq: too many connections for role \"postgres_exporter\"" ### Connection resilience Verified that `--no-concurrent-scrape` is resilient to Postgres connection resets. After a connection reset, the exporter will log an error: > time=2025-09-06T22:44:06.172-04:00 level=ERROR source=collector.go:180 msg="Error creating instance" err="driver: bad connection" But the connection will be recreated and the scrape (or the next scrape anyway) will succeed. Behavior seems similar to `master`. --------- Signed-off-by: Max Englander <max@planetscale.com>
1 parent 2367a8d commit 8ca1365

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+180
-118
lines changed

cmd/postgres_exporter/main.go

Lines changed: 57 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@ package main
1515

1616
import (
1717
"fmt"
18+
"log/slog"
1819
"net/http"
1920
"os"
2021
"strings"
22+
"time"
2123

2224
"github.com/alecthomas/kingpin/v2"
2325
"github.com/prometheus-community/postgres_exporter/collector"
@@ -32,6 +34,59 @@ import (
3234
"github.com/prometheus/exporter-toolkit/web/kingpinflag"
3335
)
3436

37+
func registerPostgresCollector(dsn string, exporter *Exporter, logger *slog.Logger, excludedDatabases []string, scrapeTimeout time.Duration, concurrentScrape bool) {
38+
if dsn == "" {
39+
return
40+
}
41+
42+
var factory collector.InstanceFactory
43+
44+
if concurrentScrape {
45+
// Original behavior: dedicated instance for collector, creates new connection per scrape
46+
template, err := collector.NewInstance(dsn)
47+
if err != nil {
48+
logger.Warn("Failed to create template instance", "err", err.Error())
49+
return
50+
}
51+
factory = collector.InstanceFactoryFromTemplate(template)
52+
} else {
53+
// New optimized behavior: share connection from server with resilience
54+
factory = func() (*collector.Instance, error) {
55+
server, err := exporter.servers.GetServer(dsn)
56+
if err != nil {
57+
return nil, err
58+
}
59+
60+
inst, err := collector.NewInstance(dsn)
61+
if err != nil {
62+
return nil, err
63+
}
64+
65+
err = inst.SetupWithConnection(server.db)
66+
if err != nil {
67+
return nil, err
68+
}
69+
70+
return inst, nil
71+
}
72+
}
73+
74+
// Create collector with factory
75+
pe, err := collector.NewPostgresCollector(
76+
logger,
77+
excludedDatabases,
78+
factory,
79+
[]string{},
80+
collector.WithTimeout(scrapeTimeout),
81+
)
82+
if err != nil {
83+
logger.Warn("Failed to create PostgresCollector", "err", err.Error())
84+
return
85+
}
86+
87+
prometheus.MustRegister(pe)
88+
}
89+
3590
var (
3691
c = config.Handler{
3792
Config: &config.Config{},
@@ -50,6 +105,7 @@ var (
50105
includeDatabases = kingpin.Flag("include-databases", "A list of databases to include when autoDiscoverDatabases is enabled (DEPRECATED)").Default("").Envar("PG_EXPORTER_INCLUDE_DATABASES").String()
51106
metricPrefix = kingpin.Flag("metric-prefix", "A metric prefix can be used to have non-default (not \"pg\") prefixes for each of the metrics").Default("pg").Envar("PG_EXPORTER_METRIC_PREFIX").String()
52107
scrapeTimeout = kingpin.Flag("scrape-timeout", "Maximum time for a scrape to complete before timing out (0 = no timeout)").Default("0").Envar("PG_EXPORTER_SCRAPE_TIMEOUT").Duration()
108+
concurrentScrape = kingpin.Flag("concurrent-scrape", "Use dedicated instance for collector allowing concurrent scrapes (default: true for backward compatibility)").Default("true").Envar("PG_EXPORTER_CONCURRENT_SCRAPE").Bool()
53109
logger = promslog.NewNopLogger()
54110
)
55111

@@ -133,18 +189,7 @@ func main() {
133189
dsn = dsns[0]
134190
}
135191

136-
pe, err := collector.NewPostgresCollector(
137-
logger,
138-
excludedDatabases,
139-
dsn,
140-
[]string{},
141-
collector.WithTimeout(*scrapeTimeout),
142-
)
143-
if err != nil {
144-
logger.Warn("Failed to create PostgresCollector", "err", err.Error())
145-
} else {
146-
prometheus.MustRegister(pe)
147-
}
192+
registerPostgresCollector(dsn, exporter, logger, excludedDatabases, *scrapeTimeout, *concurrentScrape)
148193

149194
http.Handle(*metricsPath, promhttp.Handler())
150195

collector/collector.go

Lines changed: 13 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ var (
5959
)
6060

6161
type Collector interface {
62-
Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error
62+
Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error
6363
}
6464

6565
type collectorConfig struct {
@@ -89,11 +89,10 @@ func registerCollector(name string, isDefaultEnabled bool, createFunc func(colle
8989

9090
// PostgresCollector implements the prometheus.Collector interface.
9191
type PostgresCollector struct {
92-
Collectors map[string]Collector
93-
logger *slog.Logger
94-
scrapeTimeout time.Duration
95-
96-
instance *instance
92+
Collectors map[string]Collector
93+
logger *slog.Logger
94+
scrapeTimeout time.Duration
95+
instanceFactory InstanceFactory
9796
}
9897

9998
type Option func(*PostgresCollector) error
@@ -107,9 +106,10 @@ func WithTimeout(timeout time.Duration) Option {
107106
}
108107

109108
// NewPostgresCollector creates a new PostgresCollector.
110-
func NewPostgresCollector(logger *slog.Logger, excludeDatabases []string, dsn string, filters []string, options ...Option) (*PostgresCollector, error) {
109+
func NewPostgresCollector(logger *slog.Logger, excludeDatabases []string, factory InstanceFactory, filters []string, options ...Option) (*PostgresCollector, error) {
111110
p := &PostgresCollector{
112-
logger: logger,
111+
logger: logger,
112+
instanceFactory: factory,
113113
}
114114
// Apply options to customize the collector
115115
for _, o := range options {
@@ -154,16 +154,6 @@ func NewPostgresCollector(logger *slog.Logger, excludeDatabases []string, dsn st
154154

155155
p.Collectors = collectors
156156

157-
if dsn == "" {
158-
return nil, errors.New("empty dsn")
159-
}
160-
161-
instance, err := newInstance(dsn)
162-
if err != nil {
163-
return nil, err
164-
}
165-
p.instance = instance
166-
167157
return p, nil
168158
}
169159

@@ -184,16 +174,13 @@ func (p PostgresCollector) Collect(ch chan<- prometheus.Metric) {
184174
ctx = context.Background()
185175
}
186176

187-
// copy the instance so that concurrent scrapes have independent instances
188-
inst := p.instance.copy()
189-
190-
// Set up the database connection for the collector.
191-
err := inst.setup()
192-
defer inst.Close()
177+
// Use the factory to get an instance
178+
inst, err := p.instanceFactory()
193179
if err != nil {
194-
p.logger.Error("Error opening connection to database", "err", err)
180+
p.logger.Error("Error creating instance", "err", err)
195181
return
196182
}
183+
defer inst.Close() // Always safe - closeDB flag determines if connection is actually closed
197184

198185
wg := sync.WaitGroup{}
199186
wg.Add(len(p.Collectors))
@@ -206,11 +193,7 @@ func (p PostgresCollector) Collect(ch chan<- prometheus.Metric) {
206193
wg.Wait()
207194
}
208195

209-
func (p *PostgresCollector) Close() error {
210-
return p.instance.Close()
211-
}
212-
213-
func execute(ctx context.Context, name string, c Collector, instance *instance, ch chan<- prometheus.Metric, logger *slog.Logger) {
196+
func execute(ctx context.Context, name string, c Collector, instance *Instance, ch chan<- prometheus.Metric, logger *slog.Logger) {
214197
begin := time.Now()
215198
err := c.Update(ctx, instance, ch)
216199
duration := time.Since(begin)

collector/instance.go

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,15 @@ import (
2121
"github.com/blang/semver/v4"
2222
)
2323

24-
type instance struct {
24+
type Instance struct {
2525
dsn string
2626
db *sql.DB
2727
version semver.Version
28+
closeDB bool // whether we should close the connection on Close()
2829
}
2930

30-
func newInstance(dsn string) (*instance, error) {
31-
i := &instance{
31+
func NewInstance(dsn string) (*Instance, error) {
32+
i := &Instance{
3233
dsn: dsn,
3334
}
3435

@@ -44,20 +45,21 @@ func newInstance(dsn string) (*instance, error) {
4445
}
4546

4647
// copy returns a copy of the instance.
47-
func (i *instance) copy() *instance {
48-
return &instance{
48+
func (i *Instance) copy() *Instance {
49+
return &Instance{
4950
dsn: i.dsn,
5051
}
5152
}
5253

53-
func (i *instance) setup() error {
54+
func (i *Instance) setup() error {
5455
db, err := sql.Open("postgres", i.dsn)
5556
if err != nil {
5657
return err
5758
}
5859
db.SetMaxOpenConns(1)
5960
db.SetMaxIdleConns(1)
6061
i.db = db
62+
i.closeDB = true // we created this connection, so we should close it
6163

6264
version, err := queryVersion(i.db)
6365
if err != nil {
@@ -68,12 +70,28 @@ func (i *instance) setup() error {
6870
return nil
6971
}
7072

71-
func (i *instance) getDB() *sql.DB {
73+
// SetupWithConnection sets up the instance with an existing database connection.
74+
func (i *Instance) SetupWithConnection(db *sql.DB) error {
75+
i.db = db
76+
i.closeDB = false // we're borrowing this connection, don't close it
77+
78+
version, err := queryVersion(i.db)
79+
if err != nil {
80+
return fmt.Errorf("error querying postgresql version: %w", err)
81+
}
82+
i.version = version
83+
return nil
84+
}
85+
86+
func (i *Instance) getDB() *sql.DB {
7287
return i.db
7388
}
7489

75-
func (i *instance) Close() error {
76-
return i.db.Close()
90+
func (i *Instance) Close() error {
91+
if i.closeDB {
92+
return i.db.Close()
93+
}
94+
return nil
7795
}
7896

7997
// Regex used to get the "short-version" from the postgres version field.
@@ -104,3 +122,19 @@ func queryVersion(db *sql.DB) (semver.Version, error) {
104122
}
105123
return semver.Version{}, fmt.Errorf("could not parse version from %q", version)
106124
}
125+
126+
// InstanceFactory creates instances for collectors to use
127+
type InstanceFactory func() (*Instance, error)
128+
129+
// InstanceFactoryFromTemplate creates a factory that copies from a template instance
130+
// and creates a new database connection for each call
131+
func InstanceFactoryFromTemplate(template *Instance) InstanceFactory {
132+
return func() (*Instance, error) {
133+
inst := template.copy()
134+
err := inst.setup() // Creates new connection, sets closeDB=true
135+
if err != nil {
136+
return nil, err
137+
}
138+
return inst, nil
139+
}
140+
}

collector/pg_buffercache_summary.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ var (
9191

9292
// Update implements Collector
9393
// It is called by the Prometheus registry when collecting metrics.
94-
func (c BuffercacheSummaryCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
94+
func (c BuffercacheSummaryCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error {
9595
// pg_buffercache_summary is only in v16, and we don't need support for earlier currently.
9696
if !instance.version.GE(semver.MustParse("16.0.0")) {
9797
return nil

collector/pg_buffercache_summary_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ func TestBuffercacheSummaryCollector(t *testing.T) {
3030
}
3131
defer db.Close()
3232

33-
inst := &instance{db: db, version: semver.MustParse("16.0.0")}
33+
inst := &Instance{db: db, version: semver.MustParse("16.0.0")}
3434

3535
columns := []string{
3636
"buffers_used",

collector/pg_database.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ var (
7676
// each database individually. This is because we can't filter the
7777
// list of databases in the query because the list of excluded
7878
// databases is dynamic.
79-
func (c PGDatabaseCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
79+
func (c PGDatabaseCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error {
8080
db := instance.getDB()
8181
// Query the list of databases
8282
rows, err := db.QueryContext(ctx,

collector/pg_database_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func TestPGDatabaseCollector(t *testing.T) {
2929
}
3030
defer db.Close()
3131

32-
inst := &instance{db: db}
32+
inst := &Instance{db: db}
3333

3434
mock.ExpectQuery(sanitizeQuery(pgDatabaseQuery)).WillReturnRows(sqlmock.NewRows([]string{"datname", "datconnlimit"}).
3535
AddRow("postgres", 15))
@@ -70,7 +70,7 @@ func TestPGDatabaseCollectorNullMetric(t *testing.T) {
7070
}
7171
defer db.Close()
7272

73-
inst := &instance{db: db}
73+
inst := &Instance{db: db}
7474

7575
mock.ExpectQuery(sanitizeQuery(pgDatabaseQuery)).WillReturnRows(sqlmock.NewRows([]string{"datname", "datconnlimit"}).
7676
AddRow("postgres", nil))

collector/pg_database_wraparound.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ var (
6161
`
6262
)
6363

64-
func (c *PGDatabaseWraparoundCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
64+
func (c *PGDatabaseWraparoundCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error {
6565
db := instance.getDB()
6666
rows, err := db.QueryContext(ctx,
6767
databaseWraparoundQuery)

collector/pg_database_wraparound_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func TestPGDatabaseWraparoundCollector(t *testing.T) {
2828
t.Fatalf("Error opening a stub db connection: %s", err)
2929
}
3030
defer db.Close()
31-
inst := &instance{db: db}
31+
inst := &Instance{db: db}
3232
columns := []string{
3333
"datname",
3434
"age_datfrozenxid",

collector/pg_locks.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ var (
8888

8989
// Update implements Collector and exposes database locks.
9090
// It is called by the Prometheus registry when collecting metrics.
91-
func (c PGLocksCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
91+
func (c PGLocksCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error {
9292
db := instance.getDB()
9393
// Query the list of databases
9494
rows, err := db.QueryContext(ctx,

0 commit comments

Comments
 (0)