Skip to content

Commit ba4afd5

Browse files
committed
Added eoapi-notifier middle-ware.
1 parent 45a5e83 commit ba4afd5

File tree

12 files changed

+501
-22
lines changed

12 files changed

+501
-22
lines changed

.github/workflows/helm-tests.yml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,11 @@ jobs:
112112
echo "===== Related Kubernetes Events ====="
113113
kubectl get events | grep -E "pgstac|initdb" || echo "No relevant events found"
114114
115+
# Check notification system status
116+
echo "===== Notification System Status ====="
117+
kubectl get deployments -l app.kubernetes.io/name=eoapi-notifier -o wide || echo "No eoapi-notifier deployment found"
118+
kubectl get ksvc -l app.kubernetes.io/component=cloudevents-sink -o wide || echo "No Knative CloudEvents sink found"
119+
115120
exit 1
116121
117122
- name: Run integration tests
@@ -130,6 +135,19 @@ jobs:
130135
kubectl get services -o wide
131136
kubectl get ingress
132137
138+
# Check notification system final status
139+
echo "=== Notification System Final Status ==="
140+
kubectl get deployments -l app.kubernetes.io/name=eoapi-notifier -o wide || echo "No eoapi-notifier deployment"
141+
kubectl get pods -l app.kubernetes.io/name=eoapi-notifier -o wide || echo "No eoapi-notifier pods"
142+
kubectl get ksvc -l app.kubernetes.io/component=cloudevents-sink -o wide || echo "No Knative CloudEvents sink"
143+
kubectl get pods -l serving.knative.dev/service -o wide || echo "No Knative CloudEvents sink pods"
144+
145+
# Show notification logs if they exist
146+
echo "=== eoapi-notifier Logs ==="
147+
kubectl logs -l app.kubernetes.io/name=eoapi-notifier --tail=20 || echo "No eoapi-notifier logs"
148+
echo "=== Knative CloudEvents Sink Logs ==="
149+
kubectl logs -l serving.knative.dev/service --tail=20 || echo "No Knative CloudEvents sink logs"
150+
133151
134152
- name: Cleanup
135153
if: always()
Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
"""Test notification system deployment and functionality."""
2+
import json
3+
import os
4+
import psycopg2
5+
import psycopg2.extensions
6+
import requests
7+
import subprocess
8+
import time
9+
import pytest
10+
from datetime import datetime
11+
12+
13+
def test_eoapi_notifier_deployment():
14+
"""Test that eoapi-notifier deployment is running."""
15+
# Check if eoapi-notifier deployment exists and is ready
16+
result = subprocess.run([
17+
'kubectl', 'get', 'deployment',
18+
'-l', 'app.kubernetes.io/name=eoapi-notifier',
19+
'-n', 'eoapi',
20+
'--no-headers', '-o', 'custom-columns=READY:.status.readyReplicas'
21+
], capture_output=True, text=True)
22+
23+
if result.returncode != 0:
24+
pytest.skip("eoapi-notifier deployment not found - notifications not enabled")
25+
26+
ready_replicas = result.stdout.strip()
27+
assert ready_replicas == "1", f"Expected 1 ready replica, got {ready_replicas}"
28+
29+
30+
def test_cloudevents_sink_exists():
31+
"""Test that Knative CloudEvents sink service exists and is accessible."""
32+
# Check if Knative service exists
33+
result = subprocess.run([
34+
'kubectl', 'get', 'ksvc',
35+
'-l', 'app.kubernetes.io/component=cloudevents-sink',
36+
'--no-headers'
37+
], capture_output=True, text=True)
38+
39+
if result.returncode != 0 or not result.stdout.strip():
40+
pytest.skip("Knative CloudEvents sink not found - notifications not configured")
41+
42+
assert "cloudevents-sink" in result.stdout, "Knative CloudEvents sink should exist"
43+
44+
45+
def test_notification_configuration():
46+
"""Test that eoapi-notifier is configured correctly."""
47+
# Get the configmap for eoapi-notifier
48+
result = subprocess.run([
49+
'kubectl', 'get', 'configmap',
50+
'-l', 'app.kubernetes.io/name=eoapi-notifier',
51+
'-o', r'jsonpath={.items[0].data.config\.yaml}'
52+
], capture_output=True, text=True)
53+
54+
if result.returncode != 0:
55+
pytest.skip("eoapi-notifier configmap not found")
56+
57+
config_yaml = result.stdout.strip()
58+
assert "postgres" in config_yaml, "Should have postgres source configured"
59+
assert "cloudevents" in config_yaml, "Should have cloudevents output configured"
60+
assert "pgstac_items_change" in config_yaml, "Should listen to pgstac_items_change channel"
61+
62+
63+
def test_cloudevents_sink_logs_show_startup():
64+
"""Test that Knative CloudEvents sink started successfully."""
65+
# Get Knative CloudEvents sink pod logs
66+
result = subprocess.run([
67+
'kubectl', 'logs',
68+
'-l', 'serving.knative.dev/service',
69+
'-n', 'eoapi',
70+
'--tail=20'
71+
], capture_output=True, text=True)
72+
73+
if result.returncode != 0:
74+
pytest.skip("Cannot get Knative CloudEvents sink logs")
75+
76+
logs = result.stdout
77+
assert "listening on port" in logs, "Knative CloudEvents sink should have started successfully"
78+
79+
80+
def test_eoapi_notifier_logs_show_connection():
81+
"""Test that eoapi-notifier connects to database successfully."""
82+
# Give some time for the notifier to start
83+
time.sleep(5)
84+
85+
# Get eoapi-notifier pod logs
86+
result = subprocess.run([
87+
'kubectl', 'logs',
88+
'-l', 'app.kubernetes.io/name=eoapi-notifier',
89+
'--tail=50'
90+
], capture_output=True, text=True)
91+
92+
if result.returncode != 0:
93+
pytest.skip("Cannot get eoapi-notifier logs")
94+
95+
logs = result.stdout
96+
# Should not have connection errors
97+
assert "Connection refused" not in logs, "Should not have connection errors"
98+
assert "Authentication failed" not in logs, "Should not have auth errors"
99+
100+
101+
def test_database_notification_triggers_exist():
102+
"""Test that pgstac notification triggers are installed."""
103+
# Port forward to database if not already done
104+
import psycopg2
105+
import os
106+
107+
try:
108+
# Try to connect using environment variables set by the test runner
109+
conn = psycopg2.connect(
110+
host=os.getenv('PGHOST', 'localhost'),
111+
port=int(os.getenv('PGPORT', '5432')),
112+
database=os.getenv('PGDATABASE', 'eoapi'),
113+
user=os.getenv('PGUSER', 'eoapi'),
114+
password=os.getenv('PGPASSWORD', '')
115+
)
116+
117+
with conn.cursor() as cur:
118+
# Check if the notification function exists
119+
cur.execute("""
120+
SELECT EXISTS(
121+
SELECT 1 FROM pg_proc p
122+
JOIN pg_namespace n ON p.pronamespace = n.oid
123+
WHERE n.nspname = 'public'
124+
AND p.proname = 'notify_items_change_func'
125+
);
126+
""")
127+
function_exists = cur.fetchone()[0]
128+
assert function_exists, "notify_items_change_func should exist"
129+
130+
# Check if triggers exist
131+
cur.execute("""
132+
SELECT COUNT(*) FROM information_schema.triggers
133+
WHERE trigger_name LIKE 'notify_items_change_%'
134+
AND event_object_table = 'items'
135+
AND event_object_schema = 'pgstac';
136+
""")
137+
trigger_count = cur.fetchone()[0]
138+
assert trigger_count >= 3, f"Should have at least 3 triggers (INSERT, UPDATE, DELETE), found {trigger_count}"
139+
140+
conn.close()
141+
142+
except (psycopg2.Error, ConnectionError, OSError):
143+
pytest.skip("Cannot connect to database for trigger verification")
144+
145+
146+
def test_end_to_end_notification_flow():
147+
"""Test complete flow: database → eoapi-notifier → Knative CloudEvents sink."""
148+
149+
# Skip if notifications not enabled
150+
if not subprocess.run(['kubectl', 'get', 'deployment', '-l', 'app.kubernetes.io/name=eoapi-notifier', '--no-headers'], capture_output=True).stdout.strip():
151+
pytest.skip("eoapi-notifier not deployed")
152+
153+
# Find Knative CloudEvents sink pod
154+
result = subprocess.run(['kubectl', 'get', 'pods', '-l', 'serving.knative.dev/service', '-o', 'jsonpath={.items[0].metadata.name}'], capture_output=True, text=True)
155+
156+
if result.returncode != 0 or not result.stdout.strip():
157+
pytest.skip("Knative CloudEvents sink pod not found")
158+
159+
sink_pod = result.stdout.strip()
160+
161+
# Connect to database using test environment
162+
try:
163+
conn = psycopg2.connect(
164+
host=os.getenv('PGHOST', 'localhost'),
165+
port=int(os.getenv('PGPORT', '5432')),
166+
database=os.getenv('PGDATABASE', 'eoapi'),
167+
user=os.getenv('PGUSER', 'eoapi'),
168+
password=os.getenv('PGPASSWORD', '')
169+
)
170+
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
171+
except psycopg2.Error as e:
172+
pytest.skip(f"Cannot connect to database: {e}")
173+
174+
# Insert test item and check for CloudEvent
175+
test_item_id = f"e2e-test-{int(time.time())}"
176+
try:
177+
with conn.cursor() as cursor:
178+
cursor.execute("SELECT pgstac.create_item(%s);", (json.dumps({
179+
"id": test_item_id,
180+
"type": "Feature",
181+
"stac_version": "1.0.0",
182+
"collection": "noaa-emergency-response",
183+
"geometry": {"type": "Point", "coordinates": [0, 0]},
184+
"bbox": [0, 0, 0, 0],
185+
"properties": {"datetime": "2020-01-01T00:00:00Z"},
186+
"assets": {}
187+
}),))
188+
189+
# Check CloudEvents sink logs for CloudEvent
190+
found_event = False
191+
for _ in range(20): # 20 second timeout
192+
time.sleep(1)
193+
result = subprocess.run(['kubectl', 'logs', sink_pod, '--since=30s'], capture_output=True, text=True)
194+
if result.returncode == 0 and "CloudEvent received" in result.stdout and test_item_id in result.stdout:
195+
found_event = True
196+
break
197+
198+
assert found_event, f"CloudEvent for {test_item_id} not received by CloudEvents sink"
199+
200+
finally:
201+
# Cleanup
202+
with conn.cursor() as cursor:
203+
cursor.execute("SELECT pgstac.delete_item(%s);", (test_item_id,))
204+
conn.close()
205+
206+
207+
def test_k_sink_injection():
208+
"""Test that SinkBinding injects K_SINK into eoapi-notifier deployment."""
209+
# Check if eoapi-notifier deployment exists
210+
result = subprocess.run([
211+
'kubectl', 'get', 'deployment',
212+
'-l', 'app.kubernetes.io/name=eoapi-notifier',
213+
'-o', 'jsonpath={.items[0].spec.template.spec.containers[0].env[?(@.name=="K_SINK")].value}'
214+
], capture_output=True, text=True)
215+
216+
if result.returncode != 0:
217+
pytest.skip("eoapi-notifier deployment not found")
218+
219+
k_sink_value = result.stdout.strip()
220+
if k_sink_value:
221+
assert "cloudevents-sink" in k_sink_value, f"K_SINK should point to CloudEvents sink service, got: {k_sink_value}"
222+
print(f"✅ K_SINK properly injected: {k_sink_value}")
223+
else:
224+
# Check if SinkBinding exists - it may take time to inject
225+
sinkbinding_result = subprocess.run([
226+
'kubectl', 'get', 'sinkbinding',
227+
'-l', 'app.kubernetes.io/component=sink-binding',
228+
'--no-headers'
229+
], capture_output=True, text=True)
230+
231+
if sinkbinding_result.returncode == 0 and sinkbinding_result.stdout.strip():
232+
pytest.skip("SinkBinding exists but K_SINK not yet injected - may need more time")
233+
else:
234+
pytest.fail("No K_SINK found and no SinkBinding exists")
235+
236+
237+
if __name__ == "__main__":
238+
pytest.main([__file__, "-v"])

