Skip to content

Commit fc728e1

Browse files
committed
Added notification triggers in database.
1 parent ebf6ca4 commit fc728e1

File tree

10 files changed

+546
-0
lines changed

10 files changed

+546
-0
lines changed
Lines changed: 315 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,315 @@
1+
"""Test pgstac notification triggers."""
2+
import json
3+
import os
4+
import psycopg2
5+
import psycopg2.extensions
6+
import pytest
7+
import time
8+
from datetime import datetime, timezone
9+
10+
11+
@pytest.fixture(scope='session')
12+
def db_connection():
13+
"""Create database connection for testing."""
14+
# Use the same environment variables as the pgstac bootstrap
15+
connection_params = {
16+
'host': os.getenv('PGHOST', 'localhost'),
17+
'port': int(os.getenv('PGPORT', '5432')),
18+
'database': os.getenv('PGDATABASE', 'postgres'),
19+
'user': os.getenv('PGUSER', 'postgres'),
20+
'password': os.getenv('PGPASSWORD', 'password')
21+
}
22+
23+
conn = psycopg2.connect(**connection_params)
24+
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
25+
yield conn
26+
conn.close()
27+
28+
29+
@pytest.fixture
30+
def notification_listener(db_connection):
31+
"""Set up notification listener for pgstac_items_change."""
32+
cursor = db_connection.cursor()
33+
cursor.execute("LISTEN pgstac_items_change;")
34+
yield cursor
35+
cursor.execute("UNLISTEN pgstac_items_change;")
36+
cursor.close()
37+
38+
39+
def test_notification_triggers_exist(db_connection):
40+
"""Test that notification triggers and function are properly installed."""
41+
cursor = db_connection.cursor()
42+
43+
# Check that the notification function exists
44+
cursor.execute("""
45+
SELECT EXISTS(
46+
SELECT 1 FROM pg_proc
47+
WHERE proname = 'notify_items_change_func'
48+
);
49+
""")
50+
assert cursor.fetchone()[0], "notify_items_change_func function should exist"
51+
52+
# Check that all three triggers exist
53+
trigger_names = [
54+
'notify_items_change_insert',
55+
'notify_items_change_update',
56+
'notify_items_change_delete'
57+
]
58+
59+
for trigger_name in trigger_names:
60+
cursor.execute("""
61+
SELECT EXISTS(
62+
SELECT 1 FROM pg_trigger
63+
WHERE tgname = %s
64+
AND tgrelid = 'pgstac.items'::regclass
65+
);
66+
""", (trigger_name,))
67+
assert cursor.fetchone()[0], f"Trigger {trigger_name} should exist on pgstac.items"
68+
69+
cursor.close()
70+
71+
72+
def test_insert_notification(db_connection, notification_listener):
73+
"""Test that INSERT operations trigger notifications."""
74+
cursor = db_connection.cursor()
75+
76+
# Clear any pending notifications
77+
db_connection.poll()
78+
while db_connection.notifies:
79+
db_connection.notifies.pop(0)
80+
81+
# Use existing collection
82+
test_collection_id = "noaa-emergency-response"
83+
84+
# Insert a test item using pgstac.create_item
85+
test_item_id = f"test-item-{int(time.time())}"
86+
item_data = json.dumps({
87+
"id": test_item_id,
88+
"type": "Feature",
89+
"stac_version": "1.0.0",
90+
"collection": test_collection_id,
91+
"geometry": {"type": "Point", "coordinates": [0, 0]},
92+
"bbox": [0, 0, 0, 0],
93+
"properties": {"datetime": "2020-01-01T00:00:00Z"},
94+
"assets": {}
95+
})
96+
97+
cursor.execute("SELECT pgstac.create_item(%s);", (item_data,))
98+
99+
# Wait for notification
100+
timeout = 5
101+
start_time = time.time()
102+
received_notification = False
103+
104+
while time.time() - start_time < timeout:
105+
db_connection.poll()
106+
if db_connection.notifies:
107+
notify = db_connection.notifies.pop(0)
108+
assert notify.channel == "pgstac_items_change"
109+
110+
# Parse the notification payload
111+
payload = json.loads(notify.payload)
112+
assert payload["operation"] == "INSERT"
113+
assert "items" in payload
114+
assert len(payload["items"]) == 1
115+
assert payload["items"][0]["id"] == test_item_id
116+
assert payload["items"][0]["collection"] == test_collection_id
117+
118+
received_notification = True
119+
break
120+
time.sleep(0.1)
121+
122+
assert received_notification, "Should have received INSERT notification"
123+
124+
# Cleanup
125+
cursor.execute("SELECT pgstac.delete_item(%s);", (test_item_id,))
126+
cursor.close()
127+
128+
129+
def test_update_notification(db_connection, notification_listener):
130+
"""Test that UPDATE operations trigger notifications."""
131+
cursor = db_connection.cursor()
132+
133+
# Clear any pending notifications
134+
db_connection.poll()
135+
while db_connection.notifies:
136+
db_connection.notifies.pop(0)
137+
138+
test_collection_id = "noaa-emergency-response"
139+
140+
# Insert a test item first using pgstac.create_item
141+
test_item_id = f"test-item-update-{int(time.time())}"
142+
item_data = json.dumps({
143+
"id": test_item_id,
144+
"type": "Feature",
145+
"stac_version": "1.0.0",
146+
"collection": test_collection_id,
147+
"geometry": {"type": "Point", "coordinates": [0, 0]},
148+
"bbox": [0, 0, 0, 0],
149+
"properties": {"datetime": "2020-01-01T00:00:00Z"},
150+
"assets": {}
151+
})
152+
153+
cursor.execute("SELECT pgstac.create_item(%s);", (item_data,))
154+
155+
# Clear INSERT notification
156+
db_connection.poll()
157+
while db_connection.notifies:
158+
db_connection.notifies.pop(0)
159+
160+
# Update the item using pgstac.update_item
161+
updated_item_data = json.dumps({
162+
"id": test_item_id,
163+
"type": "Feature",
164+
"stac_version": "1.0.0",
165+
"collection": test_collection_id,
166+
"geometry": {"type": "Point", "coordinates": [0, 0]},
167+
"bbox": [0, 0, 0, 0],
168+
"properties": {"datetime": "2020-01-01T00:00:00Z", "updated": True},
169+
"assets": {}
170+
})
171+
172+
cursor.execute("SELECT pgstac.update_item(%s);", (updated_item_data,))
173+
174+
# Wait for notification
175+
timeout = 5
176+
start_time = time.time()
177+
received_notification = False
178+
179+
while time.time() - start_time < timeout:
180+
db_connection.poll()
181+
if db_connection.notifies:
182+
notify = db_connection.notifies.pop(0)
183+
assert notify.channel == "pgstac_items_change"
184+
185+
# Parse the notification payload - PgSTAC update uses DELETE+INSERT, so accept both
186+
payload = json.loads(notify.payload)
187+
assert payload["operation"] in ["DELETE", "INSERT", "UPDATE"], f"Operation should be DELETE, INSERT, or UPDATE, got {payload['operation']}"
188+
assert "items" in payload
189+
assert len(payload["items"]) == 1
190+
assert payload["items"][0]["id"] == test_item_id
191+
assert payload["items"][0]["collection"] == test_collection_id
192+
193+
received_notification = True
194+
break
195+
time.sleep(0.1)
196+
197+
assert received_notification, "Should have received UPDATE notification"
198+
199+
# Cleanup
200+
cursor.execute("SELECT pgstac.delete_item(%s);", (test_item_id,))
201+
cursor.close()
202+
203+
204+
def test_delete_notification(db_connection, notification_listener):
205+
"""Test that DELETE operations trigger notifications."""
206+
cursor = db_connection.cursor()
207+
208+
# Clear any pending notifications
209+
db_connection.poll()
210+
while db_connection.notifies:
211+
db_connection.notifies.pop(0)
212+
213+
test_collection_id = "noaa-emergency-response"
214+
215+
# Insert a test item first using pgstac.create_item
216+
test_item_id = f"test-item-delete-{int(time.time())}"
217+
item_data = json.dumps({
218+
"id": test_item_id,
219+
"type": "Feature",
220+
"stac_version": "1.0.0",
221+
"collection": test_collection_id,
222+
"geometry": {"type": "Point", "coordinates": [0, 0]},
223+
"bbox": [0, 0, 0, 0],
224+
"properties": {"datetime": "2020-01-01T00:00:00Z"},
225+
"assets": {}
226+
})
227+
228+
cursor.execute("SELECT pgstac.create_item(%s);", (item_data,))
229+
230+
# Clear INSERT notification
231+
db_connection.poll()
232+
while db_connection.notifies:
233+
db_connection.notifies.pop(0)
234+
235+
# Delete the item using pgstac.delete_item
236+
cursor.execute("SELECT pgstac.delete_item(%s);", (test_item_id,))
237+
238+
# Wait for notification
239+
timeout = 5
240+
start_time = time.time()
241+
received_notification = False
242+
243+
while time.time() - start_time < timeout:
244+
db_connection.poll()
245+
if db_connection.notifies:
246+
notify = db_connection.notifies.pop(0)
247+
assert notify.channel == "pgstac_items_change"
248+
249+
# Parse the notification payload
250+
payload = json.loads(notify.payload)
251+
assert payload["operation"] == "DELETE"
252+
assert "items" in payload
253+
assert len(payload["items"]) == 1
254+
assert payload["items"][0]["id"] == test_item_id
255+
assert payload["items"][0]["collection"] == test_collection_id
256+
257+
received_notification = True
258+
break
259+
time.sleep(0.1)
260+
261+
assert received_notification, "Should have received DELETE notification"
262+
cursor.close()
263+
264+
265+
def test_bulk_operations_notification(db_connection, notification_listener):
266+
"""Test that bulk operations send single notifications with multiple items."""
267+
cursor = db_connection.cursor()
268+
269+
# Clear any pending notifications
270+
db_connection.poll()
271+
while db_connection.notifies:
272+
db_connection.notifies.pop(0)
273+
274+
test_collection_id = "noaa-emergency-response"
275+
276+
# Insert multiple items using pgstac.create_item
277+
test_items = [f"bulk-item-{i}-{int(time.time())}" for i in range(3)]
278+
279+
for item_id in test_items:
280+
item_data = json.dumps({
281+
"id": item_id,
282+
"type": "Feature",
283+
"stac_version": "1.0.0",
284+
"collection": test_collection_id,
285+
"geometry": {"type": "Point", "coordinates": [0, 0]},
286+
"bbox": [0, 0, 0, 0],
287+
"properties": {"datetime": "2020-01-01T00:00:00Z"},
288+
"assets": {}
289+
})
290+
291+
cursor.execute("SELECT pgstac.create_item(%s);", (item_data,))
292+
293+
# Wait for notifications (should get one per insert since we're doing separate statements)
294+
timeout = 10
295+
start_time = time.time()
296+
notifications_received = 0
297+
298+
while time.time() - start_time < timeout and notifications_received < len(test_items):
299+
db_connection.poll()
300+
while db_connection.notifies:
301+
notify = db_connection.notifies.pop(0)
302+
assert notify.channel == "pgstac_items_change"
303+
304+
payload = json.loads(notify.payload)
305+
assert payload["operation"] == "INSERT"
306+
assert "items" in payload
307+
notifications_received += len(payload["items"])
308+
309+
assert notifications_received >= len(test_items), f"Should have received notifications for all {len(test_items)} items"
310+
311+
# Cleanup
312+
for item_id in test_items:
313+
cursor.execute("SELECT pgstac.delete_item(%s);", (item_id,))
314+
315+
cursor.close()

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1010
### Added
1111

