diff --git a/.github/workflows/helm-tests.yml b/.github/workflows/helm-tests.yml index 337d792c..2af833fc 100644 --- a/.github/workflows/helm-tests.yml +++ b/.github/workflows/helm-tests.yml @@ -112,6 +112,11 @@ jobs: echo "===== Related Kubernetes Events =====" kubectl get events | grep -E "pgstac|initdb" || echo "No relevant events found" + # Check notification system status + echo "===== Notification System Status =====" + kubectl get deployments -l app.kubernetes.io/name=eoapi-notifier -o wide || echo "No eoapi-notifier deployment found" + kubectl get ksvc -l app.kubernetes.io/component=cloudevents-sink -o wide || echo "No Knative CloudEvents sink found" + exit 1 - name: Run integration tests @@ -130,6 +135,19 @@ jobs: kubectl get services -o wide kubectl get ingress + # Check notification system final status + echo "=== Notification System Final Status ===" + kubectl get deployments -l app.kubernetes.io/name=eoapi-notifier -o wide || echo "No eoapi-notifier deployment" + kubectl get pods -l app.kubernetes.io/name=eoapi-notifier -o wide || echo "No eoapi-notifier pods" + kubectl get ksvc -l app.kubernetes.io/component=cloudevents-sink -o wide || echo "No Knative CloudEvents sink" + kubectl get pods -l serving.knative.dev/service -o wide || echo "No Knative CloudEvents sink pods" + + # Show notification logs if they exist + echo "=== eoapi-notifier Logs ===" + kubectl logs -l app.kubernetes.io/name=eoapi-notifier --tail=20 || echo "No eoapi-notifier logs" + echo "=== Knative CloudEvents Sink Logs ===" + kubectl logs -l serving.knative.dev/service --tail=20 || echo "No Knative CloudEvents sink logs" + - name: Cleanup if: always() diff --git a/.github/workflows/tests/conftest.py b/.github/workflows/tests/conftest.py index 8179e62f..2f384328 100644 --- a/.github/workflows/tests/conftest.py +++ b/.github/workflows/tests/conftest.py @@ -1,5 +1,7 @@ import pytest import os +import psycopg2 +import psycopg2.extensions @pytest.fixture(scope='session') @@ -15,3 +17,30 @@ def vector_endpoint(): @pytest.fixture(scope='session') def stac_endpoint(): return os.getenv('STAC_ENDPOINT', "http://127.0.0.1/stac") + + +@pytest.fixture(scope='session') +def db_connection(): + """Create database connection for testing.""" + # Require all database connection parameters to be explicitly set + required_vars = ['PGHOST', 'PGPORT', 'PGDATABASE', 'PGUSER', 'PGPASSWORD'] + missing_vars = [var for var in required_vars if not os.getenv(var)] + + if missing_vars: + pytest.fail(f"Required environment variables not set: {', '.join(missing_vars)}") + + connection_params = { + 'host': os.getenv('PGHOST'), + 'port': int(os.getenv('PGPORT')), + 'database': os.getenv('PGDATABASE'), + 'user': os.getenv('PGUSER'), + 'password': os.getenv('PGPASSWORD') + } + + try: + conn = psycopg2.connect(**connection_params) + conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) + yield conn + conn.close() + except psycopg2.Error as e: + pytest.fail(f"Cannot connect to database: {e}") diff --git a/.github/workflows/tests/test_notifications.py b/.github/workflows/tests/test_notifications.py new file mode 100644 index 00000000..10794d95 --- /dev/null +++ b/.github/workflows/tests/test_notifications.py @@ -0,0 +1,211 @@ +"""Test notification system deployment and functionality.""" +import json +import os +import psycopg2 +import psycopg2.extensions +import requests +import subprocess +import time +import pytest +from datetime import datetime + + + + +def test_eoapi_notifier_deployment(): + """Test that eoapi-notifier deployment is running.""" + # Check if eoapi-notifier deployment exists and is ready + result = subprocess.run([ + 'kubectl', 'get', 'deployment', + '-l', 'app.kubernetes.io/name=eoapi-notifier', + '-n', 'eoapi', + '--no-headers', '-o', 'custom-columns=READY:.status.readyReplicas' + ], capture_output=True, text=True) + + if result.returncode != 0: + pytest.skip("eoapi-notifier deployment not found - notifications not enabled") + + ready_replicas = result.stdout.strip() + assert ready_replicas == "1", f"Expected 1 ready replica, got {ready_replicas}" + + +def test_cloudevents_sink_exists(): + """Test that Knative CloudEvents sink service exists and is accessible.""" + # Check if Knative service exists + result = subprocess.run([ + 'kubectl', 'get', 'ksvc', + '-l', 'app.kubernetes.io/component=cloudevents-sink', + '--no-headers' + ], capture_output=True, text=True) + + if result.returncode != 0 or not result.stdout.strip(): + pytest.skip("Knative CloudEvents sink not found - notifications not configured") + + assert "cloudevents-sink" in result.stdout, "Knative CloudEvents sink should exist" + + +def test_notification_configuration(): + """Test that eoapi-notifier is configured correctly.""" + # Get the configmap for eoapi-notifier + result = subprocess.run([ + 'kubectl', 'get', 'configmap', + '-l', 'app.kubernetes.io/name=eoapi-notifier', + '-o', r'jsonpath={.items[0].data.config\.yaml}' + ], capture_output=True, text=True) + + if result.returncode != 0: + pytest.skip("eoapi-notifier configmap not found") + + config_yaml = result.stdout.strip() + assert "postgres" in config_yaml, "Should have postgres source configured" + assert "cloudevents" in config_yaml, "Should have cloudevents output configured" + assert "pgstac_items_change" in config_yaml, "Should listen to pgstac_items_change channel" + + +def test_cloudevents_sink_logs_show_startup(): + """Test that Knative CloudEvents sink started successfully.""" + # Get Knative CloudEvents sink pod logs + result = subprocess.run([ + 'kubectl', 'logs', + '-l', 'serving.knative.dev/service', + '-n', 'eoapi', + '--tail=20' + ], capture_output=True, text=True) + + if result.returncode != 0: + pytest.skip("Cannot get Knative CloudEvents sink logs") + + logs = result.stdout + assert "listening on port" in logs, "Knative CloudEvents sink should have started successfully" + + +def test_eoapi_notifier_logs_show_connection(): + """Test that eoapi-notifier connects to database successfully.""" + # Give some time for the notifier to start + time.sleep(5) + + # Get eoapi-notifier pod logs + result = subprocess.run([ + 'kubectl', 'logs', + '-l', 'app.kubernetes.io/name=eoapi-notifier', + '--tail=50' + ], capture_output=True, text=True) + + if result.returncode != 0: + pytest.skip("Cannot get eoapi-notifier logs") + + logs = result.stdout + # Should not have connection errors + assert "Connection refused" not in logs, "Should not have connection errors" + assert "Authentication failed" not in logs, "Should not have auth errors" + + +def test_database_notification_triggers_exist(db_connection): + """Test that pgstac notification triggers are installed.""" + with db_connection.cursor() as cur: + # Check if the notification function exists + cur.execute(""" + SELECT EXISTS( + SELECT 1 FROM pg_proc p + JOIN pg_namespace n ON p.pronamespace = n.oid + WHERE n.nspname = 'public' + AND p.proname = 'notify_items_change_func' + ); + """) + result = cur.fetchone() + function_exists = result[0] if result else False + assert function_exists, "notify_items_change_func should exist" + + # Check if triggers exist + cur.execute(""" + SELECT COUNT(*) FROM information_schema.triggers + WHERE trigger_name LIKE 'notify_items_change_%' + AND event_object_table = 'items' + AND event_object_schema = 'pgstac'; + """) + result = cur.fetchone() + trigger_count = result[0] if result else 0 + assert trigger_count >= 3, f"Should have at least 3 triggers (INSERT, UPDATE, DELETE), found {trigger_count}" + + + + +def test_end_to_end_notification_flow(db_connection): + """Test complete flow: database → eoapi-notifier → Knative CloudEvents sink.""" + + # Skip if notifications not enabled + if not subprocess.run(['kubectl', 'get', 'deployment', '-l', 'app.kubernetes.io/name=eoapi-notifier', '--no-headers'], capture_output=True).stdout.strip(): + pytest.skip("eoapi-notifier not deployed") + + # Find Knative CloudEvents sink pod + result = subprocess.run(['kubectl', 'get', 'pods', '-l', 'serving.knative.dev/service', '-o', 'jsonpath={.items[0].metadata.name}'], capture_output=True, text=True) + + if result.returncode != 0 or not result.stdout.strip(): + pytest.skip("Knative CloudEvents sink pod not found") + + sink_pod = result.stdout.strip() + + # Insert test item and check for CloudEvent + test_item_id = f"e2e-test-{int(time.time())}" + try: + with db_connection.cursor() as cursor: + cursor.execute("SELECT pgstac.create_item(%s);", (json.dumps({ + "id": test_item_id, + "type": "Feature", + "stac_version": "1.0.0", + "collection": "noaa-emergency-response", + "geometry": {"type": "Point", "coordinates": [0, 0]}, + "bbox": [0, 0, 0, 0], + "properties": {"datetime": "2020-01-01T00:00:00Z"}, + "assets": {} + }),)) + + # Check CloudEvents sink logs for CloudEvent + found_event = False + for _ in range(20): # 20 second timeout + time.sleep(1) + result = subprocess.run(['kubectl', 'logs', sink_pod, '--since=30s'], capture_output=True, text=True) + if result.returncode == 0 and "CloudEvent received" in result.stdout and test_item_id in result.stdout: + found_event = True + break + + assert found_event, f"CloudEvent for {test_item_id} not received by CloudEvents sink" + + finally: + # Cleanup + with db_connection.cursor() as cursor: + cursor.execute("SELECT pgstac.delete_item(%s);", (test_item_id,)) + + +def test_k_sink_injection(): + """Test that SinkBinding injects K_SINK into eoapi-notifier deployment.""" + # Check if eoapi-notifier deployment exists + result = subprocess.run([ + 'kubectl', 'get', 'deployment', + '-l', 'app.kubernetes.io/name=eoapi-notifier', + '-o', 'jsonpath={.items[0].spec.template.spec.containers[0].env[?(@.name=="K_SINK")].value}' + ], capture_output=True, text=True) + + if result.returncode != 0: + pytest.skip("eoapi-notifier deployment not found") + + k_sink_value = result.stdout.strip() + if k_sink_value: + assert "cloudevents-sink" in k_sink_value, f"K_SINK should point to CloudEvents sink service, got: {k_sink_value}" + print(f"✅ K_SINK properly injected: {k_sink_value}") + else: + # Check if SinkBinding exists - it may take time to inject + sinkbinding_result = subprocess.run([ + 'kubectl', 'get', 'sinkbinding', + '-l', 'app.kubernetes.io/component=sink-binding', + '--no-headers' + ], capture_output=True, text=True) + + if sinkbinding_result.returncode == 0 and sinkbinding_result.stdout.strip(): + pytest.skip("SinkBinding exists but K_SINK not yet injected - may need more time") + else: + pytest.fail("No K_SINK found and no SinkBinding exists") + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/.github/workflows/tests/test_pgstac_notifications.py b/.github/workflows/tests/test_pgstac_notifications.py new file mode 100644 index 00000000..2a1ab5f2 --- /dev/null +++ b/.github/workflows/tests/test_pgstac_notifications.py @@ -0,0 +1,338 @@ +"""Test pgstac notification triggers.""" +import json +import os +import psycopg2 +import psycopg2.extensions +import pytest +import time +import subprocess +from datetime import datetime, timezone + + + + +@pytest.fixture(scope='session') +def notifications_enabled(): + """Check if notifications are enabled in the deployment config by checking Helm values.""" + try: + # Get release name from environment or default + release_name = os.getenv('RELEASE_NAME', 'eoapi') + namespace = os.getenv('NAMESPACE', 'eoapi') + + # Check if notifications are enabled in Helm values + result = subprocess.run([ + 'helm', 'get', 'values', release_name, + '-n', namespace, + '-o', 'json' + ], capture_output=True, text=True, check=True) + + # Parse JSON and check notifications.sources.pgstac value + values = json.loads(result.stdout) + return values.get('notifications', {}).get('sources', {}).get('pgstac', False) + except (subprocess.CalledProcessError, json.JSONDecodeError, Exception): + # If we can't check the Helm values, assume notifications are disabled + return False + + +@pytest.fixture +def notification_listener(db_connection): + """Set up notification listener for pgstac_items_change.""" + cursor = db_connection.cursor() + cursor.execute("LISTEN pgstac_items_change;") + yield cursor + cursor.execute("UNLISTEN pgstac_items_change;") + cursor.close() + + +def test_notification_triggers_exist(db_connection, notifications_enabled): + """Test that notification triggers and function are properly installed.""" + if not notifications_enabled: + pytest.skip("PgSTAC notifications not enabled - set notifications.sources.pgstac=true to test") + + cursor = db_connection.cursor() + + # Check that the notification function exists + cursor.execute(""" + SELECT EXISTS( + SELECT 1 FROM pg_proc + WHERE proname = 'notify_items_change_func' + ); + """) + assert cursor.fetchone()[0], "notify_items_change_func function should exist" + + # Check that all three triggers exist + trigger_names = [ + 'notify_items_change_insert', + 'notify_items_change_update', + 'notify_items_change_delete' + ] + + for trigger_name in trigger_names: + cursor.execute(""" + SELECT EXISTS( + SELECT 1 FROM pg_trigger + WHERE tgname = %s + AND tgrelid = 'pgstac.items'::regclass + ); + """, (trigger_name,)) + assert cursor.fetchone()[0], f"Trigger {trigger_name} should exist on pgstac.items" + + cursor.close() + + +def test_insert_notification(db_connection, notification_listener, notifications_enabled): + """Test that INSERT operations trigger notifications.""" + if not notifications_enabled: + pytest.skip("PgSTAC notifications not enabled - set notifications.sources.pgstac=true to test") + + cursor = db_connection.cursor() + + # Clear any pending notifications + db_connection.poll() + while db_connection.notifies: + db_connection.notifies.pop(0) + + # Use existing collection + test_collection_id = "noaa-emergency-response" + + # Insert a test item using pgstac.create_item + test_item_id = f"test-item-{int(time.time())}" + item_data = json.dumps({ + "id": test_item_id, + "type": "Feature", + "stac_version": "1.0.0", + "collection": test_collection_id, + "geometry": {"type": "Point", "coordinates": [0, 0]}, + "bbox": [0, 0, 0, 0], + "properties": {"datetime": "2020-01-01T00:00:00Z"}, + "assets": {} + }) + + cursor.execute("SELECT pgstac.create_item(%s);", (item_data,)) + + # Wait for notification + timeout = 5 + start_time = time.time() + received_notification = False + + while time.time() - start_time < timeout: + db_connection.poll() + if db_connection.notifies: + notify = db_connection.notifies.pop(0) + assert notify.channel == "pgstac_items_change" + + # Parse the notification payload + payload = json.loads(notify.payload) + assert payload["operation"] == "INSERT" + assert "items" in payload + assert len(payload["items"]) == 1 + assert payload["items"][0]["id"] == test_item_id + assert payload["items"][0]["collection"] == test_collection_id + + received_notification = True + break + time.sleep(0.1) + + assert received_notification, "Should have received INSERT notification" + + # Cleanup + cursor.execute("SELECT pgstac.delete_item(%s);", (test_item_id,)) + cursor.close() + + +def test_update_notification(db_connection, notification_listener, notifications_enabled): + """Test that UPDATE operations trigger notifications.""" + if not notifications_enabled: + pytest.skip("PgSTAC notifications not enabled - set notifications.sources.pgstac=true to test") + + cursor = db_connection.cursor() + + # Clear any pending notifications + db_connection.poll() + while db_connection.notifies: + db_connection.notifies.pop(0) + + test_collection_id = "noaa-emergency-response" + + # Insert a test item first using pgstac.create_item + test_item_id = f"test-item-update-{int(time.time())}" + item_data = json.dumps({ + "id": test_item_id, + "type": "Feature", + "stac_version": "1.0.0", + "collection": test_collection_id, + "geometry": {"type": "Point", "coordinates": [0, 0]}, + "bbox": [0, 0, 0, 0], + "properties": {"datetime": "2020-01-01T00:00:00Z"}, + "assets": {} + }) + + cursor.execute("SELECT pgstac.create_item(%s);", (item_data,)) + + # Clear INSERT notification + db_connection.poll() + while db_connection.notifies: + db_connection.notifies.pop(0) + + # Update the item using pgstac.update_item + updated_item_data = json.dumps({ + "id": test_item_id, + "type": "Feature", + "stac_version": "1.0.0", + "collection": test_collection_id, + "geometry": {"type": "Point", "coordinates": [0, 0]}, + "bbox": [0, 0, 0, 0], + "properties": {"datetime": "2020-01-01T00:00:00Z", "updated": True}, + "assets": {} + }) + + cursor.execute("SELECT pgstac.update_item(%s);", (updated_item_data,)) + + # Wait for notification + timeout = 5 + start_time = time.time() + received_notification = False + + while time.time() - start_time < timeout: + db_connection.poll() + if db_connection.notifies: + notify = db_connection.notifies.pop(0) + assert notify.channel == "pgstac_items_change" + + # Parse the notification payload - PgSTAC update uses DELETE+INSERT, so accept both + payload = json.loads(notify.payload) + assert payload["operation"] in ["DELETE", "INSERT", "UPDATE"], f"Operation should be DELETE, INSERT, or UPDATE, got {payload['operation']}" + assert "items" in payload + assert len(payload["items"]) == 1 + assert payload["items"][0]["id"] == test_item_id + assert payload["items"][0]["collection"] == test_collection_id + + received_notification = True + break + time.sleep(0.1) + + assert received_notification, "Should have received UPDATE notification" + + # Cleanup + cursor.execute("SELECT pgstac.delete_item(%s);", (test_item_id,)) + cursor.close() + + +def test_delete_notification(db_connection, notification_listener, notifications_enabled): + """Test that DELETE operations trigger notifications.""" + if not notifications_enabled: + pytest.skip("PgSTAC notifications not enabled - set notifications.sources.pgstac=true to test") + + cursor = db_connection.cursor() + + # Clear any pending notifications + db_connection.poll() + while db_connection.notifies: + db_connection.notifies.pop(0) + + test_collection_id = "noaa-emergency-response" + + # Insert a test item first using pgstac.create_item + test_item_id = f"test-item-delete-{int(time.time())}" + item_data = json.dumps({ + "id": test_item_id, + "type": "Feature", + "stac_version": "1.0.0", + "collection": test_collection_id, + "geometry": {"type": "Point", "coordinates": [0, 0]}, + "bbox": [0, 0, 0, 0], + "properties": {"datetime": "2020-01-01T00:00:00Z"}, + "assets": {} + }) + + cursor.execute("SELECT pgstac.create_item(%s);", (item_data,)) + + # Clear INSERT notification + db_connection.poll() + while db_connection.notifies: + db_connection.notifies.pop(0) + + # Delete the item using pgstac.delete_item + cursor.execute("SELECT pgstac.delete_item(%s);", (test_item_id,)) + + # Wait for notification + timeout = 5 + start_time = time.time() + received_notification = False + + while time.time() - start_time < timeout: + db_connection.poll() + if db_connection.notifies: + notify = db_connection.notifies.pop(0) + assert notify.channel == "pgstac_items_change" + + # Parse the notification payload + payload = json.loads(notify.payload) + assert payload["operation"] == "DELETE" + assert "items" in payload + assert len(payload["items"]) == 1 + assert payload["items"][0]["id"] == test_item_id + assert payload["items"][0]["collection"] == test_collection_id + + received_notification = True + break + time.sleep(0.1) + + assert received_notification, "Should have received DELETE notification" + cursor.close() + + +def test_bulk_operations_notification(db_connection, notification_listener, notifications_enabled): + """Test that bulk operations send notifications with multiple items.""" + if not notifications_enabled: + pytest.skip("PgSTAC notifications not enabled - set notifications.sources.pgstac=true to test") + + cursor = db_connection.cursor() + + # Clear any pending notifications + db_connection.poll() + while db_connection.notifies: + db_connection.notifies.pop(0) + + test_collection_id = "noaa-emergency-response" + + # Insert multiple items using pgstac.create_item + test_items = [f"bulk-item-{i}-{int(time.time())}" for i in range(3)] + + for item_id in test_items: + item_data = json.dumps({ + "id": item_id, + "type": "Feature", + "stac_version": "1.0.0", + "collection": test_collection_id, + "geometry": {"type": "Point", "coordinates": [0, 0]}, + "bbox": [0, 0, 0, 0], + "properties": {"datetime": "2020-01-01T00:00:00Z"}, + "assets": {} + }) + + cursor.execute("SELECT pgstac.create_item(%s);", (item_data,)) + + # Wait for notifications (should get one per insert since we're doing separate statements) + timeout = 10 + start_time = time.time() + notifications_received = 0 + + while time.time() - start_time < timeout and notifications_received < len(test_items): + db_connection.poll() + while db_connection.notifies: + notify = db_connection.notifies.pop(0) + assert notify.channel == "pgstac_items_change" + + payload = json.loads(notify.payload) + assert payload["operation"] == "INSERT" + assert "items" in payload + notifications_received += len(payload["items"]) + + assert notifications_received >= len(test_items), f"Should have received notifications for all {len(test_items)} items" + + # Cleanup + for item_id in test_items: + cursor.execute("SELECT pgstac.delete_item(%s);", (item_id,)) + + cursor.close() diff --git a/CHANGELOG.md b/CHANGELOG.md index 3207028d..83eebe2e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Enforcement of `CHANGELOG.md` entries for PRs and Conventional Commits for PR titles [#288](https://github.com/developmentseed/eoapi-k8s/pull/288) - Added code formatting and linting with pre-commit hooks [#283](https://github.com/developmentseed/eoapi-k8s/pull/283) - Added values.schema.json validation [#296](https://github.com/developmentseed/eoapi-k8s/pull/296) +- Adjusted Renovate Configuration to fit conventional commits [#295](https://github.com/developmentseed/eoapi-k8s/pull/295) +- Notification triggers in database [#289](https://github.com/developmentseed/eoapi-k8s/pull/289) + +### Changed + +- Excluded renovate.json from CHANGELOG.md edits [#301](https://github.com/developmentseed/eoapi-k8s/pull/301) ## [0.7.8] - 2025-09-10 diff --git a/charts/eoapi/Chart.yaml b/charts/eoapi/Chart.yaml index 4c916160..ca101486 100644 --- a/charts/eoapi/Chart.yaml +++ b/charts/eoapi/Chart.yaml @@ -53,3 +53,7 @@ dependencies: version: 5.7.4 repository: "https://devseed.com/eoapi-k8s/" condition: postgrescluster.enabled + - name: eoapi-notifier + version: 0.0.5 + repository: "oci://ghcr.io/developmentseed/charts" + condition: eoapi-notifier.enabled diff --git a/charts/eoapi/README.md b/charts/eoapi/README.md index eaf559cf..2cce4c63 100644 --- a/charts/eoapi/README.md +++ b/charts/eoapi/README.md @@ -12,6 +12,7 @@ A Helm chart for deploying Earth Observation APIs with integrated STAC, raster, - Multidimensional data support - Built-in STAC Browser interface - Flexible database configuration +- Real-time PostgreSQL notifications for STAC item changes - Unified ingress system ## TL;DR @@ -75,6 +76,7 @@ pgstacBootstrap: | `ingress.className` | Ingress controller class | `nginx` | | `browser.enabled` | Enable STAC Browser interface | `true` | | `pgstacBootstrap.enabled` | Enable database initialization | `true` | +| `notifications.sources.pgstac` | Enable PostgreSQL notification triggers for STAC item changes | `false` | Refer to the [values.schema.json](./values.schema.json) for the complete list of configurable parameters. diff --git a/charts/eoapi/initdb-data/settings/pgstac-notification-triggers.sql b/charts/eoapi/initdb-data/settings/pgstac-notification-triggers.sql new file mode 100644 index 00000000..e15dd562 --- /dev/null +++ b/charts/eoapi/initdb-data/settings/pgstac-notification-triggers.sql @@ -0,0 +1,42 @@ +-- Create the notification function +CREATE OR REPLACE FUNCTION notify_items_change_func() +RETURNS TRIGGER AS $$ +DECLARE + +BEGIN + PERFORM pg_notify('pgstac_items_change'::text, json_build_object( + 'operation', TG_OP, + 'items', jsonb_agg( + jsonb_build_object( + 'collection', data.collection, + 'id', data.id + ) + ) + )::text + ) + FROM data + ; + RETURN NULL; +END; +$$ LANGUAGE plpgsql; + +-- Create triggers for INSERT operations +CREATE OR REPLACE TRIGGER notify_items_change_insert + AFTER INSERT ON pgstac.items + REFERENCING NEW TABLE AS data + FOR EACH STATEMENT EXECUTE FUNCTION notify_items_change_func() +; + +-- Create triggers for UPDATE operations +CREATE OR REPLACE TRIGGER notify_items_change_update + AFTER UPDATE ON pgstac.items + REFERENCING NEW TABLE AS data + FOR EACH STATEMENT EXECUTE FUNCTION notify_items_change_func() +; + +-- Create triggers for DELETE operations +CREATE OR REPLACE TRIGGER notify_items_change_delete + AFTER DELETE ON pgstac.items + REFERENCING OLD TABLE AS data + FOR EACH STATEMENT EXECUTE FUNCTION notify_items_change_func() +; diff --git a/charts/eoapi/samples/cloudevents-sink.yaml b/charts/eoapi/samples/cloudevents-sink.yaml new file mode 100644 index 00000000..ec9acbe6 --- /dev/null +++ b/charts/eoapi/samples/cloudevents-sink.yaml @@ -0,0 +1,27 @@ +apiVersion: serving.knative.dev/v1 +kind: Service +metadata: + name: eoapi-cloudevents-sink + namespace: eoapi +spec: + template: + metadata: + annotations: + autoscaling.knative.dev/minScale: "1" + autoscaling.knative.dev/maxScale: "1" + spec: + containers: + - name: cloudevents-sink + image: gcr.io/knative-samples/helloworld-go + ports: + - containerPort: 8080 + env: + - name: TARGET + value: "CloudEvents Sink" + resources: + requests: + cpu: 100m + memory: 128Mi + limits: + cpu: 200m + memory: 256Mi diff --git a/charts/eoapi/templates/pgstacbootstrap/configmap.yaml b/charts/eoapi/templates/pgstacbootstrap/configmap.yaml index 752e7fc2..e13774dc 100644 --- a/charts/eoapi/templates/pgstacbootstrap/configmap.yaml +++ b/charts/eoapi/templates/pgstacbootstrap/configmap.yaml @@ -14,6 +14,9 @@ metadata: data: pgstac-settings.sql: | {{ $.Files.Get "initdb-data/settings/pgstac-settings.sql" | nindent 4 }} + {{- if (index .Values "eoapi-notifier").enabled }} + {{ $.Files.Get "initdb-data/settings/pgstac-notification-triggers.sql" | nindent 4 }} + {{- end }} --- {{- if .Values.pgstacBootstrap.settings.loadSamples }} apiVersion: v1 diff --git a/charts/eoapi/templates/services/_common.tpl b/charts/eoapi/templates/services/_common.tpl index 36c19cd6..430a87c5 100644 --- a/charts/eoapi/templates/services/_common.tpl +++ b/charts/eoapi/templates/services/_common.tpl @@ -34,7 +34,7 @@ Helper function for common init containers to wait for pgstac jobs {{- if .Values.pgstacBootstrap.enabled }} initContainers: - name: wait-for-pgstac-jobs - image: bitnami/kubectl:latest + image: alpine/k8s:1.28.0 env: {{- include "eoapi.commonEnvVars" (dict "service" "init" "root" .) | nindent 2 }} resources: diff --git a/charts/eoapi/test-k3s-unittest-values.yaml b/charts/eoapi/test-k3s-unittest-values.yaml index f52ac9e6..33a4ec56 100644 --- a/charts/eoapi/test-k3s-unittest-values.yaml +++ b/charts/eoapi/test-k3s-unittest-values.yaml @@ -48,3 +48,41 @@ vector: envVars: # needs to on so we can call /refresh for integration tests TIPG_DEBUG: "True" + +###################### +# NOTIFICATIONS +###################### +eoapi-notifier: + enabled: true + config: + logLevel: DEBUG + sources: + - type: pgstac + config: + channel: pgstac_items_change + connection: + existingSecret: + name: "eoapi-test-pguser-eoapi" + keys: + username: "user" + password: "password" + host: "host" + port: "port" + database: "dbname" + outputs: + - type: cloudevents + config: + source: /eoapi/pgstac + event_type: org.eoapi.stac.item + destination: + ref: + apiVersion: serving.knative.dev/v1 + kind: Service + name: eoapi-cloudevents-sink + resources: + requests: + cpu: "50m" + memory: "64Mi" + limits: + cpu: "200m" + memory: "128Mi" diff --git a/charts/eoapi/tests/pgstac_notification_tests.yaml b/charts/eoapi/tests/pgstac_notification_tests.yaml new file mode 100644 index 00000000..c10a8e39 --- /dev/null +++ b/charts/eoapi/tests/pgstac_notification_tests.yaml @@ -0,0 +1,150 @@ +suite: pgstac notification triggers tests +templates: + - templates/pgstacbootstrap/configmap.yaml +tests: + - it: "pgstac settings should include notification function when enabled" + set: + pgstacBootstrap.enabled: true + eoapi-notifier.enabled: true + documentIndex: 0 + asserts: + - isKind: + of: ConfigMap + - equal: + path: metadata.name + value: RELEASE-NAME-pgstac-settings-config + - matchRegex: + path: data["pgstac-settings.sql"] + pattern: CREATE OR REPLACE FUNCTION notify_items_change_func\(\) + - matchRegex: + path: data["pgstac-settings.sql"] + pattern: RETURNS TRIGGER AS \$\$ + - matchRegex: + path: data["pgstac-settings.sql"] + pattern: PERFORM pg_notify\('pgstac_items_change'::text + + - it: "pgstac settings should include INSERT trigger when enabled" + set: + pgstacBootstrap.enabled: true + eoapi-notifier.enabled: true + documentIndex: 0 + asserts: + - isKind: + of: ConfigMap + - matchRegex: + path: data["pgstac-settings.sql"] + pattern: CREATE OR REPLACE TRIGGER notify_items_change_insert + - matchRegex: + path: data["pgstac-settings.sql"] + pattern: AFTER INSERT ON pgstac\.items + - matchRegex: + path: data["pgstac-settings.sql"] + pattern: REFERENCING NEW TABLE AS data + - matchRegex: + path: data["pgstac-settings.sql"] + pattern: FOR EACH STATEMENT EXECUTE FUNCTION notify_items_change_func\(\) + + - it: "pgstac settings should include UPDATE trigger when enabled" + set: + pgstacBootstrap.enabled: true + eoapi-notifier.enabled: true + documentIndex: 0 + asserts: + - isKind: + of: ConfigMap + - matchRegex: + path: data["pgstac-settings.sql"] + pattern: CREATE OR REPLACE TRIGGER notify_items_change_update + - matchRegex: + path: data["pgstac-settings.sql"] + pattern: AFTER UPDATE ON pgstac\.items + - matchRegex: + path: data["pgstac-settings.sql"] + pattern: REFERENCING NEW TABLE AS data + + - it: "pgstac settings should include DELETE trigger when enabled" + set: + pgstacBootstrap.enabled: true + eoapi-notifier.enabled: true + documentIndex: 0 + asserts: + - isKind: + of: ConfigMap + - matchRegex: + path: data["pgstac-settings.sql"] + pattern: CREATE OR REPLACE TRIGGER notify_items_change_delete + - matchRegex: + path: data["pgstac-settings.sql"] + pattern: AFTER DELETE ON pgstac\.items + - matchRegex: + path: data["pgstac-settings.sql"] + pattern: REFERENCING OLD TABLE AS data + + - it: "notification function should include operation and items in payload when enabled" + set: + pgstacBootstrap.enabled: true + eoapi-notifier.enabled: true + documentIndex: 0 + asserts: + - isKind: + of: ConfigMap + - matchRegex: + path: data["pgstac-settings.sql"] + pattern: json_build_object\( + - matchRegex: + path: data["pgstac-settings.sql"] + pattern: "'operation', TG_OP" + - matchRegex: + path: data["pgstac-settings.sql"] + pattern: "'items', jsonb_agg\\(" + - matchRegex: + path: data["pgstac-settings.sql"] + pattern: "'collection', data\\.collection" + - matchRegex: + path: data["pgstac-settings.sql"] + pattern: "'id', data\\.id" + + - it: "notification triggers should not be included by default (disabled by default)" + set: + pgstacBootstrap.enabled: true + documentIndex: 0 + asserts: + - isKind: + of: ConfigMap + - equal: + path: metadata.name + value: RELEASE-NAME-pgstac-settings-config + - notMatchRegex: + path: data["pgstac-settings.sql"] + pattern: CREATE OR REPLACE FUNCTION notify_items_change_func\(\) + + - it: "notification triggers should not be included when disabled" + set: + pgstacBootstrap.enabled: true + eoapi-notifier.enabled: false + documentIndex: 0 + asserts: + - isKind: + of: ConfigMap + - equal: + path: metadata.name + value: RELEASE-NAME-pgstac-settings-config + - notMatchRegex: + path: data["pgstac-settings.sql"] + pattern: CREATE OR REPLACE FUNCTION notify_items_change_func\(\) + - notMatchRegex: + path: data["pgstac-settings.sql"] + pattern: notify_items_change_insert + - notMatchRegex: + path: data["pgstac-settings.sql"] + pattern: notify_items_change_update + - notMatchRegex: + path: data["pgstac-settings.sql"] + pattern: notify_items_change_delete + + - it: "pgstac settings configmap should not be created when pgstacBootstrap is disabled" + set: + pgstacBootstrap.enabled: false + asserts: + - hasDocuments: + count: 1 diff --git a/charts/eoapi/values.schema.json b/charts/eoapi/values.schema.json index 469b62ca..096f3893 100644 --- a/charts/eoapi/values.schema.json +++ b/charts/eoapi/values.schema.json @@ -342,6 +342,71 @@ "description": "Enable documentation server" } } + }, + "eoapi-notifier": { + "type": "object", + "properties": { + "enabled": { + "type": "boolean", + "default": false, + "description": "Enable eoAPI notifier for database notifications to CloudEvents" + }, + "sources": { + "type": "object", + "properties": { + "pgstac": { + "type": "boolean", + "default": false, + "description": "Enable PostgreSQL notification triggers for STAC item changes" + } + } + }, + "config": { + "type": "object", + "properties": { + "logLevel": { + "type": "string", + "enum": ["DEBUG", "INFO", "WARNING", "ERROR"], + "default": "INFO", + "description": "Log level for eoapi-notifier" + }, + "sources": { + "type": "array", + "description": "Notification source configurations" + }, + "outputs": { + "type": "array", + "description": "Notification output configurations" + } + } + }, + "secrets": { + "type": "object", + "properties": { + "postgresql": { + "type": "object", + "properties": { + "create": { + "type": "boolean", + "description": "Create PostgreSQL secret" + } + } + } + } + }, + "env": { + "type": "object", + "description": "Environment variables for eoapi-notifier" + }, + "envFrom": { + "type": "array", + "description": "Environment variables from references" + }, + "resources": { + "type": "object", + "description": "Resource requirements for eoapi-notifier" + } + } } }, "definitions": { diff --git a/charts/eoapi/values.yaml b/charts/eoapi/values.yaml index a5e529a6..fa5ba013 100644 --- a/charts/eoapi/values.yaml +++ b/charts/eoapi/values.yaml @@ -488,3 +488,30 @@ docServer: # helm upgrade --set previousVersion=$PREVIOUS_VERSION # or in the CI/CD pipeline previousVersion: "" + +###################### +# NOTIFICATIONS +###################### +eoapi-notifier: + enabled: false + config: + logLevel: INFO + sources: + - type: postgres + config: + channel: pgstac_items_change + outputs: + - type: cloudevents + config: + source: /eoapi/pgstac + event_type: org.eoapi.stac.item + destination: + ref: + apiVersion: serving.knative.dev/v1 + kind: Service + name: eoapi-cloudevents-sink + secrets: + postgresql: + create: false + env: + POSTGRES_CHANNEL: "pgstac_items_change" diff --git a/scripts/deploy.sh b/scripts/deploy.sh index d8a7bc7f..0b356109 100755 --- a/scripts/deploy.sh +++ b/scripts/deploy.sh @@ -147,6 +147,7 @@ deploy_eoapi() { fi # Set git SHA if available + GITHUB_SHA=${GITHUB_SHA:-} if [ -n "$GITHUB_SHA" ]; then HELM_CMD="$HELM_CMD --set gitSha=$GITHUB_SHA" elif [ -n "$(git rev-parse HEAD 2>/dev/null)" ]; then diff --git a/scripts/test.sh b/scripts/test.sh index 70d72c79..9fea0df8 100755 --- a/scripts/test.sh +++ b/scripts/test.sh @@ -1,4 +1,5 @@ #!/bin/bash +# shellcheck source=lib/common.sh # eoAPI Test Suite # Combined Helm and Integration Testing Script @@ -535,6 +536,8 @@ setup_test_environment() { log_info " Vector: $VECTOR_ENDPOINT" } + + # Run integration tests run_integration_tests() { log_info "=== Running Integration Tests ===" @@ -603,6 +606,128 @@ run_integration_tests() { log_info "Raster tests passed" fi + # Notification system tests + log_info "=== Running notification system tests ===" + + # Deploy CloudEvents sink for notification tests + if kubectl apply -f "$SCRIPT_DIR/../charts/eoapi/samples/cloudevents-sink.yaml" >/dev/null 2>&1; then + log_debug "CloudEvents sink deployed for notification tests" + # Wait for the service to be ready + kubectl wait --for=condition=Ready ksvc/eoapi-cloudevents-sink -n "$NAMESPACE" --timeout=60s >/dev/null 2>&1 || true + else + log_debug "CloudEvents sink already exists or failed to deploy" + fi + + # Get database credentials for end-to-end tests + local db_name db_user db_password port_forward_pid + if db_name=$(kubectl get secret -n "$NAMESPACE" "${RELEASE_NAME}-pguser-eoapi" -o jsonpath='{.data.dbname}' 2>/dev/null | base64 -d 2>/dev/null) && \ + db_user=$(kubectl get secret -n "$NAMESPACE" "${RELEASE_NAME}-pguser-eoapi" -o jsonpath='{.data.user}' 2>/dev/null | base64 -d 2>/dev/null) && \ + db_password=$(kubectl get secret -n "$NAMESPACE" "${RELEASE_NAME}-pguser-eoapi" -o jsonpath='{.data.password}' 2>/dev/null | base64 -d 2>/dev/null); then + + log_debug "Setting up database connection for end-to-end notification tests..." + kubectl port-forward -n "$NAMESPACE" "svc/${RELEASE_NAME}-pgbouncer" 5433:5432 >/dev/null 2>&1 & + port_forward_pid=$! + sleep 3 + + # Run tests with database connection + local notification_test_env + notification_test_env=$(cat << EOF +PGHOST=localhost +PGPORT=5433 +PGDATABASE=$db_name +PGUSER=$db_user +PGPASSWORD=$db_password +NAMESPACE=$NAMESPACE +RELEASE_NAME=$RELEASE_NAME +EOF + ) + + if env "$notification_test_env" $python_cmd -m pytest "$test_dir/test_notifications.py" -v; then + log_info "Notification system tests passed" + else + log_warn "Notification system tests failed" + failed_tests+=("notifications") + + # Show eoapi-notifier logs on failure + log_info "=== eoapi-notifier service logs ===" + kubectl logs -l app.kubernetes.io/name=eoapi-notifier -n "$NAMESPACE" --tail=50 2>/dev/null || \ + log_warn "Could not get eoapi-notifier service logs" + + # Show CloudEvents sink logs on failure + log_info "=== CloudEvents sink logs ===" + kubectl logs -l serving.knative.dev/service -n "$NAMESPACE" --tail=50 2>/dev/null || \ + log_warn "Could not get Knative CloudEvents sink logs" + fi + + # Clean up port forwarding + if [ -n "$port_forward_pid" ]; then + kill "$port_forward_pid" 2>/dev/null || true + wait "$port_forward_pid" 2>/dev/null || true + fi + else + log_warn "Could not retrieve database credentials, running basic notification tests only" + if ! $python_cmd -m pytest "$test_dir/test_notifications.py" -v -k "not end_to_end"; then + log_warn "Basic notification system tests failed" + failed_tests+=("notifications") + else + log_info "Basic notification system tests passed" + fi + fi + + # PgSTAC notification tests + log_info "=== Running PgSTAC notification tests ===" + + # Get database credentials from secret + local db_name db_user db_password + if db_name=$(kubectl get secret -n "$NAMESPACE" "${RELEASE_NAME}-pguser-eoapi" -o jsonpath='{.data.dbname}' 2>/dev/null | base64 -d 2>/dev/null) && \ + db_user=$(kubectl get secret -n "$NAMESPACE" "${RELEASE_NAME}-pguser-eoapi" -o jsonpath='{.data.user}' 2>/dev/null | base64 -d 2>/dev/null) && \ + db_password=$(kubectl get secret -n "$NAMESPACE" "${RELEASE_NAME}-pguser-eoapi" -o jsonpath='{.data.password}' 2>/dev/null | base64 -d 2>/dev/null); then + + log_debug "Database credentials retrieved for pgstac notifications test" + + # Set up port forwarding to database + log_debug "Setting up port forwarding to database..." + kubectl port-forward -n "$NAMESPACE" "svc/${RELEASE_NAME}-pgbouncer" 5433:5432 >/dev/null 2>&1 & + local port_forward_pid=$! + + # Give port forwarding time to establish + sleep 3 + + # Run the test with proper environment variables + export PGHOST=localhost + export PGPORT=5433 + export PGDATABASE=$db_name + export PGUSER=$db_user + export PGPASSWORD=$db_password + + if $python_cmd -m pytest "$test_dir/test_pgstac_notifications.py" -v; then + log_info "PgSTAC notification tests passed" + else + log_warn "PgSTAC notification tests failed" + failed_tests+=("pgstac-notifications") + fi + + # Also run end-to-end notification test with same DB connection + log_info "Running end-to-end notification flow test..." + if NAMESPACE="$NAMESPACE" RELEASE_NAME="$RELEASE_NAME" $python_cmd -m pytest "$test_dir/test_notifications.py::test_end_to_end_notification_flow" -v; then + log_info "End-to-end notification test passed" + else + log_warn "End-to-end notification test failed" + failed_tests+=("e2e-notifications") + fi + + # Clean up port forwarding + if [ -n "$port_forward_pid" ]; then + kill "$port_forward_pid" 2>/dev/null || true + wait "$port_forward_pid" 2>/dev/null || true + fi + + else + log_warn "Could not retrieve database credentials for PgSTAC notification tests" + failed_tests+=("pgstac-notifications") + fi + + # Report results if [ ${#failed_tests[@]} -eq 0 ]; then log_info "✅ All integration tests completed successfully!"