charts/eoapi/Chart.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,3 +53,7 @@ dependencies:
5353
version: 5.7.4
5454
repository: "https://devseed.com/eoapi-k8s/"
5555
condition: postgrescluster.enabled
56+
- name: eoapi-notifier
57+
version: 0.0.4
58+
repository: "oci://ghcr.io/developmentseed/charts"
59+
condition: eoapi-notifier.enabled
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
apiVersion: serving.knative.dev/v1
2+
kind: Service
3+
metadata:
4+
name: eoapi-cloudevents-sink
5+
namespace: eoapi
6+
spec:
7+
template:
8+
metadata:
9+
annotations:
10+
autoscaling.knative.dev/minScale: "1"
11+
autoscaling.knative.dev/maxScale: "1"
12+
spec:
13+
containers:
14+
- name: cloudevents-sink
15+
image: gcr.io/knative-samples/helloworld-go
16+
ports:
17+
- containerPort: 8080
18+
env:
19+
- name: TARGET
20+
value: "CloudEvents Sink"
21+
resources:
22+
requests:
23+
cpu: 100m
24+
memory: 128Mi
25+
limits:
26+
cpu: 200m
27+
memory: 256Mi

charts/eoapi/templates/pgstacbootstrap/configmap.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ metadata:
1414
data:
1515
pgstac-settings.sql: |
1616
{{ $.Files.Get "initdb-data/settings/pgstac-settings.sql" | nindent 4 }}
17-
{{- if .Values.notifications.sources.pgstac }}
17+
{{- if (index .Values "eoapi-notifier").enabled }}
1818
{{ $.Files.Get "initdb-data/settings/pgstac-notification-triggers.sql" | nindent 4 }}
1919
{{- end }}
2020
---

