Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 56 additions & 15 deletions internal/sinks/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ type MeasurementMessagePostgres struct {
TagData map[string]string
}

const sourcesMaintenanceInterval = time.Hour * 24

type DbStorageSchemaType int

const (
Expand Down Expand Up @@ -496,6 +498,10 @@ func (pgw *PostgresWriter) deleteOldPartitions(delay time.Duration) {
}

for {
if !pgw.StartMaintenanceActivity(pgw.ctx, "partitions_maintenance") {
continue
}

if pgw.metricSchema == DbStorageSchemaTimescale {
partsDropped, err := pgw.DropOldTimePartitions(metricAgeDaysThreshold)
if err != nil {
Expand Down Expand Up @@ -534,40 +540,76 @@ func (pgw *PostgresWriter) deleteOldPartitions(delay time.Duration) {
}
}

// StartMaintenanceActivity reports whether the calling instance should run the
// maintenance activity identified by activity right now.
//
// The activity name is the key of a row in admin.config whose value holds the
// timestamp of the last run. Within a single transaction the method:
//
//  1. tries to take a transaction-scoped advisory lock, so that concurrent
//     instances writing to the same sink serialize on the decision;
//  2. compares the stored timestamp against sourcesMaintenanceInterval;
//  3. if the activity is due, stamps the row with now() and commits.
//
// It returns true only when the new timestamp was committed. Every other
// outcome (lock held elsewhere, activity not yet due, missing config row, any
// database error) is logged and reported as false, in which case the caller is
// expected to skip the activity.
//
// The advisory lock is taken with pg_try_advisory_xact_lock, so PostgreSQL
// releases it automatically on commit or rollback. A session-level lock would
// have to be released on the very same pooled connection, which cannot be
// guaranteed once the transaction has handed its connection back to the pool.
func (pgw *PostgresWriter) StartMaintenanceActivity(ctx context.Context, activity string) bool {
	logger := log.GetLogger(ctx)
	tx, err := pgw.sinkDb.Begin(ctx)
	if err != nil {
		logger.Errorf("Starting transaction for %s maintenance failed: %v", activity, err)
		return false
	}
	committed := false
	defer func() {
		if !committed {
			// Nothing worth persisting on these paths; the rollback also
			// releases the transaction-scoped advisory lock.
			_ = tx.Rollback(ctx)
		}
	}()

	// Take the lock before looking at the timestamp: checking first would let
	// two instances both see a stale timestamp and both run the activity.
	var lock bool
	logger.Infof("Trying to get %s maintenance advisory lock...", activity)
	if err = tx.QueryRow(ctx, `SELECT pg_try_advisory_xact_lock(1571543679778230000) AS have_lock`).Scan(&lock); err != nil {
		logger.Errorf("Getting %s maintenance advisory lock failed: %v", activity, err)
		return false
	}
	if !lock {
		logger.Infof("Skipping %s maintenance as another instance has the advisory lock...", activity)
		return false
	}

	var check bool
	if err = tx.QueryRow(ctx, `SELECT now() - value::timestamptz > $1::interval FROM admin.config WHERE key = $2`,
		sourcesMaintenanceInterval, activity).Scan(&check); err != nil {
		logger.Errorf("Checking %s maintenance interval failed: %v", activity, err)
		return false
	}
	if !check {
		logger.Infof("Skipping %s maintenance as it was handled in the last 24 hours...", activity)
		return false
	}

	if _, err = tx.Exec(ctx, `UPDATE admin.config SET value = now()::text WHERE key = $1`, activity); err != nil {
		logger.Errorf("Updating %s maintenance timestamp failed: %v", activity, err)
		return false
	}
	// Only claim the activity once the new timestamp is durable; otherwise
	// another instance would still consider it due.
	if err = tx.Commit(ctx); err != nil {
		logger.Errorf("Committing %s maintenance timestamp failed: %v", activity, err)
		return false
	}
	committed = true
	return true
}

// maintainUniqueSources is a background task that maintains a listing of unique sources for each metric.
// This is used to avoid listing the same source multiple times in Grafana dropdowns.
func (pgw *PostgresWriter) maintainUniqueSources() {
logger := log.GetLogger(pgw.ctx)
// due to metrics deletion the listing can go out of sync (a trigger not really wanted)
sqlGetAdvisoryLock := `SELECT pg_try_advisory_lock(1571543679778230000) AS have_lock` // 1571543679778230000 is just a random bigint
sqlTopLevelMetrics := `SELECT table_name FROM admin.get_top_level_metric_tables()`
sqlDistinct := `
WITH RECURSIVE t(dbname) AS (
sqlDistinct := `WITH RECURSIVE t(dbname) AS (
SELECT MIN(dbname) AS dbname FROM %s
UNION
SELECT (SELECT MIN(dbname) FROM %s WHERE dbname > t.dbname) FROM t )
SELECT dbname FROM t WHERE dbname NOTNULL ORDER BY 1`
sqlDelete := `DELETE FROM admin.all_distinct_dbname_metrics WHERE NOT dbname = ANY($1) and metric = $2`
sqlDeleteAll := `DELETE FROM admin.all_distinct_dbname_metrics WHERE metric = $1`
sqlAdd := `
INSERT INTO admin.all_distinct_dbname_metrics SELECT u, $2 FROM (select unnest($1::text[]) as u) x
sqlAdd := `INSERT INTO admin.all_distinct_dbname_metrics SELECT u, $2 FROM (select unnest($1::text[]) as u) x
WHERE NOT EXISTS (select * from admin.all_distinct_dbname_metrics where dbname = u and metric = $2)
RETURNING *`

for {
select {
case <-pgw.ctx.Done():
return
case <-time.After(time.Hour * 24):
case <-time.After(sourcesMaintenanceInterval):
}
var lock bool
logger.Infof("Trying to get metricsDb listing maintainer advisory lock...") // to only have one "maintainer" in case of a "push" setup, as can get costly
if err := pgw.sinkDb.QueryRow(pgw.ctx, sqlGetAdvisoryLock).Scan(&lock); err != nil {
logger.Error("Getting metricsDb listing maintainer advisory lock failed:", err)
continue
}
if !lock {
logger.Info("Skipping admin.all_distinct_dbname_metrics maintenance as another instance has the advisory lock...")

if !pgw.StartMaintenanceActivity(pgw.ctx, "sources_maintenance") {
continue
}

Expand All @@ -587,7 +629,6 @@ func (pgw *PostgresWriter) maintainUniqueSources() {
logger.Debugf("Refreshing all_distinct_dbname_metrics listing for metric: %s", metricName)
rows, _ := pgw.sinkDb.Query(pgw.ctx, fmt.Sprintf(sqlDistinct, tableName, tableName))
ret, err := pgx.CollectRows(rows, pgx.RowTo[string])
// ret, err := DBExecRead(mainContext, metricDb, fmt.Sprintf(sqlDistinct, tableName, tableName))
if err != nil {
logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for '%s': %s", metricName, err)
break
Expand Down
24 changes: 6 additions & 18 deletions internal/sinks/sql/admin_schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,6 @@ create schema "subpartitions";

create extension if not exists btree_gin;

-- grant all on schema public to pgwatch;

-- do $sql$
-- begin
-- execute format($$alter role pgwatch in database %s set statement_timeout to '5min'$$, current_database());
-- raise warning 'Enabling asynchronous commit for pgwatch role - revert if possible data loss on crash is not acceptable!';
-- execute format($$alter role pgwatch in database %s set synchronous_commit to off$$, current_database());
-- end
-- $sql$;

-- set role to pgwatch;


create function admin.get_default_storage_type() returns text as
$$
select case
Expand Down Expand Up @@ -64,8 +51,12 @@ create table admin.config

-- to later change the value call the admin.change_timescale_chunk_interval(interval) function!
-- as changing the row directly will only be effective for completely new tables (metrics).
insert into admin.config select 'timescale_chunk_interval', '2 days';
insert into admin.config select 'timescale_compress_interval', '1 day';
insert into admin.config values
('timescale_chunk_interval', '2 days'),
('timescale_compress_interval', '1 day'),
('partitions_maintenance', now()::text),
('sources_maintenance', now()::text);


create or replace function trg_config_modified() returns trigger
as $$
Expand Down Expand Up @@ -153,6 +144,3 @@ COMMENT ON TABLE admin.metrics_template_realtime IS 'used as a template for all

-- create index on admin.metrics_template using brin (dbname, time) with (pages_per_range=32); /* consider BRIN instead for large data amounts */
CREATE INDEX ON admin.metrics_template_realtime (dbname, time);

-- RESET ROLE;

Loading