Skip to content

Commit f32f782

Browse files
committed
Added eoapi-notifier middle-ware.
1 parent 0c73fea commit f32f782

File tree

10 files changed

+536
-9
lines changed

10 files changed

+536
-9
lines changed

.github/workflows/helm-tests.yml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,11 @@ jobs:
112112
echo "===== Related Kubernetes Events ====="
113113
kubectl get events | grep -E "pgstac|initdb" || echo "No relevant events found"
114114
115+
# Check notification system status
116+
echo "===== Notification System Status ====="
117+
kubectl get deployments -l app.kubernetes.io/name=eoapi-notifier -o wide || echo "No eoapi-notifier deployment found"
118+
kubectl get ksvc -l app.kubernetes.io/component=cloudevents-sink -o wide || echo "No Knative CloudEvents sink found"
119+
115120
exit 1
116121
117122
- name: Run integration tests
@@ -130,6 +135,19 @@ jobs:
130135
kubectl get services -o wide
131136
kubectl get ingress
132137
138+
# Check notification system final status
139+
echo "=== Notification System Final Status ==="
140+
kubectl get deployments -l app.kubernetes.io/name=eoapi-notifier -o wide || echo "No eoapi-notifier deployment"
141+
kubectl get pods -l app.kubernetes.io/name=eoapi-notifier -o wide || echo "No eoapi-notifier pods"
142+
kubectl get ksvc -l app.kubernetes.io/component=cloudevents-sink -o wide || echo "No Knative CloudEvents sink"
143+
kubectl get pods -l serving.knative.dev/service -o wide || echo "No Knative CloudEvents sink pods"
144+
145+
# Show notification logs if they exist
146+
echo "=== eoapi-notifier Logs ==="
147+
kubectl logs -l app.kubernetes.io/name=eoapi-notifier --tail=20 || echo "No eoapi-notifier logs"
148+
echo "=== Knative CloudEvents Sink Logs ==="
149+
kubectl logs -l serving.knative.dev/service --tail=20 || echo "No Knative CloudEvents sink logs"
150+
133151
134152
- name: Cleanup
135153
if: always()
Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
"""Test notification system deployment and functionality."""
2+
import json
3+
import os
4+
import psycopg2
5+
import psycopg2.extensions
6+
import requests
7+
import subprocess
8+
import time
9+
import pytest
10+
from datetime import datetime
11+
12+
13+
def test_eoapi_notifier_deployment():
14+
"""Test that eoapi-notifier deployment is running."""
15+
# Check if eoapi-notifier deployment exists and is ready
16+
result = subprocess.run([
17+
'kubectl', 'get', 'deployment',
18+
'-l', 'app.kubernetes.io/name=eoapi-notifier',
19+
'--no-headers', '-o', 'custom-columns=READY:.status.readyReplicas'
20+
], capture_output=True, text=True)
21+
22+
if result.returncode != 0:
23+
pytest.skip("eoapi-notifier deployment not found - notifications not enabled")
24+
25+
ready_replicas = result.stdout.strip()
26+
assert ready_replicas == "1", f"Expected 1 ready replica, got {ready_replicas}"
27+
28+
29+
def test_cloudevents_sink_exists():
30+
"""Test that Knative CloudEvents sink service exists and is accessible."""
31+
# Check if Knative service exists
32+
result = subprocess.run([
33+
'kubectl', 'get', 'ksvc',
34+
'-l', 'app.kubernetes.io/component=cloudevents-sink',
35+
'--no-headers'
36+
], capture_output=True, text=True)
37+
38+
if result.returncode != 0 or not result.stdout.strip():
39+
pytest.skip("Knative CloudEvents sink not found - notifications not configured")
40+
41+
assert "cloudevents-sink" in result.stdout, "Knative CloudEvents sink should exist"
42+
43+
44+
def test_notification_configuration():
45+
"""Test that eoapi-notifier is configured correctly."""
46+
# Get the configmap for eoapi-notifier
47+
result = subprocess.run([
48+
'kubectl', 'get', 'configmap',
49+
'-l', 'app.kubernetes.io/name=eoapi-notifier',
50+
'-o', r'jsonpath={.items[0].data.config\.yaml}'
51+
], capture_output=True, text=True)
52+
53+
if result.returncode != 0:
54+
pytest.skip("eoapi-notifier configmap not found")
55+
56+
config_yaml = result.stdout.strip()
57+
assert "postgres" in config_yaml, "Should have postgres source configured"
58+
assert "cloudevents" in config_yaml, "Should have cloudevents output configured"
59+
assert "pgstac_items_change" in config_yaml, "Should listen to pgstac_items_change channel"
60+
61+
62+
def test_cloudevents_sink_logs_show_startup():
63+
"""Test that Knative CloudEvents sink started successfully."""
64+
# Get Knative CloudEvents sink pod logs
65+
result = subprocess.run([
66+
'kubectl', 'logs',
67+
'-l', 'serving.knative.dev/service',
68+
'--tail=20'
69+
], capture_output=True, text=True)
70+
71+
if result.returncode != 0:
72+
pytest.skip("Cannot get Knative CloudEvents sink logs")
73+
74+
logs = result.stdout
75+
assert "CloudEvents sink started on port" in logs, "Knative CloudEvents sink should have started successfully"
76+
77+
78+
def test_eoapi_notifier_logs_show_connection():
79+
"""Test that eoapi-notifier connects to database successfully."""
80+
# Give some time for the notifier to start
81+
time.sleep(5)
82+
83+
# Get eoapi-notifier pod logs
84+
result = subprocess.run([
85+
'kubectl', 'logs',
86+
'-l', 'app.kubernetes.io/name=eoapi-notifier',
87+
'--tail=50'
88+
], capture_output=True, text=True)
89+
90+
if result.returncode != 0:
91+
pytest.skip("Cannot get eoapi-notifier logs")
92+
93+
logs = result.stdout
94+
# Should not have connection errors
95+
assert "Connection refused" not in logs, "Should not have connection errors"
96+
assert "Authentication failed" not in logs, "Should not have auth errors"
97+
98+
99+
def test_database_notification_triggers_exist():
100+
"""Test that pgstac notification triggers are installed."""
101+
# Port forward to database if not already done
102+
import psycopg2
103+
import os
104+
105+
try:
106+
# Try to connect using environment variables set by the test runner
107+
conn = psycopg2.connect(
108+
host=os.getenv('PGHOST', 'localhost'),
109+
port=int(os.getenv('PGPORT', '5432')),
110+
database=os.getenv('PGDATABASE', 'eoapi'),
111+
user=os.getenv('PGUSER', 'eoapi'),
112+
password=os.getenv('PGPASSWORD', '')
113+
)
114+
115+
with conn.cursor() as cur:
116+
# Check if the notification function exists
117+
cur.execute("""
118+
SELECT EXISTS(
119+
SELECT 1 FROM pg_proc p
120+
JOIN pg_namespace n ON p.pronamespace = n.oid
121+
WHERE n.nspname = 'public'
122+
AND p.proname = 'notify_items_change_func'
123+
);
124+
""")
125+
function_exists = cur.fetchone()[0]
126+
assert function_exists, "notify_items_change_func should exist"
127+
128+
# Check if triggers exist
129+
cur.execute("""
130+
SELECT COUNT(*) FROM information_schema.triggers
131+
WHERE trigger_name LIKE 'notify_items_change_%'
132+
AND event_object_table = 'items'
133+
AND event_object_schema = 'pgstac';
134+
""")
135+
trigger_count = cur.fetchone()[0]
136+
assert trigger_count >= 3, f"Should have at least 3 triggers (INSERT, UPDATE, DELETE), found {trigger_count}"
137+
138+
conn.close()
139+
140+
except (psycopg2.Error, ConnectionError, OSError):
141+
pytest.skip("Cannot connect to database for trigger verification")
142+
143+
144+
def test_end_to_end_notification_flow():
145+
"""Test complete flow: database → eoapi-notifier → Knative CloudEvents sink."""
146+
147+
# Skip if notifications not enabled
148+
if not subprocess.run(['kubectl', 'get', 'deployment', '-l', 'app.kubernetes.io/name=eoapi-notifier', '--no-headers'], capture_output=True).stdout.strip():
149+
pytest.skip("eoapi-notifier not deployed")
150+
151+
# Find Knative CloudEvents sink pod
152+
result = subprocess.run(['kubectl', 'get', 'pods', '-l', 'serving.knative.dev/service', '-o', 'jsonpath={.items[0].metadata.name}'], capture_output=True, text=True)
153+
154+
if result.returncode != 0 or not result.stdout.strip():
155+
pytest.skip("Knative CloudEvents sink pod not found")
156+
157+
sink_pod = result.stdout.strip()
158+
159+
# Connect to database using test environment
160+
try:
161+
conn = psycopg2.connect(
162+
host=os.getenv('PGHOST', 'localhost'),
163+
port=int(os.getenv('PGPORT', '5432')),
164+
database=os.getenv('PGDATABASE', 'eoapi'),
165+
user=os.getenv('PGUSER', 'eoapi'),
166+
password=os.getenv('PGPASSWORD', '')
167+
)
168+
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
169+
except psycopg2.Error as e:
170+
pytest.skip(f"Cannot connect to database: {e}")
171+
172+
# Insert test item and check for CloudEvent
173+
test_item_id = f"e2e-test-{int(time.time())}"
174+
try:
175+
with conn.cursor() as cursor:
176+
cursor.execute("SELECT pgstac.create_item(%s);", (json.dumps({
177+
"id": test_item_id,
178+
"type": "Feature",
179+
"stac_version": "1.0.0",
180+
"collection": "noaa-emergency-response",
181+
"geometry": {"type": "Point", "coordinates": [0, 0]},
182+
"bbox": [0, 0, 0, 0],
183+
"properties": {"datetime": "2020-01-01T00:00:00Z"},
184+
"assets": {}
185+
}),))
186+
187+
# Check CloudEvents sink logs for CloudEvent
188+
found_event = False
189+
for _ in range(20): # 20 second timeout
190+
time.sleep(1)
191+
result = subprocess.run(['kubectl', 'logs', sink_pod, '--since=30s'], capture_output=True, text=True)
192+
if result.returncode == 0 and "CloudEvent received" in result.stdout and test_item_id in result.stdout:
193+
found_event = True
194+
break
195+
196+
assert found_event, f"CloudEvent for {test_item_id} not received by CloudEvents sink"
197+
198+
finally:
199+
# Cleanup
200+
with conn.cursor() as cursor:
201+
cursor.execute("SELECT pgstac.delete_item(%s);", (test_item_id,))
202+
conn.close()
203+
204+
205+
def test_k_sink_injection():
206+
"""Test that SinkBinding injects K_SINK into eoapi-notifier deployment."""
207+
# Check if eoapi-notifier deployment exists
208+
result = subprocess.run([
209+
'kubectl', 'get', 'deployment',
210+
'-l', 'app.kubernetes.io/name=eoapi-notifier',
211+
'-o', 'jsonpath={.items[0].spec.template.spec.containers[0].env[?(@.name=="K_SINK")].value}'
212+
], capture_output=True, text=True)
213+
214+
if result.returncode != 0:
215+
pytest.skip("eoapi-notifier deployment not found")
216+
217+
k_sink_value = result.stdout.strip()
218+
if k_sink_value:
219+
assert "cloudevents-sink" in k_sink_value, f"K_SINK should point to CloudEvents sink service, got: {k_sink_value}"
220+
print(f"✅ K_SINK properly injected: {k_sink_value}")
221+
else:
222+
# Check if SinkBinding exists - it may take time to inject
223+
sinkbinding_result = subprocess.run([
224+
'kubectl', 'get', 'sinkbinding',
225+
'-l', 'app.kubernetes.io/component=sink-binding',
226+
'--no-headers'
227+
], capture_output=True, text=True)
228+
229+
if sinkbinding_result.returncode == 0 and sinkbinding_result.stdout.strip():
230+
pytest.skip("SinkBinding exists but K_SINK not yet injected - may need more time")
231+
else:
232+
pytest.fail("No K_SINK found and no SinkBinding exists")
233+
234+
235+
if __name__ == "__main__":
236+
pytest.main([__file__, "-v"])

