Skip to content

Commit 6c1aa4f

Browse files
committed
[*] optimize maintenance tasks in Postgres sink, closes #786
From now `admin.config` holds information about last successful operation. Maintenance routines will check if any other instance already processed it before starting it's own.
1 parent 19b1703 commit 6c1aa4f

File tree

2 files changed

+62
-33
lines changed

2 files changed

+62
-33
lines changed

internal/sinks/postgres.go

Lines changed: 56 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ type MeasurementMessagePostgres struct {
127127
TagData map[string]string
128128
}
129129

130+
const sourcesMaintenanceInterval = time.Hour * 24
131+
130132
type DbStorageSchemaType int
131133

132134
const (
@@ -496,6 +498,10 @@ func (pgw *PostgresWriter) deleteOldPartitions(delay time.Duration) {
496498
}
497499

498500
for {
501+
if !pgw.StartMaintenanceActivity(pgw.ctx, "partitions_maintenance") {
502+
continue
503+
}
504+
499505
if pgw.metricSchema == DbStorageSchemaTimescale {
500506
partsDropped, err := pgw.DropOldTimePartitions(metricAgeDaysThreshold)
501507
if err != nil {
@@ -534,40 +540,76 @@ func (pgw *PostgresWriter) deleteOldPartitions(delay time.Duration) {
534540
}
535541
}
536542

543+
func (pgw *PostgresWriter) StartMaintenanceActivity(ctx context.Context, activity string) bool {
544+
logger := log.GetLogger(ctx)
545+
tx, err := pgw.sinkDb.Begin(pgw.ctx)
546+
if err != nil {
547+
logger.Errorf("Starting transaction for %s maintenance failed: %w", activity, err)
548+
return false
549+
}
550+
func() {
551+
if err != nil {
552+
_ = tx.Rollback(pgw.ctx)
553+
} else {
554+
_ = tx.Commit(pgw.ctx)
555+
}
556+
_, _ = pgw.sinkDb.Exec(pgw.ctx, `SELECT pg_advisory_unlock(1571543679778230000)`)
557+
}()
558+
559+
var check bool
560+
if err = tx.QueryRow(pgw.ctx, `SELECT now() - value::timestamptz > $1::interval FROM admin.config WHERE key = $2`,
561+
sourcesMaintenanceInterval, activity).Scan(&check); err != nil {
562+
logger.Errorf("Checking partition maintenance interval failed: %w", err)
563+
return false
564+
}
565+
if !check {
566+
logger.Infof("Skipping %s maintenance as it was handled in the last 24 hours...", activity)
567+
return false
568+
}
569+
570+
var lock bool
571+
logger.Infof("Trying to get %s maintenance advisory lock...", activity)
572+
if err = tx.QueryRow(pgw.ctx, `SELECT pg_try_advisory_lock(1571543679778230000) AS have_lock`).Scan(&lock); err != nil {
573+
logger.Error(err)
574+
return false
575+
}
576+
if !lock {
577+
logger.Infof("Skipping %s maintenance as another instance has the advisory lock...", activity)
578+
return false
579+
}
580+
581+
if _, err = tx.Exec(pgw.ctx, `UPDATE admin.config SET value = now()::text WHERE key = $1`, activity); err != nil {
582+
logger.Error(err)
583+
return false
584+
}
585+
return true
586+
}
587+
537588
// maintainUniqueSources is a background task that maintains a listing of unique sources for each metric.
538589
// This is used to avoid listing the same source multiple times in Grafana dropdowns.
539590
func (pgw *PostgresWriter) maintainUniqueSources() {
540591
logger := log.GetLogger(pgw.ctx)
541592
// due to metrics deletion the listing can go out of sync (a trigger not really wanted)
542-
sqlGetAdvisoryLock := `SELECT pg_try_advisory_lock(1571543679778230000) AS have_lock` // 1571543679778230000 is just a random bigint
543593
sqlTopLevelMetrics := `SELECT table_name FROM admin.get_top_level_metric_tables()`
544-
sqlDistinct := `
545-
WITH RECURSIVE t(dbname) AS (
594+
sqlDistinct := `WITH RECURSIVE t(dbname) AS (
546595
SELECT MIN(dbname) AS dbname FROM %s
547596
UNION
548597
SELECT (SELECT MIN(dbname) FROM %s WHERE dbname > t.dbname) FROM t )
549598
SELECT dbname FROM t WHERE dbname NOTNULL ORDER BY 1`
550599
sqlDelete := `DELETE FROM admin.all_distinct_dbname_metrics WHERE NOT dbname = ANY($1) and metric = $2`
551600
sqlDeleteAll := `DELETE FROM admin.all_distinct_dbname_metrics WHERE metric = $1`
552-
sqlAdd := `
553-
INSERT INTO admin.all_distinct_dbname_metrics SELECT u, $2 FROM (select unnest($1::text[]) as u) x
601+
sqlAdd := `INSERT INTO admin.all_distinct_dbname_metrics SELECT u, $2 FROM (select unnest($1::text[]) as u) x
554602
WHERE NOT EXISTS (select * from admin.all_distinct_dbname_metrics where dbname = u and metric = $2)
555603
RETURNING *`
556604

557605
for {
558606
select {
559607
case <-pgw.ctx.Done():
560608
return
561-
case <-time.After(time.Hour * 24):
609+
case <-time.After(sourcesMaintenanceInterval):
562610
}
563-
var lock bool
564-
logger.Infof("Trying to get metricsDb listing maintainer advisory lock...") // to only have one "maintainer" in case of a "push" setup, as can get costly
565-
if err := pgw.sinkDb.QueryRow(pgw.ctx, sqlGetAdvisoryLock).Scan(&lock); err != nil {
566-
logger.Error("Getting metricsDb listing maintainer advisory lock failed:", err)
567-
continue
568-
}
569-
if !lock {
570-
logger.Info("Skipping admin.all_distinct_dbname_metrics maintenance as another instance has the advisory lock...")
611+
612+
if !pgw.StartMaintenanceActivity(pgw.ctx, "sources_maintenance") {
571613
continue
572614
}
573615

@@ -587,7 +629,6 @@ func (pgw *PostgresWriter) maintainUniqueSources() {
587629
logger.Debugf("Refreshing all_distinct_dbname_metrics listing for metric: %s", metricName)
588630
rows, _ := pgw.sinkDb.Query(pgw.ctx, fmt.Sprintf(sqlDistinct, tableName, tableName))
589631
ret, err := pgx.CollectRows(rows, pgx.RowTo[string])
590-
// ret, err := DBExecRead(mainContext, metricDb, fmt.Sprintf(sqlDistinct, tableName, tableName))
591632
if err != nil {
592633
logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for '%s': %s", metricName, err)
593634
break

internal/sinks/sql/admin_schema.sql

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,6 @@ create schema "subpartitions";
99

1010
create extension if not exists btree_gin;
1111

12-
-- grant all on schema public to pgwatch;
13-
14-
-- do $sql$
15-
-- begin
16-
-- execute format($$alter role pgwatch in database %s set statement_timeout to '5min'$$, current_database());
17-
-- raise warning 'Enabling asynchronous commit for pgwatch role - revert if possible data loss on crash is not acceptable!';
18-
-- execute format($$alter role pgwatch in database %s set synchronous_commit to off$$, current_database());
19-
-- end
20-
-- $sql$;
21-
22-
-- set role to pgwatch;
23-
24-
2512
create function admin.get_default_storage_type() returns text as
2613
$$
2714
select case
@@ -64,8 +51,12 @@ create table admin.config
6451

6552
-- to later change the value call the admin.change_timescale_chunk_interval(interval) function!
6653
-- as changing the row directly will only be effective for completely new tables (metrics).
67-
insert into admin.config select 'timescale_chunk_interval', '2 days';
68-
insert into admin.config select 'timescale_compress_interval', '1 day';
54+
insert into admin.config values
55+
('timescale_chunk_interval', '2 days'),
56+
('timescale_compress_interval', '1 day'),
57+
('partitions_maintenance', now()::text),
58+
('sources_maintenance', now()::text);
59+
6960

7061
create or replace function trg_config_modified() returns trigger
7162
as $$
@@ -153,6 +144,3 @@ COMMENT ON TABLE admin.metrics_template_realtime IS 'used as a template for all
153144

154145
-- create index on admin.metrics_template using brin (dbname, time) with (pages_per_range=32); /* consider BRIN instead for large data amounts */
155146
CREATE INDEX ON admin.metrics_template_realtime (dbname, time);
156-
157-
-- RESET ROLE;
158-

0 commit comments

Comments
 (0)