1212
- Enforcement of `CHANGELOG.md` entries for PRs and Conventional Commits for PR titles [#288](https://github.com/developmentseed/eoapi-k8s/pull/288)
13+
- Notification triggers in database [#289](https://github.com/developmentseed/eoapi-k8s/pull/289)
1314

1415
## [0.7.8] - 2025-09-10
1516

charts/eoapi/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ A Helm chart for deploying Earth Observation APIs with integrated STAC, raster,
1212
- Multidimensional data support
1313
- Built-in STAC Browser interface
1414
- Flexible database configuration
15+
- Real-time PostgreSQL notifications for STAC item changes
1516
- Unified ingress system
1617

1718
## TL;DR
@@ -75,6 +76,7 @@ pgstacBootstrap:
7576
| `ingress.className` | Ingress controller class | `nginx` |
7677
| `browser.enabled` | Enable STAC Browser interface | `true` |
7778
| `pgstacBootstrap.enabled` | Enable database initialization | `true` |
79+
| `notifications.sources.pgstac` | Enable PostgreSQL notification triggers for STAC item changes | `false` |
7880

7981
Refer to the [values.schema.json](./values.schema.json) for the complete list of configurable parameters.
8082

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
-- Create the notification function
2+
CREATE OR REPLACE FUNCTION notify_items_change_func()
3+
RETURNS TRIGGER AS $$
4+
DECLARE
5+
6+
BEGIN
7+
PERFORM pg_notify('pgstac_items_change'::text, json_build_object(
8+
'operation', TG_OP,
9+
'items', jsonb_agg(
10+
jsonb_build_object(
11+
'collection', data.collection,
12+
'id', data.id
13+
)
14+
)
15+
)::text
16+
)
17+
FROM data
18+
;
19+
RETURN NULL;
20+
END;
21+
$$ LANGUAGE plpgsql;
22+
23+
-- Create triggers for INSERT operations
24+
CREATE OR REPLACE TRIGGER notify_items_change_insert
25+
AFTER INSERT ON pgstac.items
26+
REFERENCING NEW TABLE AS data
27+
FOR EACH STATEMENT EXECUTE FUNCTION notify_items_change_func()
28+
;
29+
30+
-- Create triggers for UPDATE operations
31+
CREATE OR REPLACE TRIGGER notify_items_change_update
32+
AFTER UPDATE ON pgstac.items
33+
REFERENCING NEW TABLE AS data
34+
FOR EACH STATEMENT EXECUTE FUNCTION notify_items_change_func()
35+
;
36+
37+
-- Create triggers for DELETE operations
38+
CREATE OR REPLACE TRIGGER notify_items_change_delete
39+
AFTER DELETE ON pgstac.items
40+
REFERENCING OLD TABLE AS data
41+
FOR EACH STATEMENT EXECUTE FUNCTION notify_items_change_func()
42+
;

charts/eoapi/initdb-data/settings/pgstac-settings.sql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,5 @@ INSERT INTO pgstac.pgstac_settings (name, value) VALUES ('context_estimated_cost
1010

1111
DELETE FROM pgstac.pgstac_settings WHERE name = 'context_stats_ttl';
1212
INSERT INTO pgstac.pgstac_settings (name, value) VALUES ('context_stats_ttl', '1 day');
13+
14+

charts/eoapi/templates/pgstacbootstrap/configmap.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ metadata:
1414
data:
1515
pgstac-settings.sql: |
1616
{{ $.Files.Get "initdb-data/settings/pgstac-settings.sql" | nindent 4 }}
17+
{{- if .Values.notifications.sources.pgstac }}
18+
{{ $.Files.Get "initdb-data/settings/pgstac-notification-triggers.sql" | nindent 4 }}
19+
{{- end }}
1720
---
1821
{{- if .Values.pgstacBootstrap.settings.loadSamples }}
1922
apiVersion: v1

0 commit comments

Comments
 (0)