From ce4eb561fe394f7a1b763ccbe993ed20f6d548cf Mon Sep 17 00:00:00 2001 From: Pavlo Golub Date: Thu, 24 Jul 2025 17:49:39 +0200 Subject: [PATCH 1/2] [*] optimize maintenance tasks in Postgres sink, closes #786 From now `admin.config` holds information about last successful operation. Maintenance routines will check if any other instance already processed it before starting it's own. --- internal/sinks/postgres.go | 71 +++++++++++++++++++++++------ internal/sinks/sql/admin_schema.sql | 24 +++------- 2 files changed, 62 insertions(+), 33 deletions(-) diff --git a/internal/sinks/postgres.go b/internal/sinks/postgres.go index 5cc73f9e54..fbde8999f8 100644 --- a/internal/sinks/postgres.go +++ b/internal/sinks/postgres.go @@ -127,6 +127,8 @@ type MeasurementMessagePostgres struct { TagData map[string]string } +const sourcesMaintenanceInterval = time.Hour * 24 + type DbStorageSchemaType int const ( @@ -496,6 +498,10 @@ func (pgw *PostgresWriter) deleteOldPartitions(delay time.Duration) { } for { + if !pgw.StartMaintenanceActivity(pgw.ctx, "partitions_maintenance") { + continue + } + if pgw.metricSchema == DbStorageSchemaTimescale { partsDropped, err := pgw.DropOldTimePartitions(metricAgeDaysThreshold) if err != nil { @@ -534,23 +540,65 @@ func (pgw *PostgresWriter) deleteOldPartitions(delay time.Duration) { } } +func (pgw *PostgresWriter) StartMaintenanceActivity(ctx context.Context, activity string) bool { + logger := log.GetLogger(ctx) + tx, err := pgw.sinkDb.Begin(pgw.ctx) + if err != nil { + logger.Errorf("Starting transaction for %s maintenance failed: %w", activity, err) + return false + } + func() { + if err != nil { + _ = tx.Rollback(pgw.ctx) + } else { + _ = tx.Commit(pgw.ctx) + } + _, _ = pgw.sinkDb.Exec(pgw.ctx, `SELECT pg_advisory_unlock(1571543679778230000)`) + }() + + var check bool + if err = tx.QueryRow(pgw.ctx, `SELECT now() - value::timestamptz > $1::interval FROM admin.config WHERE key = $2`, + sourcesMaintenanceInterval, activity).Scan(&check); err != nil { + logger.Errorf("Checking partition maintenance interval failed: %w", err) + return false + } + if !check { + logger.Infof("Skipping %s maintenance as it was handled in the last 24 hours...", activity) + return false + } + + var lock bool + logger.Infof("Trying to get %s maintenance advisory lock...", activity) + if err = tx.QueryRow(pgw.ctx, `SELECT pg_try_advisory_lock(1571543679778230000) AS have_lock`).Scan(&lock); err != nil { + logger.Error(err) + return false + } + if !lock { + logger.Infof("Skipping %s maintenance as another instance has the advisory lock...", activity) + return false + } + + if _, err = tx.Exec(pgw.ctx, `UPDATE admin.config SET value = now()::text WHERE key = $1`, activity); err != nil { + logger.Error(err) + return false + } + return true +} + // maintainUniqueSources is a background task that maintains a listing of unique sources for each metric. // This is used to avoid listing the same source multiple times in Grafana dropdowns. func (pgw *PostgresWriter) maintainUniqueSources() { logger := log.GetLogger(pgw.ctx) // due to metrics deletion the listing can go out of sync (a trigger not really wanted) - sqlGetAdvisoryLock := `SELECT pg_try_advisory_lock(1571543679778230000) AS have_lock` // 1571543679778230000 is just a random bigint sqlTopLevelMetrics := `SELECT table_name FROM admin.get_top_level_metric_tables()` - sqlDistinct := ` - WITH RECURSIVE t(dbname) AS ( + sqlDistinct := `WITH RECURSIVE t(dbname) AS ( SELECT MIN(dbname) AS dbname FROM %s UNION SELECT (SELECT MIN(dbname) FROM %s WHERE dbname > t.dbname) FROM t ) SELECT dbname FROM t WHERE dbname NOTNULL ORDER BY 1` sqlDelete := `DELETE FROM admin.all_distinct_dbname_metrics WHERE NOT dbname = ANY($1) and metric = $2` sqlDeleteAll := `DELETE FROM admin.all_distinct_dbname_metrics WHERE metric = $1` - sqlAdd := ` - INSERT INTO admin.all_distinct_dbname_metrics SELECT u, $2 FROM (select unnest($1::text[]) as u) x + sqlAdd := `INSERT INTO admin.all_distinct_dbname_metrics SELECT u, $2 FROM (select unnest($1::text[]) as u) x WHERE NOT EXISTS (select * from admin.all_distinct_dbname_metrics where dbname = u and metric = $2) RETURNING *` @@ -558,16 +606,10 @@ func (pgw *PostgresWriter) maintainUniqueSources() { select { case <-pgw.ctx.Done(): return - case <-time.After(time.Hour * 24): + case <-time.After(sourcesMaintenanceInterval): } - var lock bool - logger.Infof("Trying to get metricsDb listing maintainer advisory lock...") // to only have one "maintainer" in case of a "push" setup, as can get costly - if err := pgw.sinkDb.QueryRow(pgw.ctx, sqlGetAdvisoryLock).Scan(&lock); err != nil { - logger.Error("Getting metricsDb listing maintainer advisory lock failed:", err) - continue - } - if !lock { - logger.Info("Skipping admin.all_distinct_dbname_metrics maintenance as another instance has the advisory lock...") + + if !pgw.StartMaintenanceActivity(pgw.ctx, "sources_maintenance") { continue } @@ -587,7 +629,6 @@ func (pgw *PostgresWriter) maintainUniqueSources() { logger.Debugf("Refreshing all_distinct_dbname_metrics listing for metric: %s", metricName) rows, _ := pgw.sinkDb.Query(pgw.ctx, fmt.Sprintf(sqlDistinct, tableName, tableName)) ret, err := pgx.CollectRows(rows, pgx.RowTo[string]) - // ret, err := DBExecRead(mainContext, metricDb, fmt.Sprintf(sqlDistinct, tableName, tableName)) if err != nil { logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for '%s': %s", metricName, err) break diff --git a/internal/sinks/sql/admin_schema.sql b/internal/sinks/sql/admin_schema.sql index ca94af513d..699071ca24 100644 --- a/internal/sinks/sql/admin_schema.sql +++ b/internal/sinks/sql/admin_schema.sql @@ -9,19 +9,6 @@ create schema "subpartitions"; create extension if not exists btree_gin; --- grant all on schema public to pgwatch; - --- do $sql$ --- begin --- execute format($$alter role pgwatch in database %s set statement_timeout to '5min'$$, current_database()); --- raise warning 'Enabling asynchronous commit for pgwatch role - revert if possible data loss on crash is not acceptable!'; --- execute format($$alter role pgwatch in database %s set synchronous_commit to off$$, current_database()); --- end --- $sql$; - --- set role to pgwatch; - - create function admin.get_default_storage_type() returns text as $$ select case @@ -64,8 +51,12 @@ create table admin.config -- to later change the value call the admin.change_timescale_chunk_interval(interval) function! -- as changing the row directly will only be effective for completely new tables (metrics). -insert into admin.config select 'timescale_chunk_interval', '2 days'; -insert into admin.config select 'timescale_compress_interval', '1 day'; +insert into admin.config values + ('timescale_chunk_interval', '2 days'), + ('timescale_compress_interval', '1 day'), + ('partitions_maintenance', now()::text), + ('sources_maintenance', now()::text); + create or replace function trg_config_modified() returns trigger as $$ @@ -153,6 +144,3 @@ COMMENT ON TABLE admin.metrics_template_realtime IS 'used as a template for all -- create index on admin.metrics_template using brin (dbname, time) with (pages_per_range=32); /* consider BRIN instead for large data amounts */ CREATE INDEX ON admin.metrics_template_realtime (dbname, time); - --- RESET ROLE; - From 4e8ca890168264021032eae3ef88dfa2cf52dc32 Mon Sep 17 00:00:00 2001 From: Pavlo Golub Date: Thu, 31 Jul 2025 12:21:37 +0200 Subject: [PATCH 2/2] defer transaction handling --- internal/sinks/postgres.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/sinks/postgres.go b/internal/sinks/postgres.go index fbde8999f8..ed6de87234 100644 --- a/internal/sinks/postgres.go +++ b/internal/sinks/postgres.go @@ -547,7 +547,7 @@ func (pgw *PostgresWriter) StartMaintenanceActivity(ctx context.Context, activit logger.Errorf("Starting transaction for %s maintenance failed: %w", activity, err) return false } - func() { + defer func() { if err != nil { _ = tx.Rollback(pgw.ctx) } else {