charts/eoapi/Chart.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,3 +53,7 @@ dependencies:
5353
version: 5.7.4
5454
repository: "https://devseed.com/eoapi-k8s/"
5555
condition: postgrescluster.enabled
56+
- name: eoapi-notifier
57+
version: 0.0.3
58+
repository: "oci://ghcr.io/developmentseed/charts"
59+
condition: notifications.enabled
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
{{- if (index .Values "eoapi-notifier").enabled }}
2+
apiVersion: serving.knative.dev/v1
3+
kind: Service
4+
metadata:
5+
name: {{ include "eoapi.fullname" . }}-cloudevents-sink
6+
labels:
7+
{{- include "eoapi.labels" . | nindent 4 }}
8+
app.kubernetes.io/component: cloudevents-sink
9+
spec:
10+
template:
11+
metadata:
12+
annotations:
13+
autoscaling.knative.dev/minScale: "0"
14+
autoscaling.knative.dev/maxScale: "3"
15+
spec:
16+
containers:
17+
- name: cloudevents-receiver
18+
image: python:3.11-slim
19+
command:
20+
- python
21+
- -c
22+
- |
23+
import json
24+
import logging
25+
import os
26+
from http.server import HTTPServer, BaseHTTPRequestHandler
27+
28+
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
29+
logger = logging.getLogger(__name__)
30+
31+
class CloudEventHandler(BaseHTTPRequestHandler):
32+
def do_POST(self):
33+
content_length = int(self.headers.get('Content-Length', 0))
34+
post_data = self.rfile.read(content_length)
35+
36+
logger.info(f"📨 CloudEvent received at {self.path}")
37+
38+
# Log CloudEvent headers
39+
ce_headers = {k: v for k, v in self.headers.items() if k.lower().startswith('ce-')}
40+
if ce_headers:
41+
logger.info(f"CloudEvent headers: {ce_headers}")
42+
43+
try:
44+
if post_data:
45+
data = json.loads(post_data.decode('utf-8'))
46+
logger.info(f"CloudEvent data: {json.dumps(data, indent=2)}")
47+
except Exception as e:
48+
logger.info(f"Raw data: {post_data.decode('utf-8', errors='ignore')}")
49+
50+
self.send_response(200)
51+
self.send_header('Content-Type', 'application/json')
52+
self.end_headers()
53+
self.wfile.write(b'{"status": "received"}')
54+
55+
def do_GET(self):
56+
self.send_response(200)
57+
self.send_header('Content-Type', 'application/json')
58+
self.end_headers()
59+
self.wfile.write(b'{"status": "ready", "service": "cloudevents-sink"}')
60+
61+
def log_message(self, format, *args):
62+
pass
63+
64+
if __name__ == '__main__':
65+
port = int(os.environ.get('PORT', '8080'))
66+
server = HTTPServer(('0.0.0.0', port), CloudEventHandler)
67+
logger.info(f"🚀 CloudEvents sink started on port {port}")
68+
server.serve_forever()
69+
ports:
70+
- containerPort: 8080
71+
env:
72+
- name: PORT
73+
value: "8080"
74+
resources:
75+
requests:
76+
cpu: 50m
77+
memory: 64Mi
78+
limits:
79+
cpu: 200m
80+
memory: 128Mi
81+
---
82+
{{- if (index .Values "eoapi-notifier").enabled }}
83+
apiVersion: sources.knative.dev/v1
84+
kind: SinkBinding
85+
metadata:
86+
name: {{ include "eoapi.fullname" . }}-notifier-binding
87+
labels:
88+
{{- include "eoapi.labels" . | nindent 4 }}
89+
app.kubernetes.io/component: sink-binding
90+
spec:
91+
subject:
92+
apiVersion: apps/v1
93+
kind: Deployment
94+
name: {{ include "eoapi.fullname" . }}-eoapi-notifier
95+
sink:
96+
ref:
97+
apiVersion: serving.knative.dev/v1
98+
kind: Service
99+
name: {{ include "eoapi.fullname" . }}-cloudevents-sink
100+
{{- end }}
101+
{{- end }}

charts/eoapi/templates/pgstacbootstrap/configmap.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ metadata:
1414
data:
1515
pgstac-settings.sql: |
1616
{{ $.Files.Get "initdb-data/settings/pgstac-settings.sql" | nindent 4 }}
17-
{{- if .Values.notifications.sources.pgstac }}
17+
{{- if (index .Values "eoapi-notifier").sources.pgstac }}
1818
{{ $.Files.Get "initdb-data/settings/pgstac-notification-triggers.sql" | nindent 4 }}
1919
{{- end }}
2020
---

0 commit comments

Comments
 (0)