Skip to content

Commit 45a5e83

Browse files
committed
Added notification triggers in database.
1 parent 84d6a35 commit 45a5e83

File tree

9 files changed

+623
-0
lines changed

9 files changed

+623
-0
lines changed
Lines changed: 354 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,354 @@
1+
"""Test pgstac notification triggers."""
2+
import json
3+
import os
4+
import psycopg2
5+
import psycopg2.extensions
6+
import pytest
7+
import time
8+
import subprocess
9+
from datetime import datetime, timezone
10+
11+
12+
@pytest.fixture(scope='session')
13+
def db_connection():
14+
"""Create database connection for testing."""
15+
# Use the same environment variables as the pgstac bootstrap
16+
connection_params = {
17+
'host': os.getenv('PGHOST', 'localhost'),
18+
'port': int(os.getenv('PGPORT', '5432')),
19+
'database': os.getenv('PGDATABASE', 'postgres'),
20+
'user': os.getenv('PGUSER', 'postgres'),
21+
'password': os.getenv('PGPASSWORD', 'password')
22+
}
23+
24+
conn = psycopg2.connect(**connection_params)
25+
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
26+
yield conn
27+
conn.close()
28+
29+
30+
@pytest.fixture(scope='session')
31+
def notifications_enabled():
32+
"""Check if notifications are enabled in the deployment config by checking Helm values."""
33+
try:
34+
# Get release name from environment or default
35+
release_name = os.getenv('RELEASE_NAME', 'eoapi')
36+
namespace = os.getenv('NAMESPACE', 'eoapi')
37+
38+
# Check if notifications are enabled in Helm values
39+
result = subprocess.run([
40+
'helm', 'get', 'values', release_name,
41+
'-n', namespace,
42+
'-o', 'json'
43+
], capture_output=True, text=True, check=True)
44+
45+
# Parse JSON and check notifications.sources.pgstac value
46+
values = json.loads(result.stdout)
47+
return values.get('notifications', {}).get('sources', {}).get('pgstac', False)
48+
except (subprocess.CalledProcessError, json.JSONDecodeError, Exception):
49+
# If we can't check the Helm values, assume notifications are disabled
50+
return False
51+
52+
53+
@pytest.fixture
54+
def notification_listener(db_connection):
55+
"""Set up notification listener for pgstac_items_change."""
56+
cursor = db_connection.cursor()
57+
cursor.execute("LISTEN pgstac_items_change;")
58+
yield cursor
59+
cursor.execute("UNLISTEN pgstac_items_change;")
60+
cursor.close()
61+
62+
63+
def test_notification_triggers_exist(db_connection, notifications_enabled):
64+
"""Test that notification triggers and function are properly installed."""
65+
if not notifications_enabled:
66+
pytest.skip("PgSTAC notifications not enabled - set notifications.sources.pgstac=true to test")
67+
68+
cursor = db_connection.cursor()
69+
70+
# Check that the notification function exists
71+
cursor.execute("""
72+
SELECT EXISTS(
73+
SELECT 1 FROM pg_proc
74+
WHERE proname = 'notify_items_change_func'
75+
);
76+
""")
77+
assert cursor.fetchone()[0], "notify_items_change_func function should exist"
78+
79+
# Check that all three triggers exist
80+
trigger_names = [
81+
'notify_items_change_insert',
82+
'notify_items_change_update',
83+
'notify_items_change_delete'
84+
]
85+
86+
for trigger_name in trigger_names:
87+
cursor.execute("""
88+
SELECT EXISTS(
89+
SELECT 1 FROM pg_trigger
90+
WHERE tgname = %s
91+
AND tgrelid = 'pgstac.items'::regclass
92+
);
93+
""", (trigger_name,))
94+
assert cursor.fetchone()[0], f"Trigger {trigger_name} should exist on pgstac.items"
95+
96+
cursor.close()
97+
98+
99+
def test_insert_notification(db_connection, notification_listener, notifications_enabled):
100+
"""Test that INSERT operations trigger notifications."""
101+
if not notifications_enabled:
102+
pytest.skip("PgSTAC notifications not enabled - set notifications.sources.pgstac=true to test")
103+
104+
cursor = db_connection.cursor()
105+
106+
# Clear any pending notifications
107+
db_connection.poll()
108+
while db_connection.notifies:
109+
db_connection.notifies.pop(0)
110+
111+
# Use existing collection
112+
test_collection_id = "noaa-emergency-response"
113+
114+
# Insert a test item using pgstac.create_item
115+
test_item_id = f"test-item-{int(time.time())}"
116+
item_data = json.dumps({
117+
"id": test_item_id,
118+
"type": "Feature",
119+
"stac_version": "1.0.0",
120+
"collection": test_collection_id,
121+
"geometry": {"type": "Point", "coordinates": [0, 0]},
122+
"bbox": [0, 0, 0, 0],
123+
"properties": {"datetime": "2020-01-01T00:00:00Z"},
124+
"assets": {}
125+
})
126+
127+
cursor.execute("SELECT pgstac.create_item(%s);", (item_data,))
128+
129+
# Wait for notification
130+
timeout = 5
131+
start_time = time.time()
132+
received_notification = False
133+
134+
while time.time() - start_time < timeout:
135+
db_connection.poll()
136+
if db_connection.notifies:
137+
notify = db_connection.notifies.pop(0)
138+
assert notify.channel == "pgstac_items_change"
139+
140+
# Parse the notification payload
141+
payload = json.loads(notify.payload)
142+
assert payload["operation"] == "INSERT"
143+
assert "items" in payload
144+
assert len(payload["items"]) == 1
145+
assert payload["items"][0]["id"] == test_item_id
146+
assert payload["items"][0]["collection"] == test_collection_id
147+
148+
received_notification = True
149+
break
150+
time.sleep(0.1)
151+
152+
assert received_notification, "Should have received INSERT notification"
153+
154+
# Cleanup
155+
cursor.execute("SELECT pgstac.delete_item(%s);", (test_item_id,))
156+
cursor.close()
157+
158+
159+
def test_update_notification(db_connection, notification_listener, notifications_enabled):
160+
"""Test that UPDATE operations trigger notifications."""
161+
if not notifications_enabled:
162+
pytest.skip("PgSTAC notifications not enabled - set notifications.sources.pgstac=true to test")
163+
164+
cursor = db_connection.cursor()
165+
166+
# Clear any pending notifications
167+
db_connection.poll()
168+
while db_connection.notifies:
169+
db_connection.notifies.pop(0)
170+
171+
test_collection_id = "noaa-emergency-response"
172+
173+
# Insert a test item first using pgstac.create_item
174+
test_item_id = f"test-item-update-{int(time.time())}"
175+
item_data = json.dumps({
176+
"id": test_item_id,
177+
"type": "Feature",
178+
"stac_version": "1.0.0",
179+
"collection": test_collection_id,
180+
"geometry": {"type": "Point", "coordinates": [0, 0]},
181+
"bbox": [0, 0, 0, 0],
182+
"properties": {"datetime": "2020-01-01T00:00:00Z"},
183+
"assets": {}
184+
})
185+
186+
cursor.execute("SELECT pgstac.create_item(%s);", (item_data,))
187+
188+
# Clear INSERT notification
189+
db_connection.poll()
190+
while db_connection.notifies:
191+
db_connection.notifies.pop(0)
192+
193+
# Update the item using pgstac.update_item
194+
updated_item_data = json.dumps({
195+
"id": test_item_id,
196+
"type": "Feature",
197+
"stac_version": "1.0.0",
198+
"collection": test_collection_id,
199+
"geometry": {"type": "Point", "coordinates": [0, 0]},
200+
"bbox": [0, 0, 0, 0],
201+
"properties": {"datetime": "2020-01-01T00:00:00Z", "updated": True},
202+
"assets": {}
203+
})
204+
205+
cursor.execute("SELECT pgstac.update_item(%s);", (updated_item_data,))
206+
207+
# Wait for notification
208+
timeout = 5
209+
start_time = time.time()
210+
received_notification = False
211+
212+
while time.time() - start_time < timeout:
213+
db_connection.poll()
214+
if db_connection.notifies:
215+
notify = db_connection.notifies.pop(0)
216+
assert notify.channel == "pgstac_items_change"
217+
218+
# Parse the notification payload - PgSTAC update uses DELETE+INSERT, so accept both
219+
payload = json.loads(notify.payload)
220+
assert payload["operation"] in ["DELETE", "INSERT", "UPDATE"], f"Operation should be DELETE, INSERT, or UPDATE, got {payload['operation']}"
221+
assert "items" in payload
222+
assert len(payload["items"]) == 1
223+
assert payload["items"][0]["id"] == test_item_id
224+
assert payload["items"][0]["collection"] == test_collection_id
225+
226+
received_notification = True
227+
break
228+
time.sleep(0.1)
229+
230+
assert received_notification, "Should have received UPDATE notification"
231+
232+
# Cleanup
233+
cursor.execute("SELECT pgstac.delete_item(%s);", (test_item_id,))
234+
cursor.close()
235+
236+
237+
def test_delete_notification(db_connection, notification_listener, notifications_enabled):
238+
"""Test that DELETE operations trigger notifications."""
239+
if not notifications_enabled:
240+
pytest.skip("PgSTAC notifications not enabled - set notifications.sources.pgstac=true to test")
241+
242+
cursor = db_connection.cursor()
243+
244+
# Clear any pending notifications
245+
db_connection.poll()
246+
while db_connection.notifies:
247+
db_connection.notifies.pop(0)
248+
249+
test_collection_id = "noaa-emergency-response"
250+
251+
# Insert a test item first using pgstac.create_item
252+
test_item_id = f"test-item-delete-{int(time.time())}"
253+
item_data = json.dumps({
254+
"id": test_item_id,
255+
"type": "Feature",
256+
"stac_version": "1.0.0",
257+
"collection": test_collection_id,
258+
"geometry": {"type": "Point", "coordinates": [0, 0]},
259+
"bbox": [0, 0, 0, 0],
260+
"properties": {"datetime": "2020-01-01T00:00:00Z"},
261+
"assets": {}
262+
})
263+
264+
cursor.execute("SELECT pgstac.create_item(%s);", (item_data,))
265+
266+
# Clear INSERT notification
267+
db_connection.poll()
268+
while db_connection.notifies:
269+
db_connection.notifies.pop(0)
270+
271+
# Delete the item using pgstac.delete_item
272+
cursor.execute("SELECT pgstac.delete_item(%s);", (test_item_id,))
273+
274+
# Wait for notification
275+
timeout = 5
276+
start_time = time.time()
277+
received_notification = False
278+
279+
while time.time() - start_time < timeout:
280+
db_connection.poll()
281+
if db_connection.notifies:
282+
notify = db_connection.notifies.pop(0)
283+
assert notify.channel == "pgstac_items_change"
284+
285+
# Parse the notification payload
286+
payload = json.loads(notify.payload)
287+
assert payload["operation"] == "DELETE"
288+
assert "items" in payload
289+
assert len(payload["items"]) == 1
290+
assert payload["items"][0]["id"] == test_item_id
291+
assert payload["items"][0]["collection"] == test_collection_id
292+
293+
received_notification = True
294+
break
295+
time.sleep(0.1)
296+
297+
assert received_notification, "Should have received DELETE notification"
298+
cursor.close()
299+
300+
301+
def test_bulk_operations_notification(db_connection, notification_listener, notifications_enabled):
302+
"""Test that bulk operations send notifications with multiple items."""
303+
if not notifications_enabled:
304+
pytest.skip("PgSTAC notifications not enabled - set notifications.sources.pgstac=true to test")
305+
306+
cursor = db_connection.cursor()
307+
308+
# Clear any pending notifications
309+
db_connection.poll()
310+
while db_connection.notifies:
311+
db_connection.notifies.pop(0)
312+
313+
test_collection_id = "noaa-emergency-response"
314+
315+
# Insert multiple items using pgstac.create_item
316+
test_items = [f"bulk-item-{i}-{int(time.time())}" for i in range(3)]
317+
318+
for item_id in test_items:
319+
item_data = json.dumps({
320+
"id": item_id,
321+
"type": "Feature",
322+
"stac_version": "1.0.0",
323+
"collection": test_collection_id,
324+
"geometry": {"type": "Point", "coordinates": [0, 0]},
325+
"bbox": [0, 0, 0, 0],
326+
"properties": {"datetime": "2020-01-01T00:00:00Z"},
327+
"assets": {}
328+
})
329+
330+
cursor.execute("SELECT pgstac.create_item(%s);", (item_data,))
331+
332+
# Wait for notifications (should get one per insert since we're doing separate statements)
333+
timeout = 10
334+
start_time = time.time()
335+
notifications_received = 0
336+
337+
while time.time() - start_time < timeout and notifications_received < len(test_items):
338+
db_connection.poll()
339+
while db_connection.notifies:
340+
notify = db_connection.notifies.pop(0)
341+
assert notify.channel == "pgstac_items_change"
342+
343+
payload = json.loads(notify.payload)
344+
assert payload["operation"] == "INSERT"
345+
assert "items" in payload
346+
notifications_received += len(payload["items"])
347+
348+
assert notifications_received >= len(test_items), f"Should have received notifications for all {len(test_items)} items"
349+
350+
# Cleanup
351+
for item_id in test_items:
352+
cursor.execute("SELECT pgstac.delete_item(%s);", (item_id,))
353+
354+
cursor.close()

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1212
- Enforcement of `CHANGELOG.md` entries for PRs and Conventional Commits for PR titles [#288](https://github.com/developmentseed/eoapi-k8s/pull/288)
1313
- Added code formatting and linting with pre-commit hooks [#283](https://github.com/developmentseed/eoapi-k8s/pull/283)
1414
- Adjusted Renovate Configuration to fit conventional commits [#295](https://github.com/developmentseed/eoapi-k8s/pull/295)
15+
- Notification triggers in database [#289](https://github.com/developmentseed/eoapi-k8s/pull/289)
1516

1617
## [0.7.8] - 2025-09-10
1718

charts/eoapi/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ A Helm chart for deploying Earth Observation APIs with integrated STAC, raster,
1212
- Multidimensional data support
1313
- Built-in STAC Browser interface
1414
- Flexible database configuration
15+
- Real-time PostgreSQL notifications for STAC item changes
1516
- Unified ingress system
1617

1718
## TL;DR
@@ -75,6 +76,7 @@ pgstacBootstrap:
7576
| `ingress.className` | Ingress controller class | `nginx` |
7677
| `browser.enabled` | Enable STAC Browser interface | `true` |
7778
| `pgstacBootstrap.enabled` | Enable database initialization | `true` |
79+
| `notifications.sources.pgstac` | Enable PostgreSQL notification triggers for STAC item changes | `false` |
7880

7981
Refer to the [values.schema.json](./values.schema.json) for the complete list of configurable parameters.
8082

0 commit comments

Comments
 (0)