Skip to content

Commit 850c139

Browse files
committed
Feature - log search query to OpenSearch Dashboard in real time
1 parent 296fc4c commit 850c139

File tree

2 files changed

+159
-8
lines changed

2 files changed

+159
-8
lines changed

deployment/lambda-search/app.py

Lines changed: 62 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,15 @@
99
from requests_aws4auth import AWS4Auth
1010

1111
from filter_builder import *
12+
from dashboard import *
1213

1314
#Global variables for prod
1415
region = environ['MY_AWS_REGION']
1516
aos_host = environ['OS_ENDPOINT']
1617
sagemaker_endpoint = environ['SAGEMAKER_ENDPOINT']
1718
os_secret_id = environ['OS_SECRET_ID']
1819
model_name = environ['MODEL_NAME']
20+
search_index_name = environ['NEW_INDEX_NAME']
1921

2022
def get_awsauth_from_secret(region, secret_id):
2123
"""
@@ -57,7 +59,7 @@ def invoke_sagemaker_endpoint(sagemaker_endpoint, payload, region):
5759
print(f"Error invoking SageMaker endpoint {sagemaker_endpoint}: {e}")
5860

5961

60-
def semantic_search_neighbors(lang, search_text, features, os_client, sort_param, k_neighbors=30, from_param=0, idx_name=model_name, filters=None, size=10):
62+
def semantic_search_neighbors(lang, search_text, features, os_client, sort_param, k_neighbors=50, from_param=0, idx_name=model_name, filters=None, size=10):
6163
"""
6264
Perform semantic search and get neighbors using the cosine similarity of the vectors
6365
output: a list of json, each json contains _id, _score, title, and uuid
@@ -97,6 +99,8 @@ def semantic_search_neighbors(lang, search_text, features, os_client, sort_param
9799
request_timeout=55,
98100
index=idx_name,
99101
body=query)
102+
103+
print(res)
100104

101105
# # Return a dataframe of the searched results, including title and uuid
102106
# query_result = [
@@ -159,7 +163,7 @@ def add_to_top_of_dict(original_dict, key, value):
159163
new_dict.update(original_dict)
160164

161165
return new_dict
162-
166+
"""
163167
def create_api_response(search_results):
164168
response = {
165169
"total_hits": len(search_results['hits']['hits']),
@@ -176,7 +180,7 @@ def create_api_response(search_results):
176180
except Exception as e:
177181
print(f"Error processing hit: {e}")
178182
return response
179-
183+
"""
180184
def create_api_response_geojson(search_results, lang):
181185