charts/eoapi/templates/services/_common.tpl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ Helper function for common init containers to wait for pgstac jobs
3434
{{- if .Values.pgstacBootstrap.enabled }}
3535
initContainers:
3636
- name: wait-for-pgstac-jobs
37-
image: bitnami/kubectl:latest
37+
image: alpine/k8s:1.28.0
3838
env:
3939
{{- include "eoapi.commonEnvVars" (dict "service" "init" "root" .) | nindent 2 }}
4040
resources:

charts/eoapi/test-k3s-unittest-values.yaml

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,3 +48,48 @@ vector:
4848
envVars:
4949
# needs to on so we can call /refresh for integration tests
5050
TIPG_DEBUG: "True"
51+
52+
######################
53+
# NOTIFICATIONS
54+
######################
55+
eoapi-notifier:
56+
enabled: true
57+
config:
58+
logLevel: DEBUG
59+
sources:
60+
- type: pgstac
61+
config:
62+
channel: pgstac_items_change
63+
outputs:
64+
- type: cloudevents
65+
config:
66+
source: /eoapi/pgstac
67+
event_type: org.eoapi.stac.item
68+
destination:
69+
ref:
70+
apiVersion: serving.knative.dev/v1
71+
kind: Service
72+
name: eoapi-cloudevents-sink
73+
resources:
74+
requests:
75+
cpu: "50m"
76+
memory: "64Mi"
77+
limits:
78+
cpu: "200m"
79+
memory: "128Mi"
80+
secrets:
81+
postgresql:
82+
create: false
83+
existingSecret:
84+
name: "eoapi-test-pguser-eoapi"
85+
keys:
86+
username: "user"
87+
password: "password"
88+
host: "host"
89+
port: "port"
90+
database: "dbname"
91+
env:
92+
POSTGRES_CHANNEL: "pgstac_items_change"
93+
CLOUDEVENTS_ENDPOINT: "https://webhook.site/test-endpoint"
94+
CLOUDEVENTS_SOURCE: "/eoapi/pgstac"
95+
CLOUDEVENTS_EVENT_TYPE: "org.eoapi.stac.item"

0 commit comments

Comments
 (0)