182186
total_hits = search_results['hits']['total']['value'] if 'total' in search_results['hits'] else 0
@@ -256,7 +260,12 @@ def lambda_handler(event, context):
256260
/postText: Uses semantic search to find similar records based on vector similarity.
257261
Other paths: Uses a direct keyword text match to find matched records.
258262
"""
259-
awsauth = get_awsauth_from_secret(region, secret_id=os_secret_id)
263+
#awsauth = get_awsauth_from_secret(region, secret_id=os_secret_id)
264+
#print(awsauth)
265+
266+
credentials = boto3.Session().get_credentials()
267+
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, 'es', session_token=credentials.token)
268+
260269
os_client = OpenSearch(
261270
hosts=[{'host': aos_host, 'port': 443}],
262271
http_auth=awsauth,
@@ -268,10 +277,12 @@ def lambda_handler(event, context):
268277
#print(event)
269278

270279
k = 10
271-
payload = event['q']
280+
payload = event.get('q', '') or ''
272281

273282
# Debug event
274283
#print("event", event)
284+
285+
275286

276287
filter_config = load_config()
277288

@@ -348,12 +359,55 @@ def lambda_handler(event, context):
348359
filters.append(build_spatial_filter(spatial_field, spatial_filter, relation))
349360

350361
# Sort param
351-
sort_param = build_sort_filter(sort_field=sort_param, sort_order=order_param)
362+
sort_param_final = build_sort_filter(sort_field=sort_param, sort_order=order_param)
352363

353364
# If no filters are specified, set filters to None
354365
filters = filters if filters else None
355366

356367
#print("filters : ", filters)
368+
369+
####
370+
#OpenSearch DashBoard code
371+
####
372+
ip_address = event.get('ip_address', '') or ''
373+
timestamp = event.get('timestamp', '') or ''
374+
user_agent = event.get('user_agent', '') or ''
375+
http_method = event.get('http_method', '') or ''
376+
377+
create_opensearch_index(os_client, search_index_name)
378+
ip2geo_data = {}
379+
ip2geo_data = ip2geo_handler(os_client, ip_address)
380+
document = [
381+
{
382+
"timestamp": timestamp,
383+
"lang": lang_filter,
384+
"q": payload,
385+
"ip_address": ip_address,
386+
"user_agent": user_agent,
387+
"http_method": http_method,
388+
"sort_param": sort_param,
389+
"order_param": order_param,
390+
"organization_filter": organization_filter,
391+
"metadata_source_filter": metadata_source_filter,
392+
"theme_filter": theme_filter,
393+
"type_filter": type_filter,
394+
#"start_date_filter": start_date_filter,
395+
#"end_date_filter": end_date_filter,
396+
#"spatial_filter": spatial_filter,
397+
"relation": relation,
398+
"size": size,
399+
"ip2geo": ip2geo_data
400+
}
401+
]
402+
403+
print(f"Document to be indexed: {document}")
404+
405+
save_to_opensearch(os_client, search_index_name, document)
406+
407+
### End of OpenSearch DashBoard code
408+
409+
if event['method'] == 'postText':
410+
payload = json.loads(event['body'])['text']
357411

358412
if event['method'] == 'SemanticSearch':
359413
#print(f'This is payload {payload}')
@@ -367,11 +421,11 @@ def lambda_handler(event, context):
367421
search_text=payload,
368422
features=features,
369423
os_client=os_client,
370-
k_neighbors=10,
424+
k_neighbors=k,
371425
from_param=from_param,
372426
idx_name=model_name,
373427
filters=filters,
374-
sort_param=sort_param,
428+
sort_param=sort_param_final,
375429
size=size
376430
)
377431

deployment/lambda-search/dashboard.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
import json
2+
3+
def parse_geo_point(ip2geo_data):
    """Normalize a comma-separated "lat,lon" location string into a geo_point dict.

    Mutates *ip2geo_data* in place and returns it. A malformed location
    string is logged and replaced with None instead of raising, so callers
    can index the document regardless.
    """
    location = ip2geo_data.get('location')
    if not isinstance(location, str):
        # Already a dict (or absent) -- nothing to convert.
        return ip2geo_data
    try:
        lat_text, lon_text = location.split(',')
        ip2geo_data['location'] = {"lat": float(lat_text), "lon": float(lon_text)}
    except ValueError:
        # Wrong number of parts or non-numeric values.
        print("Invalid location format:", location)
        ip2geo_data['location'] = None
    return ip2geo_data
12+
13+
def ip2geo_handler(os_client, ip_address):
    """Resolve *ip_address* to geo data via the "ip-to-geo-pipeline" ingest pipeline.

    Runs the pipeline's _simulate endpoint on a throwaway document and
    extracts the enriched "ip2geo" field, normalizing its location into a
    geo_point via parse_geo_point. Returns {} when the response does not
    have the expected shape.
    """
    ip2geo_payload = {
        "docs": [
            {
                "_index": "test",  # placeholder only; _simulate never writes an index
                "_id": "1",
                "_source": {
                    "ip": ip_address
                }
            }
        ]
    }

    response = os_client.transport.perform_request(
        method="POST",
        url="/_ingest/pipeline/ip-to-geo-pipeline/_simulate",
        body=json.dumps(ip2geo_payload)
    )

    ip2geo_data = {}

    try:
        ip2geo_data = response["docs"][0]["doc"]["_source"].get("ip2geo", {})
        ip2geo_data = parse_geo_point(ip2geo_data)  # ensure lat/lon is a geo_point
    except (KeyError, IndexError, TypeError) as e:
        # IndexError/TypeError cover an empty or unexpectedly-shaped response.
        # (The previously-caught json.JSONDecodeError could never fire here:
        # the client already returns a parsed dict, not raw JSON text.)
        print("Error extracting ip2geo data:", str(e))

    return ip2geo_data
42+
43+
44+
def create_opensearch_index(os_client, index_name):
    """Create the search-log OpenSearch index with its mapping, if absent.

    Returns the create-index response, or None when the index already exists.
    """
    if os_client.indices.exists(index=index_name):
        print(f"Index '{index_name}' already exists.")
        return None

    keyword = {"type": "keyword"}
    nullable_date = {"type": "date", "null_value": "1970-01-01T00:00:00.000Z"}

    # ip2geo sub-object: every field is a keyword except the geo_point location.
    ip2geo_fields = (
        "continent_name", "region_iso_code", "city_name", "country_iso_code",
        "country_name", "region_name", "location", "time_zone",
    )
    ip2geo_properties = {
        field: {"type": "geo_point"} if field == "location" else dict(keyword)
        for field in ip2geo_fields
    }

    properties = {
        "timestamp": {"type": "date"},
        "lang": dict(keyword),
        "id": dict(keyword),
        "q": dict(keyword),
        "ip_address": {"type": "ip"},
        "user_agent": dict(keyword),
        "http_method": dict(keyword),
        "sort_param": dict(keyword),
        "order_param": dict(keyword),
        "organization_filter": dict(keyword),
        "metadata_source_filter": dict(keyword),
        "theme_filter": dict(keyword),
        "type_filter": dict(keyword),
        "start_date_filter": dict(nullable_date),
        "end_date_filter": dict(nullable_date),
        "spatial_filter": {"type": "geo_shape"},
        "relation": dict(keyword),
        "size": dict(keyword),
        "ip2geo": {"properties": ip2geo_properties},
    }

    response = os_client.indices.create(
        index=index_name,
        body={"mappings": {"properties": properties}},
    )
    print(f"Created new OpenSearch index: {index_name}")
    return response
91+
92+
def save_to_opensearch(os_client, index, document):
    """Index each log record from *document* into OpenSearch.

    Parameters:
        os_client: OpenSearch client used to index documents.
        index: name of the target index.
        document: iterable of dicts, one per log record; an empty iterable
            is a no-op.

    Each record is indexed with an auto-generated _id. Returns None.
    (Dropped the unused `response` local from the original; a future
    improvement could batch these through the bulk API.)
    """
    for doc in document:
        os_client.index(index=index, body=doc)

0 commit comments

Comments
 (0)