From dd4a5ba5fd51785874cb3c7c442e14a53b57f024 Mon Sep 17 00:00:00 2001 From: nitinaggarwal-databricks Date: Tue, 16 Sep 2025 14:02:02 +0000 Subject: [PATCH 01/12] Added the agent genie app source code --- .../bundle/dev/vscode.bundlevars.json | 1 + .../bundle/dev/vscode.overrides.json | 1 + agent-genie-app/agent_genie/app.py | 156 ++ agent-genie-app/agent_genie/app.yaml | 15 + agent-genie-app/agent_genie/databricks.yml | 95 + agent-genie-app/agent_genie/helper.py | 381 +++ .../agent_genie/manual_ai_content.py | 227 ++ agent-genie-app/agent_genie/requirements.txt | 20 + .../agent_genie/table_extraction.py | 137 + .../agent_genie/templates/index.html | 2296 +++++++++++++++++ agent-genie-app/agent_genie/tracking.py | 584 +++++ agent-genie-app/manifest.mf | 1 + .../bundle/dev/vscode.bundlevars.json | 1 + .../bundle/dev/vscode.overrides.json | 1 + agent_genie/CODEOWNERS | 22 + agent_genie/app.py | 156 ++ agent_genie/app.yaml | 15 + agent_genie/databricks.yml | 95 + agent_genie/helper.py | 381 +++ agent_genie/manual_ai_content.py | 227 ++ agent_genie/requirements.txt | 20 + agent_genie/table_extraction.py | 137 + agent_genie/templates/index.html | 2296 +++++++++++++++++ agent_genie/tracking.py | 584 +++++ manifest.mf | 1 + 25 files changed, 7850 insertions(+) create mode 100644 agent-genie-app/agent_genie/.databricks/bundle/dev/vscode.bundlevars.json create mode 100644 agent-genie-app/agent_genie/.databricks/bundle/dev/vscode.overrides.json create mode 100644 agent-genie-app/agent_genie/app.py create mode 100644 agent-genie-app/agent_genie/app.yaml create mode 100644 agent-genie-app/agent_genie/databricks.yml create mode 100644 agent-genie-app/agent_genie/helper.py create mode 100644 agent-genie-app/agent_genie/manual_ai_content.py create mode 100644 agent-genie-app/agent_genie/requirements.txt create mode 100644 agent-genie-app/agent_genie/table_extraction.py create mode 100644 agent-genie-app/agent_genie/templates/index.html create mode 100644 
agent-genie-app/agent_genie/tracking.py create mode 100644 agent-genie-app/manifest.mf create mode 100644 agent_genie/.databricks/bundle/dev/vscode.bundlevars.json create mode 100644 agent_genie/.databricks/bundle/dev/vscode.overrides.json create mode 100644 agent_genie/CODEOWNERS create mode 100644 agent_genie/app.py create mode 100644 agent_genie/app.yaml create mode 100644 agent_genie/databricks.yml create mode 100644 agent_genie/helper.py create mode 100644 agent_genie/manual_ai_content.py create mode 100644 agent_genie/requirements.txt create mode 100644 agent_genie/table_extraction.py create mode 100644 agent_genie/templates/index.html create mode 100644 agent_genie/tracking.py create mode 100644 manifest.mf diff --git a/agent-genie-app/agent_genie/.databricks/bundle/dev/vscode.bundlevars.json b/agent-genie-app/agent_genie/.databricks/bundle/dev/vscode.bundlevars.json new file mode 100644 index 00000000..9e26dfee --- /dev/null +++ b/agent-genie-app/agent_genie/.databricks/bundle/dev/vscode.bundlevars.json @@ -0,0 +1 @@ +{} \ No newline at end of file diff --git a/agent-genie-app/agent_genie/.databricks/bundle/dev/vscode.overrides.json b/agent-genie-app/agent_genie/.databricks/bundle/dev/vscode.overrides.json new file mode 100644 index 00000000..9e26dfee --- /dev/null +++ b/agent-genie-app/agent_genie/.databricks/bundle/dev/vscode.overrides.json @@ -0,0 +1 @@ +{} \ No newline at end of file diff --git a/agent-genie-app/agent_genie/app.py b/agent-genie-app/agent_genie/app.py new file mode 100644 index 00000000..10576678 --- /dev/null +++ b/agent-genie-app/agent_genie/app.py @@ -0,0 +1,156 @@ +import os +import json +import requests +from flask import Flask, render_template, request, jsonify +from databricks.sdk.core import Config +from databricks.sdk import WorkspaceClient +from databricks.sdk.service.serving import ChatMessage, ChatMessageRole + +# Initialize Flask app +app = Flask(__name__, static_folder='static', template_folder='static') + +cfg = Config() 
+client = WorkspaceClient() + +# Model serving endpoint name +ENDPOINT_NAME = "agents_ws_vfc_demo-sch_vfc_demo-fashion_merchandising_agent" + +@app.route('/') +def index(): + """Serve the main HTML page""" + return render_template('index.html') + +@app.route('/api/chat', methods=['POST']) +def chat(): + """Handle chat requests and forward to model serving endpoint using Databricks SDK""" + try: + # Get the message history from the request + data = request.get_json() + messages = data.get('messages', []) + + if not messages: + return jsonify({'error': 'No messages provided'}), 400 + + # Convert messages to ChatMessage objects + chat_messages = [] + for msg in messages: + role = msg.get('role', 'user') + content = msg.get('content', '') + + # Map roles to ChatMessageRole enum + if role == 'user': + chat_role = ChatMessageRole.USER + elif role == 'assistant': + chat_role = ChatMessageRole.ASSISTANT + elif role == 'system': + chat_role = ChatMessageRole.SYSTEM + else: + chat_role = ChatMessageRole.USER # Default to user + + chat_messages.append(ChatMessage(role=chat_role, content=content)) + + # Get user information for logging + user_email = request.headers.get('X-Forwarded-Email') + app.logger.info(f"Making request to model endpoint for user: {user_email}") + + # Make request to model serving endpoint using Databricks SDK + response = client.serving_endpoints.query( + name=ENDPOINT_NAME, + messages=chat_messages + ) + + # Extract the response content + if response and hasattr(response, 'choices') and response.choices: + # Get the first choice + choice = response.choices[0] + if hasattr(choice, 'message') and choice.message: + result_content = choice.message.content + return jsonify({'content': result_content}) + else: + return jsonify({'error': 'No message content in response'}), 500 + else: + return jsonify({'error': 'No response choices received'}), 500 + + except Exception as e: + error_msg = str(e) + app.logger.error(f"Error calling model endpoint: {error_msg}") 
+ + # Handle specific error types + if "authentication" in error_msg.lower() or "unauthorized" in error_msg.lower(): + return jsonify({ + 'error': 'Authentication failed. Your Databricks token may have expired.', + 'details': 'Please refresh the page or log in again.' + }), 401 + elif "permission" in error_msg.lower() or "forbidden" in error_msg.lower(): + return jsonify({ + 'error': 'Access denied. You may not have permission to access this model endpoint.', + 'details': 'Please contact your Databricks administrator.' + }), 403 + elif "timeout" in error_msg.lower(): + return jsonify({'error': 'Request timed out. Please try again.'}), 504 + else: + return jsonify({ + 'error': f'Unexpected error: {error_msg}', + 'details': 'Please try again or contact support if the issue persists.' + }), 500 + +@app.route('/health') +def health(): + """Health check endpoint""" + user_email = request.headers.get('X-Forwarded-Email') + + try: + # Test the client connection + client.current_user.me() + client_healthy = True + except Exception as e: + client_healthy = False + app.logger.error(f"Client health check failed: {str(e)}") + + return jsonify({ + 'status': 'healthy', + 'client_authenticated': client_healthy, + 'user_email': user_email if user_email else 'anonymous', + 'endpoint_name': ENDPOINT_NAME + }) + +@app.route('/api/debug') +def debug(): + """Debug endpoint to check authentication and endpoint status""" + headers_info = { + 'X-Forwarded-Access-Token': 'present' if request.headers.get('X-Forwarded-Access-Token') else 'missing', + 'X-Forwarded-Email': request.headers.get('X-Forwarded-Email', 'not provided'), + 'User-Agent': request.headers.get('User-Agent', 'not provided'), + 'Authorization': 'present' if request.headers.get('Authorization') else 'missing' + } + + try: + # Check if we can access the serving endpoint + endpoint_info = client.serving_endpoints.get(name=ENDPOINT_NAME) + endpoint_status = endpoint_info.state.value if endpoint_info.state else 'unknown' + 
except Exception as e: + endpoint_status = f'error: {str(e)}' + + try: + # Check current user + current_user = client.current_user.me() + user_info = current_user.user_name if current_user else 'unknown' + except Exception as e: + user_info = f'error: {str(e)}' + + return jsonify({ + 'message': 'Debug information for Databricks App', + 'headers': headers_info, + 'endpoint_name': ENDPOINT_NAME, + 'endpoint_status': endpoint_status, + 'current_user': user_info, + 'sdk_config': { + 'host': cfg.host if hasattr(cfg, 'host') else 'not set', + 'auth_type': cfg.auth_type if hasattr(cfg, 'auth_type') else 'not set' + } + }) + +if __name__ == '__main__': + # Get port from environment variable or default to 8080 + port = int(os.environ.get('PORT', 8080)) + app.run(host='0.0.0.0', port=port, debug=False) \ No newline at end of file diff --git a/agent-genie-app/agent_genie/app.yaml b/agent-genie-app/agent_genie/app.yaml new file mode 100644 index 00000000..aebe8371 --- /dev/null +++ b/agent-genie-app/agent_genie/app.yaml @@ -0,0 +1,15 @@ +command: [ + "uvicorn", + "app:app", + "--host", + "127.0.0.1", + "--port", + "8000" +] +env: + - name: "SPACE_ID" + value: "01f03de6df76113387ccc36242e6c804" + - name: "SERVING_ENDPOINT_NAME" + value: "databricks-gpt-oss-120b" + - name: "TAVILY_API_KEY" + value: "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" \ No newline at end of file diff --git a/agent-genie-app/agent_genie/databricks.yml b/agent-genie-app/agent_genie/databricks.yml new file mode 100644 index 00000000..cff7fcf8 --- /dev/null +++ b/agent-genie-app/agent_genie/databricks.yml @@ -0,0 +1,95 @@ +bundle: + name: agent-genie-app + +variables: + project_name: + description: "Display name for the app" + default: "agent-genie" + + # Env-only fallback; will also be persisted to a secret by the installer job if possible + tavily_api_key: + description: "Tavily API key (installer job will try to store in a secret scope)" + default: "" + + # Where to persist the secret (customize if you 
already have a scope) + secret_scope: + description: "Workspace secret scope to store Tavily key" + default: "agent_genie_secrets" + + secret_key: + description: "Secret key name for Tavily key inside the scope" + default: "TAVILY_API_KEY" + + environment: + description: "Deployment environment (dev, staging, prod)" + default: "dev" + +targets: + dev: + default: true + mode: development + +resources: + apps: + agent_genie: + name: "${var.project_name}" + description: "FastAPI app with Genie + Serving Endpoint integration" + source_code_path: "." + + command: [ + "uvicorn", + "app:app", + "--host", + "127.0.0.1", + "--port", + "8000" + ] + + # Inject env for your app at runtime + env: + - name: "SPACE_ID" + value_from: "genie-space" + - name: "SERVING_ENDPOINT_NAME" + value_from: "serving-endpoint" + - name: "TAVILY_API_KEY" + value: "${var.tavily_api_key}" # app can use this directly; secret is optional hardening + + # --- App Resources (end-user picks these at install) --- + app_resources: + - key: "serving-endpoint" + serving_endpoint_spec: + permission: "CAN_QUERY" + - key: "genie-space" + genie_space_spec: + permission: "CAN_RUN" + + # --- User Authorized Scopes (Preview) --- + user_authorized_scopes: + - "sql" + - "dashboards.genie" + - "files.files" + - "serving.serving-endpoints" + - "vectorsearch.vector-search-indexes" + - "catalog.connections" + + # --- Installer job to persist Tavily key into a secret and write optional config --- + jobs: + install_app: + name: "${var.project_name} - Install/Configure" + tasks: + - task_key: configure_app + notebook_task: + notebook_path: "./notebooks/setup_app" # create this notebook + base_parameters: + TAVILY_API_KEY: "${var.tavily_api_key}" + SECRET_SCOPE: "${var.secret_scope}" + SECRET_KEY: "${var.secret_key}" + # Add compute for your workspace (example placeholders): + # existing_cluster_id: "" + # OR: + # job_clusters: + # - job_cluster_key: "install_cluster" + # new_cluster: + # spark_version: "14.3.x-scala2.12" + 
# node_type_id: "i3.xlarge" + # num_workers: 0 diff --git a/agent-genie-app/agent_genie/helper.py b/agent-genie-app/agent_genie/helper.py new file mode 100644 index 00000000..abcb87c1 --- /dev/null +++ b/agent-genie-app/agent_genie/helper.py @@ -0,0 +1,381 @@ +from __future__ import annotations + +# --- Optional/lazy imports for external tools --- +try: + from langchain_tavily import TavilySearch # optional +except Exception: # pragma: no cover + TavilySearch = None # type: ignore + +try: + from langchain_community.document_loaders import WebBaseLoader # optional +except Exception: # pragma: no cover + WebBaseLoader = None # type: ignore + +import os +import aiohttp +import json +import asyncio +import logging +import ssl +import time +import hashlib +import re as _re +from typing import Dict, Any, Optional, List, Union, Tuple + +import requests +import backoff +import pandas as pd + +from databricks.sdk.core import Config + +# ----------------------------------------------------------------------------- +# Logging +# ----------------------------------------------------------------------------- +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +) +logger = logging.getLogger("helper") + + +# ----------------------------------------------------------------------------- +# Databricks OAuth config (single source of truth) +# ----------------------------------------------------------------------------- +_CFG = Config() # reads workspace & OAuth app from env/metadata +DATABRICKS_HOST: str = _CFG.hostname # hostname (no protocol) + +import os +from typing import List, Dict, Tuple +from langchain_community.utilities.tavily_search import TavilySearchAPIWrapper + +#os.environ["TAVILY_API_KEY"] = os.getenv("TAVILY_API_KEY") # don't hardcode in code +def tavily_topk_contents(question: str, k: int = 3) -> Tuple[List[Dict], str]: + """ + Return (top-k results, combined content). 
If no API key or any error, returns ([], ""). + """ + if not os.getenv("TAVILY_API_KEY"): + return [], "" # no key -> no internet context + + tavily = TavilySearchAPIWrapper() + try: + print("Working with tavily, got the api key") + results = tavily.results(query=question, max_results=k) + # Normalize shapes + if isinstance(results, dict) and "results" in results: + results = results["results"] + results = (results or [])[:k] + + combined = "\n\n".join( + r.get("content", "").strip() for r in results if r.get("content") + ).strip() + return results, combined + except Exception: + # Network/quotas/shape issues -> fallback + return [], "" + + +def _get_oauth_token() -> str: + """Fetch a *fresh* OAuth token every time to avoid staleness.""" + try: + return _CFG.oauth_token().access_token + except Exception as e: # pragma: no cover + logger.exception("Failed to mint OAuth token: %s", e) + raise + + +def auth_headers() -> dict: + """Return headers using a *fresh* OAuth token (ignore any legacy tokens).""" + return { + "Authorization": f"Bearer {_get_oauth_token()}", + "Content-Type": "application/json", + } + + +# ----------------------------------------------------------------------------- +# Async Genie fetch (used by /fetch-schema etc.) +# ----------------------------------------------------------------------------- +_ALLOW_INSECURE = os.getenv("ALLOW_INSECURE_SSL", "false").lower() in ("1", "true", "yes") + + +async def _raw_fetch_answer( + workspace_url: str, + genie_room_id: str, + access_token: Optional[str], + input_text: str, + conversation_id: Optional[str] = None, +) -> dict: + """Asynchronously fetch an answer from Databricks Genie. + + Note: `access_token` is ignored on purpose; we always use fresh OAuth. + """ + if not workspace_url.endswith("/"): + workspace_url += "/" + + headers = auth_headers() + + # Optional insecure SSL (for internal envs); secure by default. 
+ connector = None + if _ALLOW_INSECURE: + ssl_context = ssl.create_default_context() + ssl_context.check_hostname = False + ssl_context.verify_mode = ssl.CERT_NONE + connector = aiohttp.TCPConnector(ssl=ssl_context) + + max_query_retries = 3 + query_retry_count = 0 + + async with aiohttp.ClientSession(connector=connector) as session: + try: + if conversation_id: + logger.info("Continuing conversation: %s", conversation_id) + continue_url = ( + f"{workspace_url}api/2.0/genie/spaces/{genie_room_id}/conversations/{conversation_id}/messages" + ) + payload_continue = {"content": input_text} + async with session.post(continue_url, json=payload_continue, headers=headers) as resp: + if resp.status != 200: + txt = await resp.text() + logger.error("Error continuing conversation: %s - %s", resp.status, txt) + return {"error": f"API error: {resp.status} - {txt}", "conversation_id": conversation_id} + response_json = await resp.json() + if "message_id" not in response_json: + logger.error("Unexpected response (missing message_id): %s", response_json) + return {"error": "Unexpected response format from API", "conversation_id": conversation_id} + message_id = response_json["message_id"] + else: + start_conv_url = f"{workspace_url}api/2.0/genie/spaces/{genie_room_id}/start-conversation" + payload_start = {"content": input_text} + async with session.post(start_conv_url, json=payload_start, headers=headers) as resp: + if resp.status != 200: + txt = await resp.text() + logger.error("Error starting conversation: %s - %s", resp.status, txt) + return {"error": f"API error: {resp.status} - {txt}"} + response_json = await resp.json() + + if "message_id" not in response_json: + logger.error("Unexpected response (missing message_id): %s", response_json) + return {"error": "Unexpected response format from API"} + message_id = response_json["message_id"] + if "message" in response_json and "conversation_id" in response_json["message"]: + conversation_id = 
response_json["message"]["conversation_id"] + elif "conversation_id" in response_json: + conversation_id = response_json["conversation_id"] + else: + logger.error("Missing conversation_id in API response: %s", response_json) + return {"error": "Missing conversation_id in API response"} + + logger.info("Conversation ID: %s, Message ID: %s", conversation_id, message_id) + + while query_retry_count < max_query_retries: + if query_retry_count: + logger.info("Retry attempt %d/%d", query_retry_count + 1, max_query_retries) + poll_url = ( + f"{workspace_url}api/2.0/genie/spaces/{genie_room_id}/conversations/" + f"{conversation_id}/messages/{message_id}" + ) + max_polls = 300 # ~4-10 min depending on sleep + retry_interval = 2 + + for attempt in range(max_polls): + async with session.get(poll_url, headers=headers) as pres: + if pres.status != 200: + et = await pres.text() + logger.error("Polling error: %s - %s", pres.status, et) + await asyncio.sleep(retry_interval) + continue + poll_json = await pres.json() + + if poll_json.get("attachments"): + attachment = poll_json["attachments"][0] + attachment_id = attachment.get("attachment_id") + + # Extract SQL query from the attachment if available + sql_query = None + if "query" in attachment: + query_info = attachment.get("query", {}) + if isinstance(query_info, dict): + sql_query = query_info.get("query", "") + else: + sql_query = str(query_info) + logger.info(f"πŸ” Extracted SQL query from attachment: {sql_query[:100] if sql_query else 'None'}...") + + answer_url = ( + f"{workspace_url}api/2.0/genie/spaces/{genie_room_id}/conversations/" + f"{conversation_id}/messages/{message_id}/attachments/{attachment_id}/query-result" + ) + async with session.get(answer_url, headers=headers) as r2: + if r2.status != 200: + et = await r2.text() + logger.error("Result fetch error: %s - %s", r2.status, et) + break + result_json = await r2.json() + + state = result_json.get("statement_response", {}).get("status", {}).get("state") + if state 
== "SUCCEEDED": + logger.info("Query succeeded") + result_json["conversation_id"] = conversation_id + # Include the SQL query in the result + if sql_query and sql_query.strip(): + result_json["sql_query"] = sql_query.strip() + logger.info(f"βœ… Added SQL query to result: {sql_query[:100]}...") + return result_json + elif state == "FAILED": + logger.error("Query failed (attempt %d/%d)", query_retry_count + 1, max_query_retries) + if query_retry_count < max_query_retries - 1: + query_retry_count += 1 + await asyncio.sleep(2) + break + else: + return { + "error": "Query failed to execute after multiple attempts.", + "conversation_id": conversation_id, + "attempts": query_retry_count + 1, + } + # else keep polling + + if attempt % 10 == 0: + logger.info("Polling attempt %d/%d", attempt + 1, max_polls) + await asyncio.sleep(retry_interval) + + if query_retry_count < max_query_retries - 1: + query_retry_count += 1 + else: + break + + logger.warning("Failed to get result after %d attempts", query_retry_count + 1) + return { + "error": f"Could not get query result after {query_retry_count + 1} attempts.", + "conversation_id": conversation_id, + "attempts": query_retry_count + 1, + } + + except aiohttp.ClientError as e: + logger.exception("HTTP error in fetch_answer: %s", e) + return {"error": f"Network error: {str(e)}", "conversation_id": conversation_id} + except Exception as e: + logger.exception("Unexpected error in fetch_answer: %s", e) + return {"error": f"Unexpected error: {str(e)}", "conversation_id": conversation_id} + + +# -------------------- +# Lightweight TTL cache for fetch_answer +# -------------------- +_ANSWER_CACHE_TTL = int(os.getenv("ANSWER_CACHE_TTL", "300")) # seconds +_ANSWER_CACHE_MAX = int(os.getenv("ANSWER_CACHE_MAX", "512")) # entries +_ANSWER_CACHE: Dict[str, Tuple[float, dict]] = {} +_ANSWER_CACHE_LOCK = asyncio.Lock() + + +def _normalize_q(q: str) -> str: + try: + q = q.strip() + q = _re.sub(r"\s+", " ", q) + return q[:4096] + except Exception: 
+ return q + + +def _answer_cache_key(workspace_url: str, genie_room_id: str, input_text: str, conversation_id: Optional[str]): + base = f"{workspace_url}|{genie_room_id}|{_normalize_q(input_text)}|{conversation_id or ''}" + return hashlib.sha256(base.encode("utf-8")).hexdigest() + + +async def fetch_answer(workspace_url, genie_room_id, access_token, input_text, conversation_id=None): + """Cached wrapper over _raw_fetch_answer. Set ANSWER_CACHE_TTL/ANSWER_CACHE_MAX via env.""" + key = _answer_cache_key(workspace_url, genie_room_id, input_text, conversation_id) + now = time.time() + # Try cache + try: + async with _ANSWER_CACHE_LOCK: + entry = _ANSWER_CACHE.get(key) + if entry and (now - entry[0]) < _ANSWER_CACHE_TTL: + return entry[1] + except Exception: + pass + + data = await _raw_fetch_answer(workspace_url, genie_room_id, access_token, input_text, conversation_id) + + # Avoid caching obvious auth failures + try: + if isinstance(data, dict) and "error" in data and "Invalid Token" in str(data.get("error", "")): + return data + except Exception: + pass + + # Extract SQL query from the response and add it to the data + if isinstance(data, dict) and "error" not in data: + sql_query = extract_sql_from_response(data) + if sql_query: + data["sql_query"] = sql_query + + # Store + try: + async with _ANSWER_CACHE_LOCK: + if len(_ANSWER_CACHE) >= _ANSWER_CACHE_MAX: + try: + oldest_key = next(iter(_ANSWER_CACHE)) + _ANSWER_CACHE.pop(oldest_key, None) + except Exception: + _ANSWER_CACHE.clear() + _ANSWER_CACHE[key] = (time.time(), data) + except Exception: + pass + + return data + + + +def extract_sql_from_response(response_data: dict) -> Optional[str]: + """Extract SQL query from Genie API response data.""" + try: + # Log the response structure for debugging + logger.info(f"πŸ” Debugging response structure: {list(response_data.keys()) if isinstance(response_data, dict) else 'Not a dict'}") + + # Check if response has statement_response with query + statement_response = 
response_data.get("statement_response", {}) + if statement_response: + logger.info(f"πŸ” Found statement_response: {list(statement_response.keys())}") + query = statement_response.get("query", "") + if query and query.strip(): + logger.info(f"βœ… Found SQL query in statement_response: {query[:100]}...") + return query.strip() + + # Check if response has attachments with query information + attachments = response_data.get("attachments", []) + if attachments: + logger.info(f"πŸ” Found attachments: {len(attachments)} items") + for i, attachment in enumerate(attachments): + logger.info(f"πŸ” Attachment {i}: {list(attachment.keys()) if isinstance(attachment, dict) else 'Not a dict'}") + if "query" in attachment: + query_info = attachment.get("query", {}) + logger.info(f"πŸ” Found query in attachment: {list(query_info.keys()) if isinstance(query_info, dict) else query_info}") + if isinstance(query_info, dict): + query_text = query_info.get("query", "") + else: + query_text = str(query_info) + if query_text and query_text.strip(): + logger.info(f"βœ… Found SQL query in attachments: {query_text[:100]}...") + return query_text.strip() + + # Check for SQL in result_data if available + result_data = response_data.get("result_data", {}) + if result_data: + logger.info(f"πŸ” Found result_data: {list(result_data.keys())}") + if "query" in result_data: + query_text = result_data.get("query", "") + if query_text and query_text.strip(): + logger.info(f"βœ… Found SQL query in result_data: {query_text[:100]}...") + return query_text.strip() + + # Additional check for direct query field + if "query" in response_data: + query_text = response_data.get("query", "") + if query_text and query_text.strip(): + logger.info(f"βœ… Found SQL query in root: {query_text[:100]}...") + return query_text.strip() + + logger.warning("⚠️ No SQL query found in response") + return None + except Exception as e: + logger.warning(f"Error extracting SQL from response: {str(e)}") + return None diff --git 
a/agent-genie-app/agent_genie/manual_ai_content.py b/agent-genie-app/agent_genie/manual_ai_content.py new file mode 100644 index 00000000..324cf333 --- /dev/null +++ b/agent-genie-app/agent_genie/manual_ai_content.py @@ -0,0 +1,227 @@ +# Manual content storage for each AI function type +MANUAL_AI_CONTENT = { + "ai_analyze_sentiment": """Syntax + SQL + ai_analyze_sentiment(content) + + Arguments + content: A STRING expression, the text to be analyzed. + Returns + A STRING. The value is chosen from 'positive', 'negative', 'neutral', or 'mixed'. Returns null if the sentiment cannot be detected. + + Examples + SQL + > SELECT ai_analyze_sentiment('I am happy'); + positive + + > SELECT ai_analyze_sentiment('I am sad'); + negative""", + + "ai_classify": """Syntax + ai_classify(content, labels) + + Arguments + content: A STRING expression, the text to be classified. + labels: An ARRAY literal, the expected output classification labels. Must contain at least 2 elements, and no more than 20 elements. + Returns + A STRING. The value matches one of the strings provided in the labels argument. Returns null if the content cannot be classified. + + Examples + SQL + > SELECT ai_classify("My password is leaked.", ARRAY("urgent", "not urgent")); + urgent + + > SELECT + description, + ai_classify(description, ARRAY('clothing', 'shoes', 'accessories', 'furniture')) AS category + FROM + products + ;""", + + "ai_extract": """Syntax + ai_extract(content, labels) + + Arguments + content: A STRING expression. + labels: An ARRAY literal. Each element is a type of entity to be extracted. + Returns + A STRUCT where each field corresponds to an entity type specified in labels. Each field contains a string representing the extracted entity. If more than one candidate for any entity type is found, only one is returned. + + If content is NULL, the result is NULL. 
+ + Examples + SQL + > SELECT ai_extract( + 'John Doe lives in New York and works for Acme Corp.', + array('person', 'location', 'organization') + ); + {"person": "John Doe", "location": "New York", "organization": "Acme Corp."} + + > SELECT ai_extract( + 'Send an email to jane.doe@example.com about the meeting at 10am.', + array('email', 'time') + ); + {"email": "jane.doe@example.com", "time": "10am"}""", + + "ai_fix_grammar": """ + Syntax + SQL + ai_fix_grammar(content) + + Arguments + content: A STRING expression. + Returns + A STRING with corrected grammar. + + If content is NULL, the result is NULL. + + Examples + SQL + > SELECT ai_fix_grammar('This sentence have some mistake'); + "This sentence has some mistakes" + + > SELECT ai_fix_grammar('She dont know what to did.'); + "She doesn't know what to do." + + > SELECT ai_fix_grammar('He go to school every days.'); + "He goes to school every day." + """, + + "ai_gen": """Syntax + ai_gen(prompt) + + Arguments + prompt: A STRING expression. + Returns + A STRING. + + Examples + SQL + > SELECT ai_gen('Generate a concise, cheerful email title for a summer bike sale with 20% discount'); + Summer Bike Sale: Grab Your Dream Bike at 20% Off! + + > SELECT + question, + ai_gen( + 'You are a teacher. Answer the students question in 50 words: ' || question + ) AS answer + FROM + questions + ; + + """, + "ai_mask": """Syntax + ai_mask(content, labels) + + Arguments + content: A STRING expression. + labels: An ARRAY literal. Each element represents a type of information to be masked. + Returns + A STRING where the specified information is masked. + + If content is NULL, the result is NULL. + + Examples + SQL + > SELECT ai_mask( + 'John Doe lives in New York. His email is john.doe@example.com.', + array('person', 'email') + ); + "[MASKED] lives in New York. His email is [MASKED]." 
+ + > SELECT ai_mask( + 'Contact me at 555-1234 or visit us at 123 Main St.', + array('phone', 'address') + ); + "Contact me at [MASKED] or visit at [MASKED]""", + # "ai_parse_document": "", + "ai_similarity": """Syntax + ai_similarity(expr1, expr2) + + Arguments + expr1: A STRING expression. + expr2: A STRING expression. + Returns + A FLOAT value, representing the semantic similarity between the two input strings. The output score is relative and should only be used for ranking. Score of 1 means the two text are equal. + + Examples + SQL + > SELECT ai_similarity('Apache Spark', 'Apache Spark'); + 1.0 + + > SELECT + company_name + FROM + customers + ORDER BY ai_similarity(company_name, 'Databricks') DESC + ;""", + + "ai_summarize": """Syntax + ai_summarize(content[, max_words]) + + Arguments + content: A STRING expression, the text to be summarized. + max_words: An optional non-negative integral numeric expression representing the best-effort target number of words in the returned summary text. The default value is 50. If set to 0, there is no word limit. + Returns + A STRING. + + If content is NULL, the result is NULL. + + Examples + SQL + > SELECT ai_summarize( + 'Apache Spark is a unified analytics engine for large-scale data processing. ' || + 'It provides high-level APIs in Java, Scala, Python and R, and an optimized ' || + 'engine that supports general execution graphs. 
It also supports a rich set ' || + 'of higher-level tools including Spark SQL for SQL and structured data ' || + 'processing, pandas API on Spark for pandas workloads, MLlib for machine ' || + 'learning, GraphX for graph processing, and Structured Streaming for incremental ' || + 'computation and stream processing.', + 20 + ); + "Apache Spark is a unified, multi-language analytics engine for large-scale data processing + with additional tools for SQL, machine learning, graph processing, and stream computing.""", + "ai_translate": """Syntax + SQL + ai_translate(content, to_lang) + + Arguments + content: A STRING expression, the text to be translated. + to_lang: A STRING expression, the target language code to translate the content to. + Returns + A STRING. + + If content is NULL, the result is NULL. + + Examples + SQL + > SELECT ai_translate('Hello, how are you?', 'es'); + "Hola, ΒΏcΓ³mo estΓ‘s?" + + > SELECT ai_translate('La vida es un hermoso viaje.', 'en'); + "Life is a beautiful journey.""", + "ai_forecast": """Syntax + SQL + + ai_forecast( + observed TABLE, + horizon DATE | TIMESTAMP | STRING, + time_col STRING, + value_col STRING | ARRAY, + group_col STRING | ARRAY | NULL DEFAULT NULL, + prediction_interval_width DOUBLE DEFAULT 0.95, + frequency STRING DEFAULT 'auto', + seed INTEGER | NULL DEFAULT NULL, + parameters STRING DEFAULT '{}' + ) + + ... (truncated for brevity in this message; keep your full content here) + """, + "ai_query": """Syntax + To query an endpoint that serves a foundation model: + + ai_query(endpoint, request) + + ... 
(truncated for brevity in this message; keep your full content here) + """ # For general predictive queries +} \ No newline at end of file diff --git a/agent-genie-app/agent_genie/requirements.txt b/agent-genie-app/agent_genie/requirements.txt new file mode 100644 index 00000000..77828544 --- /dev/null +++ b/agent-genie-app/agent_genie/requirements.txt @@ -0,0 +1,20 @@ +mlflow +langgraph==0.3.4 +databricks-langchain +databricks-agents +uv +fastapi +uvicorn +openai +databricks-sdk>=0.18.0 +# databricks-vectorsearch +jinja2 +langchain-tavily +langchain_community +beautifulsoup4 +pandas +python-dotenv +openai +backoff +pypdf +cryptography>=3.1 \ No newline at end of file diff --git a/agent-genie-app/agent_genie/table_extraction.py b/agent-genie-app/agent_genie/table_extraction.py new file mode 100644 index 00000000..28ca9c6e --- /dev/null +++ b/agent-genie-app/agent_genie/table_extraction.py @@ -0,0 +1,137 @@ +import requests +from dotenv import load_dotenv +import os +import json + +# Load environment variables +load_dotenv() + +# Get Databricks workspace URL and access token as fallbacks +fallback_workspace_url = os.getenv("WORKSPACE_URL") +fallback_access_token = os.getenv("ACCESS_TOKEN") + +def get_tables(catalog_name, schema_name, workspace_url=None, access_token=None): + """ + Get tables for a given catalog and schema from Databricks SQL + + Args: + catalog_name (str): The name of the catalog + schema_name (str): The name of the schema + workspace_url (str, optional): Databricks workspace URL. If not provided, uses fallback from environment. + access_token (str, optional): Databricks access token. If not provided, uses fallback from environment. 
def get_table_columns(catalog_name, schema_name, table_name, workspace_url=None, access_token=None):
    """
    Get the column names of a specific Unity Catalog table.

    Args:
        catalog_name (str): The name of the catalog
        schema_name (str): The name of the schema
        table_name (str): The name of the table
        workspace_url (str, optional): Databricks workspace URL. Falls back to
            the WORKSPACE_URL environment variable when not provided.
        access_token (str, optional): Databricks access token. Falls back to
            the ACCESS_TOKEN environment variable when not provided.

    Returns:
        dict: {"success": True, "columns": [names...]} on success, or
              {"success": False, "error": "..."} on failure. Never raises.
    """
    try:
        # Prefer explicit credentials; fall back to environment-derived ones.
        current_workspace_url = workspace_url if workspace_url else fallback_workspace_url
        current_access_token = access_token if access_token else fallback_access_token

        if not current_workspace_url or not current_access_token:
            return {
                "success": False,
                "error": "Workspace URL and Access Token are required"
            }

        # Unity Catalog "get table" endpoint; the path takes the full
        # three-level name catalog.schema.table.
        endpoint = (
            f"{current_workspace_url}/api/2.1/unity-catalog/tables/"
            f"{catalog_name}.{schema_name}.{table_name}"
        )

        # Headers for bearer-token authentication.
        headers = {
            "Authorization": f"Bearer {current_access_token}",
            "Content-Type": "application/json"
        }

        # Bug fix: requests has no default timeout, so a stalled connection
        # would hang this call forever. Bound it explicitly.
        response = requests.get(endpoint, headers=headers, timeout=30)

        if response.status_code == 200:
            # Extract just the column names from the Unity Catalog response.
            columns_data = response.json().get("columns", [])
            column_names = [col.get("name") for col in columns_data if col.get("name")]
            return {
                "success": True,
                "columns": column_names
            }
        return {
            "success": False,
            "error": f"Failed to get columns: {response.text}"
        }
    except Exception as e:
        # Deliberate catch-all: callers always receive a uniform error dict
        # instead of an exception.
        return {
            "success": False,
            "error": str(e)
        }
+
+

Databricks Genie++ Omni-Analytics Platform

+
+ +
+ A multi-agent AI engine that answers questions, forecasts trends, classifies data, summarizes records, translates languages, and recommends next-best actions. For any questions or feedback, please reach out to nitin.aggarwal@databricks.com +
+ + + + + +
+
+ Advanced Questions + + + +
+
+
+
+ + + + + +
+
+
+
+ + +
+
+ Suggested Questions + + + +
+
+
+
+ Loading questions based on your data schema... +
+
+
+
+ + +
+
+ πŸ“„ Document Analyzer + + + +
+
+ +
+ + + +
+ + +
+ + +
+
+
+ +
+
+ + +
+
+ +
+
+
def create_user_interaction_table(catalog_name="user", schema_name="nitin_aggarwal", workspace_url=None, access_token=None):
    """
    Creates a Delta table to track user interactions with the genie space.

    The table includes feedback columns (is_helpful, feedback_reason) so that
    thumbs-up/down responses can later be recorded against each interaction.

    Args:
        catalog_name (str): The name of the catalog (default: "user").
            NOTE(review): sibling functions in this module default to
            "users_trial" and the previous docstring said "users" --
            confirm which catalog is actually intended.
        schema_name (str): The name of the schema (default: "nitin_aggarwal")
        workspace_url (str, optional): Databricks workspace URL. Falls back to
            the WORKSPACE_URL environment variable when not provided.
        access_token (str, optional): Databricks access token. Falls back to
            the ACCESS_TOKEN environment variable when not provided.

    Returns:
        dict: {"success": True, "message": ..., "response": ...} on success,
              or {"success": False, "error": ...} on failure. Never raises.
    """
    try:
        # Prefer explicit credentials; fall back to environment-derived ones.
        current_workspace_url = workspace_url if workspace_url else fallback_workspace_url
        current_access_token = access_token if access_token else fallback_access_token

        if not current_workspace_url or not current_access_token:
            return {
                "success": False,
                "error": "Workspace URL and Access Token are required"
            }

        # Ensure the workspace URL ends with a slash so path joining is clean.
        if not current_workspace_url.endswith('/'):
            current_workspace_url = current_workspace_url + '/'

        # SQL Statement Execution API endpoint.
        endpoint = f"{current_workspace_url}api/2.0/sql/statements/"

        # DDL for the tracking table; idempotent via IF NOT EXISTS.
        create_table_sql = f"""
        CREATE TABLE IF NOT EXISTS {catalog_name}.{schema_name}.user_interactions (
            interaction_id STRING,
            timestamp TIMESTAMP,
            user_email STRING,
            user_question STRING,
            genie_space_id STRING,
            ai_function_type STRING,
            query_classification STRING,
            conversation_id STRING,
            required_columns STRING,
            workspace_url STRING,
            response_type STRING,
            is_helpful BOOLEAN,
            feedback_reason STRING
        )
        USING DELTA
        COMMENT 'Tracks user interactions with the Databricks Genie space including feedback'
        """

        # Bug fix: warehouse_id was an empty string, so the statement could
        # never execute. Use the same warehouse ID every other function in
        # this module uses. TODO: move this hard-coded ID into configuration.
        payload = {
            "warehouse_id": "63bf73769cfb22e3",
            "catalog": catalog_name,
            "schema": schema_name,
            "statement": create_table_sql
        }

        # Headers for bearer-token authentication.
        headers = {
            "Authorization": f"Bearer {current_access_token}",
            "Content-Type": "application/json"
        }

        # Execute the SQL statement.
        response = requests.post(endpoint, headers=headers, json=payload)

        if response.status_code == 200:
            logger.info(f"User interactions table created successfully in {catalog_name}.{schema_name}")
            return {
                "success": True,
                "message": f"Table {catalog_name}.{schema_name}.user_interactions created successfully",
                "response": response.json()
            }
        else:
            logger.error(f"Error creating table: {response.text}")
            return {
                "success": False,
                "error": f"Failed to create table: {response.text}"
            }

    except Exception as e:
        logger.exception("Exception occurred while creating user interactions table")
        return {
            "success": False,
            "error": str(e)
        }
+ is_helpful (bool, optional): Whether the user found the response helpful + feedback_reason (str, optional): Reason provided when answer wasn't helpful + user_email (str, optional): Email address of the user + catalog_name (str): The name of the catalog (default: "users_trial") + schema_name (str): The name of the schema (default: "nitin_aggarwal") + workspace_url (str, optional): Databricks workspace URL + access_token (str, optional): Databricks access token + + Returns: + dict: Result of the logging operation + """ + try: + # Use provided values or fallback to environment variables + current_workspace_url = workspace_url if workspace_url else fallback_workspace_url + current_access_token = access_token if access_token else fallback_access_token + + if not current_workspace_url or not current_access_token: + return { + "success": False, + "error": "Workspace URL and Access Token are required" + } + + # Ensure workspace URL ends with a slash + if not current_workspace_url.endswith('/'): + current_workspace_url = current_workspace_url + '/' + + # Generate unique interaction ID + interaction_id = str(uuid.uuid4()) + + # Current timestamp + current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + # Convert required_columns list to JSON string if provided + required_columns_json = json.dumps(required_columns) if required_columns else None + + # Escape single quotes in the question and email for SQL + escaped_question = user_question.replace("'", "''") if user_question else "" + escaped_user_email = user_email.replace("'", "''") if user_email else "" + + # Handle feedback_reason properly for SQL + if feedback_reason is None: + feedback_reason_sql = "NULL" + else: + escaped_feedback_reason = feedback_reason.replace("'", "''") + feedback_reason_sql = f"'{escaped_feedback_reason}'" + + # Endpoint for SQL statements + endpoint = f"{current_workspace_url}api/2.0/sql/statements/" + + # SQL to insert the interaction record + insert_sql = f""" + INSERT INTO 
{catalog_name}.{schema_name}.user_interactions + (interaction_id, timestamp, user_email, user_question, genie_space_id, ai_function_type, + query_classification, conversation_id, required_columns, workspace_url, response_type, + is_helpful, feedback_reason) + VALUES ( + '{interaction_id}', + '{current_timestamp}', + '{escaped_user_email}', + '{escaped_question}', + '{genie_space_id}', + '{ai_function_type or ""}', + '{query_classification or ""}', + '{conversation_id or ""}', + '{required_columns_json or ""}', + '{current_workspace_url}', + '{response_type or ""}', + {str(is_helpful).lower() if is_helpful is not None else 'null'}, + {feedback_reason_sql} + ) + """ + + # Payload for the SQL statement + payload = { + "warehouse_id": "63bf73769cfb22e3", # Using the same warehouse ID from the example + "catalog": catalog_name, + "schema": schema_name, + "statement": insert_sql + } + + # Headers for authentication + headers = { + "Authorization": f"Bearer {current_access_token}", + "Content-Type": "application/json" + } + + # Execute the SQL statement + response = requests.post(endpoint, headers=headers, json=payload) + + if response.status_code == 200: + logger.info(f"User interaction logged successfully. 
def update_user_feedback(interaction_id, is_helpful, feedback_reason=None,
                        catalog_name="users_trial", schema_name="nitin_aggarwal",
                        workspace_url=None, access_token=None):
    """
    Updates user feedback for an existing interaction record.

    Args:
        interaction_id (str): The interaction ID to update feedback for
        is_helpful (bool): Whether the user found the response helpful
        feedback_reason (str, optional): Reason provided when the answer wasn't helpful
        catalog_name (str): The name of the catalog (default: "users_trial")
        schema_name (str): The name of the schema (default: "nitin_aggarwal")
        workspace_url (str, optional): Databricks workspace URL. Falls back to
            the WORKSPACE_URL environment variable when not provided.
        access_token (str, optional): Databricks access token. Falls back to
            the ACCESS_TOKEN environment variable when not provided.

    Returns:
        dict: {"success": True, "interaction_id": ..., "response": ...} on
              success, or {"success": False, "error": ...} on failure.
              Never raises.
    """
    try:
        # Prefer explicit credentials; fall back to environment-derived ones.
        current_workspace_url = workspace_url if workspace_url else fallback_workspace_url
        current_access_token = access_token if access_token else fallback_access_token

        if not current_workspace_url or not current_access_token:
            return {
                "success": False,
                "error": "Workspace URL and Access Token are required"
            }

        # Ensure the workspace URL ends with a slash so path joining is clean.
        if not current_workspace_url.endswith('/'):
            current_workspace_url = current_workspace_url + '/'

        # Render feedback_reason as a SQL literal (or NULL), escaping quotes.
        if feedback_reason is None:
            feedback_reason_sql = "NULL"
        else:
            escaped_feedback_reason = feedback_reason.replace("'", "''")
            feedback_reason_sql = f"'{escaped_feedback_reason}'"

        # Bug fix: interaction_id used to be interpolated unescaped. Escape
        # single quotes like the other values so a quote in the ID cannot
        # break (or inject into) the statement. TODO: switch this module to
        # the API's parameterized statements instead of string-building SQL.
        escaped_interaction_id = interaction_id.replace("'", "''") if interaction_id else ""

        # SQL Statement Execution API endpoint.
        endpoint = f"{current_workspace_url}api/2.0/sql/statements/"

        # SQL to update the feedback for the interaction.
        update_sql = f"""
        UPDATE {catalog_name}.{schema_name}.user_interactions
        SET is_helpful = {str(is_helpful).lower()},
            feedback_reason = {feedback_reason_sql}
        WHERE interaction_id = '{escaped_interaction_id}'
        """

        # Same warehouse ID as the rest of this module. TODO: make configurable.
        payload = {
            "warehouse_id": "63bf73769cfb22e3",
            "catalog": catalog_name,
            "schema": schema_name,
            "statement": update_sql
        }

        # Headers for bearer-token authentication.
        headers = {
            "Authorization": f"Bearer {current_access_token}",
            "Content-Type": "application/json"
        }

        # Execute the SQL statement.
        response = requests.post(endpoint, headers=headers, json=payload)

        if response.status_code == 200:
            logger.info(f"User feedback updated successfully for interaction: {interaction_id}")
            return {
                "success": True,
                "message": "User feedback updated successfully",
                "interaction_id": interaction_id,
                "response": response.json()
            }
        else:
            logger.error(f"Error updating feedback: {response.text}")
            return {
                "success": False,
                "error": f"Failed to update feedback: {response.text}"
            }

    except Exception as e:
        logger.exception("Exception occurred while updating user feedback")
        return {
            "success": False,
            "error": str(e)
        }
feedback from the tracking table + + Args: + catalog_name (str): The name of the catalog (default: "users_trial") + schema_name (str): The name of the schema (default: "nitin_aggarwal") + workspace_url (str, optional): Databricks workspace URL + access_token (str, optional): Databricks access token + days (int): Number of days to look back for statistics (default: 7) + + Returns: + dict: Statistics about user feedback + """ + try: + # Use provided values or fallback to environment variables + current_workspace_url = workspace_url if workspace_url else fallback_workspace_url + current_access_token = access_token if access_token else fallback_access_token + + if not current_workspace_url or not current_access_token: + return { + "success": False, + "error": "Workspace URL and Access Token are required" + } + + # Ensure workspace URL ends with a slash + if not current_workspace_url.endswith('/'): + current_workspace_url = current_workspace_url + '/' + + # Endpoint for SQL statements + endpoint = f"{current_workspace_url}api/2.0/sql/statements/" + + # SQL to get feedback statistics + stats_sql = f""" + SELECT + COUNT(CASE WHEN is_helpful IS NOT NULL THEN 1 END) as total_feedback, + SUM(CASE WHEN is_helpful = true THEN 1 ELSE 0 END) as helpful_count, + SUM(CASE WHEN is_helpful = false THEN 1 ELSE 0 END) as not_helpful_count, + ROUND(AVG(CASE WHEN is_helpful = true THEN 1.0 ELSE 0.0 END) * 100, 2) as helpfulness_percentage, + response_type, + COUNT(CASE WHEN is_helpful IS NOT NULL THEN 1 END) as feedback_by_type + FROM {catalog_name}.{schema_name}.user_interactions + WHERE timestamp >= CURRENT_DATE - INTERVAL {days} DAYS + AND is_helpful IS NOT NULL + GROUP BY response_type + ORDER BY feedback_by_type DESC + """ + + # Payload for the SQL statement + payload = { + "warehouse_id": "63bf73769cfb22e3", # Using the same warehouse ID from the example + "catalog": catalog_name, + "schema": schema_name, + "statement": stats_sql + } + + # Headers for authentication + headers = { 
def get_interaction_stats(catalog_name="users_trial", schema_name="nitin_aggarwal",
                         workspace_url=None, access_token=None, days=7):
    """
    Get statistics about user interactions from the tracking table.

    Args:
        catalog_name (str): The name of the catalog (default: "users_trial")
        schema_name (str): The name of the schema (default: "nitin_aggarwal")
        workspace_url (str, optional): Databricks workspace URL. Falls back to
            the WORKSPACE_URL environment variable when not provided.
        access_token (str, optional): Databricks access token. Falls back to
            the ACCESS_TOKEN environment variable when not provided.
        days (int): Number of days to look back for statistics (default: 7)

    Returns:
        dict: {"success": True, "response": ...} with the raw API result on
              success, or {"success": False, "error": ...} on failure.
              Never raises.
    """
    try:
        # Prefer explicit credentials; fall back to environment-derived ones.
        current_workspace_url = workspace_url if workspace_url else fallback_workspace_url
        current_access_token = access_token if access_token else fallback_access_token

        if not current_workspace_url or not current_access_token:
            return {
                "success": False,
                "error": "Workspace URL and Access Token are required"
            }

        # Ensure the workspace URL ends with a slash so path joining is clean.
        if not current_workspace_url.endswith('/'):
            current_workspace_url = current_workspace_url + '/'

        # Hardening: force days to an integer before interpolating it into
        # the INTERVAL clause so a malformed value cannot corrupt (or inject
        # into) the statement; a bad value surfaces via the error dict.
        days = int(days)

        # SQL Statement Execution API endpoint.
        endpoint = f"{current_workspace_url}api/2.0/sql/statements/"

        # SQL to get interaction statistics grouped by function/classification.
        stats_sql = f"""
        SELECT
            COUNT(*) as total_interactions,
            COUNT(DISTINCT genie_space_id) as unique_spaces,
            COUNT(DISTINCT conversation_id) as unique_conversations,
            ai_function_type,
            query_classification,
            COUNT(*) as count
        FROM {catalog_name}.{schema_name}.user_interactions
        WHERE timestamp >= CURRENT_DATE - INTERVAL {days} DAYS
        GROUP BY ai_function_type, query_classification
        ORDER BY count DESC
        """

        # Same warehouse ID as the rest of this module. TODO: make configurable.
        payload = {
            "warehouse_id": "63bf73769cfb22e3",
            "catalog": catalog_name,
            "schema": schema_name,
            "statement": stats_sql
        }

        # Headers for bearer-token authentication.
        headers = {
            "Authorization": f"Bearer {current_access_token}",
            "Content-Type": "application/json"
        }

        # Execute the SQL statement.
        response = requests.post(endpoint, headers=headers, json=payload)

        if response.status_code == 200:
            logger.info("Interaction statistics retrieved successfully")
            return {
                "success": True,
                "message": "Statistics retrieved successfully",
                "response": response.json()
            }
        else:
            logger.error(f"Error retrieving statistics: {response.text}")
            return {
                "success": False,
                "error": f"Failed to retrieve statistics: {response.text}"
            }

    except Exception as e:
        logger.exception("Exception occurred while retrieving interaction statistics")
        return {
            "success": False,
            "error": str(e)
        }
required_columns=["patient_id", "first_name", "last_name"], + response_type="table" + ) + print("Log interaction result:", log_result) + + # Update feedback for the interaction + if log_result.get("success"): + feedback_update_result = update_user_feedback( + interaction_id=log_result.get("interaction_id"), + is_helpful=True, + feedback_reason=None + ) + print("Update feedback result:", feedback_update_result) + + # Test the wrapper function as well + feedback_log_result = log_user_feedback( + interaction_id=log_result.get("interaction_id"), + is_helpful=False, + feedback_reason="The answer was not detailed enough" + ) + print("Log feedback result (wrapper):", feedback_log_result) + + # Get interaction statistics + stats_result = get_interaction_stats() + print("Interaction statistics result:", stats_result) + + # Get feedback statistics + feedback_stats_result = get_feedback_stats() + print("Feedback statistics result:", feedback_stats_result) \ No newline at end of file diff --git a/agent-genie-app/manifest.mf b/agent-genie-app/manifest.mf new file mode 100644 index 00000000..f125fa6b --- /dev/null +++ b/agent-genie-app/manifest.mf @@ -0,0 +1 @@ +{"version":"Manifest","guid":"5bb57c63-a3e4-485e-997b-bee1089d3251","origId":-1,"name":"manifest.mf"} \ No newline at end of file diff --git a/agent_genie/.databricks/bundle/dev/vscode.bundlevars.json b/agent_genie/.databricks/bundle/dev/vscode.bundlevars.json new file mode 100644 index 00000000..9e26dfee --- /dev/null +++ b/agent_genie/.databricks/bundle/dev/vscode.bundlevars.json @@ -0,0 +1 @@ +{} \ No newline at end of file diff --git a/agent_genie/.databricks/bundle/dev/vscode.overrides.json b/agent_genie/.databricks/bundle/dev/vscode.overrides.json new file mode 100644 index 00000000..9e26dfee --- /dev/null +++ b/agent_genie/.databricks/bundle/dev/vscode.overrides.json @@ -0,0 +1 @@ +{} \ No newline at end of file diff --git a/agent_genie/CODEOWNERS b/agent_genie/CODEOWNERS new file mode 100644 index 
@app.route('/api/chat', methods=['POST'])
def chat():
    """Handle chat requests and forward them to the model serving endpoint.

    Expects a JSON body of the form
    {"messages": [{"role": "user"|"assistant"|"system", "content": "..."}]}
    and returns {"content": "..."} with the assistant reply, or an error
    payload with an appropriate HTTP status code (400/401/403/504/500).
    """
    try:
        # Bug fix: request.get_json() returns None on a missing or malformed
        # JSON body, which crashed data.get(...) and surfaced as a generic
        # 500. silent=True plus the `or {}` fallback routes such requests to
        # the intended 400 response below instead.
        data = request.get_json(silent=True) or {}
        messages = data.get('messages', [])

        if not messages:
            return jsonify({'error': 'No messages provided'}), 400

        # Map incoming role strings onto the SDK's ChatMessageRole enum;
        # unknown roles default to USER.
        role_map = {
            'user': ChatMessageRole.USER,
            'assistant': ChatMessageRole.ASSISTANT,
            'system': ChatMessageRole.SYSTEM,
        }
        chat_messages = [
            ChatMessage(
                role=role_map.get(msg.get('role', 'user'), ChatMessageRole.USER),
                content=msg.get('content', '')
            )
            for msg in messages
        ]

        # Log who is making the request (header injected by Databricks Apps).
        user_email = request.headers.get('X-Forwarded-Email')
        app.logger.info(f"Making request to model endpoint for user: {user_email}")

        # Query the serving endpoint through the Databricks SDK.
        response = client.serving_endpoints.query(
            name=ENDPOINT_NAME,
            messages=chat_messages
        )

        # Extract the first choice's message content, guarding each level.
        if response and hasattr(response, 'choices') and response.choices:
            choice = response.choices[0]
            if hasattr(choice, 'message') and choice.message:
                return jsonify({'content': choice.message.content})
            return jsonify({'error': 'No message content in response'}), 500
        return jsonify({'error': 'No response choices received'}), 500

    except Exception as e:
        error_msg = str(e)
        app.logger.error(f"Error calling model endpoint: {error_msg}")

        # Map common failure modes onto meaningful HTTP status codes by
        # inspecting the error text (the SDK does not expose typed errors here).
        if "authentication" in error_msg.lower() or "unauthorized" in error_msg.lower():
            return jsonify({
                'error': 'Authentication failed. Your Databricks token may have expired.',
                'details': 'Please refresh the page or log in again.'
            }), 401
        elif "permission" in error_msg.lower() or "forbidden" in error_msg.lower():
            return jsonify({
                'error': 'Access denied. You may not have permission to access this model endpoint.',
                'details': 'Please contact your Databricks administrator.'
            }), 403
        elif "timeout" in error_msg.lower():
            return jsonify({'error': 'Request timed out. Please try again.'}), 504
        else:
            return jsonify({
                'error': f'Unexpected error: {error_msg}',
                'details': 'Please try again or contact support if the issue persists.'
            }), 500
'endpoint_status': endpoint_status, + 'current_user': user_info, + 'sdk_config': { + 'host': cfg.host if hasattr(cfg, 'host') else 'not set', + 'auth_type': cfg.auth_type if hasattr(cfg, 'auth_type') else 'not set' + } + }) + +if __name__ == '__main__': + # Get port from environment variable or default to 8080 + port = int(os.environ.get('PORT', 8080)) + app.run(host='0.0.0.0', port=port, debug=False) \ No newline at end of file diff --git a/agent_genie/app.yaml b/agent_genie/app.yaml new file mode 100644 index 00000000..aebe8371 --- /dev/null +++ b/agent_genie/app.yaml @@ -0,0 +1,15 @@ +command: [ + "uvicorn", + "app:app", + "--host", + "127.0.0.1", + "--port", + "8000" +] +env: + - name: "SPACE_ID" + value: "01f03de6df76113387ccc36242e6c804" + - name: "SERVING_ENDPOINT_NAME" + value: "databricks-gpt-oss-120b" + - name: "TAVILY_API_KEY" + value: "" # SECURITY: a live Tavily API key was committed here — rotate the leaked key immediately and inject it via the secret scope this bundle already defines (databricks.yml: secret_scope/secret_key); never hardcode secrets in app.yaml \ No newline at end of file diff --git a/agent_genie/databricks.yml b/agent_genie/databricks.yml new file mode 100644 index 00000000..cff7fcf8 --- /dev/null +++ b/agent_genie/databricks.yml @@ -0,0 +1,95 @@ +bundle: + name: agent-genie-app + +variables: + project_name: + description: "Display name for the app" + default: "agent-genie" + + # Env-only fallback; will also be persisted to a secret by the installer job if possible + tavily_api_key: + description: "Tavily API key (installer job will try to store in a secret scope)" + default: "" + + # Where to persist the secret (customize if you already have a scope) + secret_scope: + description: "Workspace secret scope to store Tavily key" + default: "agent_genie_secrets" + + secret_key: + description: "Secret key name for Tavily key inside the scope" + default: "TAVILY_API_KEY" + + environment: + description: "Deployment environment (dev, staging, prod)" + default: "dev" + +targets: + dev: + default: true + mode: development + +resources: + apps: + agent_genie: + name: "${var.project_name}" + description: "FastAPI app with Genie +
Serving Endpoint integration" + source_code_path: "." + + command: [ + "uvicorn", + "app:app", + "--host", + "127.0.0.1", + "--port", + "8000" + ] + + # Inject env for your app at runtime + env: + - name: "SPACE_ID" + value_from: "genie-space" + - name: "SERVING_ENDPOINT_NAME" + value_from: "serving-endpoint" + - name: "TAVILY_API_KEY" + value: "${var.tavily_api_key}" # app can use this directly; secret is optional hardening + + # --- App Resources (end-user picks these at install) --- + app_resources: + - key: "serving-endpoint" + serving_endpoint_spec: + permission: "CAN_QUERY" + - key: "genie-space" + genie_space_spec: + permission: "CAN_RUN" + + # --- User Authorized Scopes (Preview) --- + user_authorized_scopes: + - "sql" + - "dashboards.genie" + - "files.files" + - "serving.serving-endpoints" + - "vectorsearch.vector-search-indexes" + - "catalog.connections" + + # --- Installer job to persist Tavily key into a secret and write optional config --- + jobs: + install_app: + name: "${var.project_name} - Install/Configure" + tasks: + - task_key: configure_app + notebook_task: + notebook_path: "./notebooks/setup_app" # create this notebook + base_parameters: + TAVILY_API_KEY: "${var.tavily_api_key}" + SECRET_SCOPE: "${var.secret_scope}" + SECRET_KEY: "${var.secret_key}" + # Add compute for your workspace (example placeholders): + # existing_cluster_id: "" + # OR: + # job_clusters: + # - job_cluster_key: "install_cluster" + # new_cluster: + # spark_version: "14.3.x-scala2.12" + # node_type_id: "i3.xlarge" + # num_workers: 0 diff --git a/agent_genie/helper.py b/agent_genie/helper.py new file mode 100644 index 00000000..abcb87c1 --- /dev/null +++ b/agent_genie/helper.py @@ -0,0 +1,381 @@ +from __future__ import annotations + +# --- Optional/lazy imports for external tools --- +try: + from langchain_tavily import TavilySearch # optional +except Exception: # pragma: no cover + TavilySearch = None # type: ignore + +try: + from langchain_community.document_loaders import 
WebBaseLoader # optional +except Exception: # pragma: no cover + WebBaseLoader = None # type: ignore + +import os +import aiohttp +import json +import asyncio +import logging +import ssl +import time +import hashlib +import re as _re +from typing import Dict, Any, Optional, List, Union, Tuple + +import requests +import backoff +import pandas as pd + +from databricks.sdk.core import Config + +# ----------------------------------------------------------------------------- +# Logging +# ----------------------------------------------------------------------------- +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +) +logger = logging.getLogger("helper") + + +# ----------------------------------------------------------------------------- +# Databricks OAuth config (single source of truth) +# ----------------------------------------------------------------------------- +_CFG = Config() # reads workspace & OAuth app from env/metadata +DATABRICKS_HOST: str = _CFG.hostname # hostname (no protocol) + +import os +from typing import List, Dict, Tuple +from langchain_community.utilities.tavily_search import TavilySearchAPIWrapper + +#os.environ["TAVILY_API_KEY"] = os.getenv("TAVILY_API_KEY") # don't hardcode in code +def tavily_topk_contents(question: str, k: int = 3) -> Tuple[List[Dict], str]: + """ + Return (top-k results, combined content). If no API key or any error, returns ([], ""). 
+ """ + if not os.getenv("TAVILY_API_KEY"): + return [], "" # no key -> no internet context + + tavily = TavilySearchAPIWrapper() + try: + print("Working with tavily, got the api key") + results = tavily.results(query=question, max_results=k) + # Normalize shapes + if isinstance(results, dict) and "results" in results: + results = results["results"] + results = (results or [])[:k] + + combined = "\n\n".join( + r.get("content", "").strip() for r in results if r.get("content") + ).strip() + return results, combined + except Exception: + # Network/quotas/shape issues -> fallback + return [], "" + + +def _get_oauth_token() -> str: + """Fetch a *fresh* OAuth token every time to avoid staleness.""" + try: + return _CFG.oauth_token().access_token + except Exception as e: # pragma: no cover + logger.exception("Failed to mint OAuth token: %s", e) + raise + + +def auth_headers() -> dict: + """Return headers using a *fresh* OAuth token (ignore any legacy tokens).""" + return { + "Authorization": f"Bearer {_get_oauth_token()}", + "Content-Type": "application/json", + } + + +# ----------------------------------------------------------------------------- +# Async Genie fetch (used by /fetch-schema etc.) +# ----------------------------------------------------------------------------- +_ALLOW_INSECURE = os.getenv("ALLOW_INSECURE_SSL", "false").lower() in ("1", "true", "yes") + + +async def _raw_fetch_answer( + workspace_url: str, + genie_room_id: str, + access_token: Optional[str], + input_text: str, + conversation_id: Optional[str] = None, +) -> dict: + """Asynchronously fetch an answer from Databricks Genie. + + Note: `access_token` is ignored on purpose; we always use fresh OAuth. + """ + if not workspace_url.endswith("/"): + workspace_url += "/" + + headers = auth_headers() + + # Optional insecure SSL (for internal envs); secure by default. 
+ connector = None + if _ALLOW_INSECURE: + ssl_context = ssl.create_default_context() + ssl_context.check_hostname = False + ssl_context.verify_mode = ssl.CERT_NONE + connector = aiohttp.TCPConnector(ssl=ssl_context) + + max_query_retries = 3 + query_retry_count = 0 + + async with aiohttp.ClientSession(connector=connector) as session: + try: + if conversation_id: + logger.info("Continuing conversation: %s", conversation_id) + continue_url = ( + f"{workspace_url}api/2.0/genie/spaces/{genie_room_id}/conversations/{conversation_id}/messages" + ) + payload_continue = {"content": input_text} + async with session.post(continue_url, json=payload_continue, headers=headers) as resp: + if resp.status != 200: + txt = await resp.text() + logger.error("Error continuing conversation: %s - %s", resp.status, txt) + return {"error": f"API error: {resp.status} - {txt}", "conversation_id": conversation_id} + response_json = await resp.json() + if "message_id" not in response_json: + logger.error("Unexpected response (missing message_id): %s", response_json) + return {"error": "Unexpected response format from API", "conversation_id": conversation_id} + message_id = response_json["message_id"] + else: + start_conv_url = f"{workspace_url}api/2.0/genie/spaces/{genie_room_id}/start-conversation" + payload_start = {"content": input_text} + async with session.post(start_conv_url, json=payload_start, headers=headers) as resp: + if resp.status != 200: + txt = await resp.text() + logger.error("Error starting conversation: %s - %s", resp.status, txt) + return {"error": f"API error: {resp.status} - {txt}"} + response_json = await resp.json() + + if "message_id" not in response_json: + logger.error("Unexpected response (missing message_id): %s", response_json) + return {"error": "Unexpected response format from API"} + message_id = response_json["message_id"] + if "message" in response_json and "conversation_id" in response_json["message"]: + conversation_id = 
response_json["message"]["conversation_id"] + elif "conversation_id" in response_json: + conversation_id = response_json["conversation_id"] + else: + logger.error("Missing conversation_id in API response: %s", response_json) + return {"error": "Missing conversation_id in API response"} + + logger.info("Conversation ID: %s, Message ID: %s", conversation_id, message_id) + + while query_retry_count < max_query_retries: + if query_retry_count: + logger.info("Retry attempt %d/%d", query_retry_count + 1, max_query_retries) + poll_url = ( + f"{workspace_url}api/2.0/genie/spaces/{genie_room_id}/conversations/" + f"{conversation_id}/messages/{message_id}" + ) + max_polls = 300 # ~4-10 min depending on sleep + retry_interval = 2 + + for attempt in range(max_polls): + async with session.get(poll_url, headers=headers) as pres: + if pres.status != 200: + et = await pres.text() + logger.error("Polling error: %s - %s", pres.status, et) + await asyncio.sleep(retry_interval) + continue + poll_json = await pres.json() + + if poll_json.get("attachments"): + attachment = poll_json["attachments"][0] + attachment_id = attachment.get("attachment_id") + + # Extract SQL query from the attachment if available + sql_query = None + if "query" in attachment: + query_info = attachment.get("query", {}) + if isinstance(query_info, dict): + sql_query = query_info.get("query", "") + else: + sql_query = str(query_info) + logger.info(f"πŸ” Extracted SQL query from attachment: {sql_query[:100] if sql_query else 'None'}...") + + answer_url = ( + f"{workspace_url}api/2.0/genie/spaces/{genie_room_id}/conversations/" + f"{conversation_id}/messages/{message_id}/attachments/{attachment_id}/query-result" + ) + async with session.get(answer_url, headers=headers) as r2: + if r2.status != 200: + et = await r2.text() + logger.error("Result fetch error: %s - %s", r2.status, et) + break + result_json = await r2.json() + + state = result_json.get("statement_response", {}).get("status", {}).get("state") + if state 
== "SUCCEEDED": + logger.info("Query succeeded") + result_json["conversation_id"] = conversation_id + # Include the SQL query in the result + if sql_query and sql_query.strip(): + result_json["sql_query"] = sql_query.strip() + logger.info(f"βœ… Added SQL query to result: {sql_query[:100]}...") + return result_json + elif state == "FAILED": + logger.error("Query failed (attempt %d/%d)", query_retry_count + 1, max_query_retries) + if query_retry_count < max_query_retries - 1: + query_retry_count += 1 + await asyncio.sleep(2) + break + else: + return { + "error": "Query failed to execute after multiple attempts.", + "conversation_id": conversation_id, + "attempts": query_retry_count + 1, + } + # else keep polling + + if attempt % 10 == 0: + logger.info("Polling attempt %d/%d", attempt + 1, max_polls) + await asyncio.sleep(retry_interval) + + if query_retry_count < max_query_retries - 1: + query_retry_count += 1 + else: + break + + logger.warning("Failed to get result after %d attempts", query_retry_count + 1) + return { + "error": f"Could not get query result after {query_retry_count + 1} attempts.", + "conversation_id": conversation_id, + "attempts": query_retry_count + 1, + } + + except aiohttp.ClientError as e: + logger.exception("HTTP error in fetch_answer: %s", e) + return {"error": f"Network error: {str(e)}", "conversation_id": conversation_id} + except Exception as e: + logger.exception("Unexpected error in fetch_answer: %s", e) + return {"error": f"Unexpected error: {str(e)}", "conversation_id": conversation_id} + + +# -------------------- +# Lightweight TTL cache for fetch_answer +# -------------------- +_ANSWER_CACHE_TTL = int(os.getenv("ANSWER_CACHE_TTL", "300")) # seconds +_ANSWER_CACHE_MAX = int(os.getenv("ANSWER_CACHE_MAX", "512")) # entries +_ANSWER_CACHE: Dict[str, Tuple[float, dict]] = {} +_ANSWER_CACHE_LOCK = asyncio.Lock() + + +def _normalize_q(q: str) -> str: + try: + q = q.strip() + q = _re.sub(r"\s+", " ", q) + return q[:4096] + except Exception: 
+ return q + + +def _answer_cache_key(workspace_url: str, genie_room_id: str, input_text: str, conversation_id: Optional[str]): + base = f"{workspace_url}|{genie_room_id}|{_normalize_q(input_text)}|{conversation_id or ''}" + return hashlib.sha256(base.encode("utf-8")).hexdigest() + + +async def fetch_answer(workspace_url, genie_room_id, access_token, input_text, conversation_id=None): + """Cached wrapper over _raw_fetch_answer. Set ANSWER_CACHE_TTL/ANSWER_CACHE_MAX via env.""" + key = _answer_cache_key(workspace_url, genie_room_id, input_text, conversation_id) + now = time.time() + # Try cache + try: + async with _ANSWER_CACHE_LOCK: + entry = _ANSWER_CACHE.get(key) + if entry and (now - entry[0]) < _ANSWER_CACHE_TTL: + return entry[1] + except Exception: + pass + + data = await _raw_fetch_answer(workspace_url, genie_room_id, access_token, input_text, conversation_id) + + # Avoid caching obvious auth failures + try: + if isinstance(data, dict) and "error" in data and "Invalid Token" in str(data.get("error", "")): + return data + except Exception: + pass + + # Extract SQL query from the response and add it to the data + if isinstance(data, dict) and "error" not in data: + sql_query = extract_sql_from_response(data) + if sql_query: + data["sql_query"] = sql_query + + # Store + try: + async with _ANSWER_CACHE_LOCK: + if len(_ANSWER_CACHE) >= _ANSWER_CACHE_MAX: + try: + oldest_key = next(iter(_ANSWER_CACHE)) + _ANSWER_CACHE.pop(oldest_key, None) + except Exception: + _ANSWER_CACHE.clear() + _ANSWER_CACHE[key] = (time.time(), data) + except Exception: + pass + + return data + + + +def extract_sql_from_response(response_data: dict) -> Optional[str]: + """Extract SQL query from Genie API response data.""" + try: + # Log the response structure for debugging + logger.info(f"πŸ” Debugging response structure: {list(response_data.keys()) if isinstance(response_data, dict) else 'Not a dict'}") + + # Check if response has statement_response with query + statement_response = 
response_data.get("statement_response", {}) + if statement_response: + logger.info(f"πŸ” Found statement_response: {list(statement_response.keys())}") + query = statement_response.get("query", "") + if query and query.strip(): + logger.info(f"βœ… Found SQL query in statement_response: {query[:100]}...") + return query.strip() + + # Check if response has attachments with query information + attachments = response_data.get("attachments", []) + if attachments: + logger.info(f"πŸ” Found attachments: {len(attachments)} items") + for i, attachment in enumerate(attachments): + logger.info(f"πŸ” Attachment {i}: {list(attachment.keys()) if isinstance(attachment, dict) else 'Not a dict'}") + if "query" in attachment: + query_info = attachment.get("query", {}) + logger.info(f"πŸ” Found query in attachment: {list(query_info.keys()) if isinstance(query_info, dict) else query_info}") + if isinstance(query_info, dict): + query_text = query_info.get("query", "") + else: + query_text = str(query_info) + if query_text and query_text.strip(): + logger.info(f"βœ… Found SQL query in attachments: {query_text[:100]}...") + return query_text.strip() + + # Check for SQL in result_data if available + result_data = response_data.get("result_data", {}) + if result_data: + logger.info(f"πŸ” Found result_data: {list(result_data.keys())}") + if "query" in result_data: + query_text = result_data.get("query", "") + if query_text and query_text.strip(): + logger.info(f"βœ… Found SQL query in result_data: {query_text[:100]}...") + return query_text.strip() + + # Additional check for direct query field + if "query" in response_data: + query_text = response_data.get("query", "") + if query_text and query_text.strip(): + logger.info(f"βœ… Found SQL query in root: {query_text[:100]}...") + return query_text.strip() + + logger.warning("⚠️ No SQL query found in response") + return None + except Exception as e: + logger.warning(f"Error extracting SQL from response: {str(e)}") + return None diff --git 
a/agent_genie/manual_ai_content.py b/agent_genie/manual_ai_content.py new file mode 100644 index 00000000..324cf333 --- /dev/null +++ b/agent_genie/manual_ai_content.py @@ -0,0 +1,227 @@ +# Manual content storage for each AI function type +MANUAL_AI_CONTENT = { + "ai_analyze_sentiment": """Syntax + SQL + ai_analyze_sentiment(content) + + Arguments + content: A STRING expression, the text to be analyzed. + Returns + A STRING. The value is chosen from 'positive', 'negative', 'neutral', or 'mixed'. Returns null if the sentiment cannot be detected. + + Examples + SQL + > SELECT ai_analyze_sentiment('I am happy'); + positive + + > SELECT ai_analyze_sentiment('I am sad'); + negative""", + + "ai_classify": """Syntax + ai_classify(content, labels) + + Arguments + content: A STRING expression, the text to be classified. + labels: An ARRAY literal, the expected output classification labels. Must contain at least 2 elements, and no more than 20 elements. + Returns + A STRING. The value matches one of the strings provided in the labels argument. Returns null if the content cannot be classified. + + Examples + SQL + > SELECT ai_classify("My password is leaked.", ARRAY("urgent", "not urgent")); + urgent + + > SELECT + description, + ai_classify(description, ARRAY('clothing', 'shoes', 'accessories', 'furniture')) AS category + FROM + products + ;""", + + "ai_extract": """Syntax + ai_extract(content, labels) + + Arguments + content: A STRING expression. + labels: An ARRAY literal. Each element is a type of entity to be extracted. + Returns + A STRUCT where each field corresponds to an entity type specified in labels. Each field contains a string representing the extracted entity. If more than one candidate for any entity type is found, only one is returned. + + If content is NULL, the result is NULL. 
+ + Examples + SQL + > SELECT ai_extract( + 'John Doe lives in New York and works for Acme Corp.', + array('person', 'location', 'organization') + ); + {"person": "John Doe", "location": "New York", "organization": "Acme Corp."} + + > SELECT ai_extract( + 'Send an email to jane.doe@example.com about the meeting at 10am.', + array('email', 'time') + ); + {"email": "jane.doe@example.com", "time": "10am"}""", + + "ai_fix_grammar": """ + Syntax + SQL + ai_fix_grammar(content) + + Arguments + content: A STRING expression. + Returns + A STRING with corrected grammar. + + If content is NULL, the result is NULL. + + Examples + SQL + > SELECT ai_fix_grammar('This sentence have some mistake'); + "This sentence has some mistakes" + + > SELECT ai_fix_grammar('She dont know what to did.'); + "She doesn't know what to do." + + > SELECT ai_fix_grammar('He go to school every days.'); + "He goes to school every day." + """, + + "ai_gen": """Syntax + ai_gen(prompt) + + Arguments + prompt: A STRING expression. + Returns + A STRING. + + Examples + SQL + > SELECT ai_gen('Generate a concise, cheerful email title for a summer bike sale with 20% discount'); + Summer Bike Sale: Grab Your Dream Bike at 20% Off! + + > SELECT + question, + ai_gen( + 'You are a teacher. Answer the students question in 50 words: ' || question + ) AS answer + FROM + questions + ; + + """, + "ai_mask": """Syntax + ai_mask(content, labels) + + Arguments + content: A STRING expression. + labels: An ARRAY literal. Each element represents a type of information to be masked. + Returns + A STRING where the specified information is masked. + + If content is NULL, the result is NULL. + + Examples + SQL + > SELECT ai_mask( + 'John Doe lives in New York. His email is john.doe@example.com.', + array('person', 'email') + ); + "[MASKED] lives in New York. His email is [MASKED]." 
+ + > SELECT ai_mask( + 'Contact me at 555-1234 or visit us at 123 Main St.', + array('phone', 'address') + ); + "Contact me at [MASKED] or visit at [MASKED]""", + # "ai_parse_document": "", + "ai_similarity": """Syntax + ai_similarity(expr1, expr2) + + Arguments + expr1: A STRING expression. + expr2: A STRING expression. + Returns + A FLOAT value, representing the semantic similarity between the two input strings. The output score is relative and should only be used for ranking. Score of 1 means the two text are equal. + + Examples + SQL + > SELECT ai_similarity('Apache Spark', 'Apache Spark'); + 1.0 + + > SELECT + company_name + FROM + customers + ORDER BY ai_similarity(company_name, 'Databricks') DESC + ;""", + + "ai_summarize": """Syntax + ai_summarize(content[, max_words]) + + Arguments + content: A STRING expression, the text to be summarized. + max_words: An optional non-negative integral numeric expression representing the best-effort target number of words in the returned summary text. The default value is 50. If set to 0, there is no word limit. + Returns + A STRING. + + If content is NULL, the result is NULL. + + Examples + SQL + > SELECT ai_summarize( + 'Apache Spark is a unified analytics engine for large-scale data processing. ' || + 'It provides high-level APIs in Java, Scala, Python and R, and an optimized ' || + 'engine that supports general execution graphs. 
It also supports a rich set ' || + 'of higher-level tools including Spark SQL for SQL and structured data ' || + 'processing, pandas API on Spark for pandas workloads, MLlib for machine ' || + 'learning, GraphX for graph processing, and Structured Streaming for incremental ' || + 'computation and stream processing.', + 20 + ); + "Apache Spark is a unified, multi-language analytics engine for large-scale data processing + with additional tools for SQL, machine learning, graph processing, and stream computing.""", + "ai_translate": """Syntax + SQL + ai_translate(content, to_lang) + + Arguments + content: A STRING expression, the text to be translated. + to_lang: A STRING expression, the target language code to translate the content to. + Returns + A STRING. + + If content is NULL, the result is NULL. + + Examples + SQL + > SELECT ai_translate('Hello, how are you?', 'es'); + "Hola, ΒΏcΓ³mo estΓ‘s?" + + > SELECT ai_translate('La vida es un hermoso viaje.', 'en'); + "Life is a beautiful journey.""", + "ai_forecast": """Syntax + SQL + + ai_forecast( + observed TABLE, + horizon DATE | TIMESTAMP | STRING, + time_col STRING, + value_col STRING | ARRAY, + group_col STRING | ARRAY | NULL DEFAULT NULL, + prediction_interval_width DOUBLE DEFAULT 0.95, + frequency STRING DEFAULT 'auto', + seed INTEGER | NULL DEFAULT NULL, + parameters STRING DEFAULT '{}' + ) + + ... (truncated for brevity in this message; keep your full content here) + """, + "ai_query": """Syntax + To query an endpoint that serves a foundation model: + + ai_query(endpoint, request) + + ... 
(truncated for brevity in this message; keep your full content here) + """ # For general predictive queries +} \ No newline at end of file diff --git a/agent_genie/requirements.txt b/agent_genie/requirements.txt new file mode 100644 index 00000000..77828544 --- /dev/null +++ b/agent_genie/requirements.txt @@ -0,0 +1,20 @@ +mlflow +langgraph==0.3.4 +databricks-langchain +databricks-agents +uv +fastapi +uvicorn +openai +databricks-sdk>=0.18.0 +# databricks-vectorsearch +jinja2 +langchain-tavily +langchain_community +beautifulsoup4 +pandas +python-dotenv +openai +backoff +pypdf +cryptography>=3.1 \ No newline at end of file diff --git a/agent_genie/table_extraction.py b/agent_genie/table_extraction.py new file mode 100644 index 00000000..28ca9c6e --- /dev/null +++ b/agent_genie/table_extraction.py @@ -0,0 +1,137 @@ +import requests +from dotenv import load_dotenv +import os +import json + +# Load environment variables +load_dotenv() + +# Get Databricks workspace URL and access token as fallbacks +fallback_workspace_url = os.getenv("WORKSPACE_URL") +fallback_access_token = os.getenv("ACCESS_TOKEN") + +def get_tables(catalog_name, schema_name, workspace_url=None, access_token=None): + """ + Get tables for a given catalog and schema from Databricks SQL + + Args: + catalog_name (str): The name of the catalog + schema_name (str): The name of the schema + workspace_url (str, optional): Databricks workspace URL. If not provided, uses fallback from environment. + access_token (str, optional): Databricks access token. If not provided, uses fallback from environment. 
+ + Returns: + dict: Dictionary with tables information or error + """ + try: + # Use provided values or fallback to environment variables + current_workspace_url = workspace_url if workspace_url else fallback_workspace_url + current_access_token = access_token if access_token else fallback_access_token + + if not current_workspace_url or not current_access_token: + return { + "success": False, + "error": "Workspace URL and Access Token are required" + } + # Endpoint for listing tables - using Unity Catalog endpoint + endpoint = f"{current_workspace_url}/api/2.1/unity-catalog/tables" + + # Headers for authentication + headers = { + "Authorization": f"Bearer {current_access_token}", + "Content-Type": "application/json" + } + + # Parameters for the request + params = { + "catalog_name": catalog_name, + "schema_name": schema_name, + "max_results": 20, # optional + "omit_columns": True, # optional + "omit_properties": True # optional + } + + # Make the request + response = requests.get(endpoint, headers=headers, params=params) + + # Check if request was successful + if response.status_code == 200: + tables = response.json().get("tables", []) + return { + "success": True, + "tables": tables + } + else: + return { + "success": False, + "error": f"Failed to get tables: {response.text}" + } + except Exception as e: + return { + "success": False, + "error": str(e) + } + +def get_table_columns(catalog_name, schema_name, table_name, workspace_url=None, access_token=None): + """ + Get columns for a specific table from Databricks SQL + + Args: + catalog_name (str): The name of the catalog + schema_name (str): The name of the schema + table_name (str): The name of the table + workspace_url (str, optional): Databricks workspace URL. If not provided, uses fallback from environment. + access_token (str, optional): Databricks access token. If not provided, uses fallback from environment. 
+ + Returns: + dict: Dictionary with column information or error + """ + try: + # Use provided values or fallback to environment variables + current_workspace_url = workspace_url if workspace_url else fallback_workspace_url + current_access_token = access_token if access_token else fallback_access_token + + if not current_workspace_url or not current_access_token: + return { + "success": False, + "error": "Workspace URL and Access Token are required" + } + # Endpoint for describing table using Unity Catalog + endpoint = f"{current_workspace_url}/api/2.1/unity-catalog/tables/{catalog_name}.{schema_name}.{table_name}" + + # Headers for authentication + headers = { + "Authorization": f"Bearer {current_access_token}", + "Content-Type": "application/json" + } + + # Make the request + response = requests.get(endpoint, headers=headers) + + # Check if request was successful + if response.status_code == 200: + # Extract column information from Unity Catalog response + columns_data = response.json().get("columns", []) + # Extract column names + column_names = [col.get("name") for col in columns_data if col.get("name")] + return { + "success": True, + "columns": column_names + } + else: + return { + "success": False, + "error": f"Failed to get columns: {response.text}" + } + except Exception as e: + return { + "success": False, + "error": str(e) + } + +# Example usage +if __name__ == "__main__": + # Default values + default_catalog = "users_trial" + default_schema = "nitin_aggarwal" + get_tables(default_catalog, default_schema) diff --git a/agent_genie/templates/index.html b/agent_genie/templates/index.html new file mode 100644 index 00000000..e2d7ceaf --- /dev/null +++ b/agent_genie/templates/index.html @@ -0,0 +1,2296 @@ + + + + + + Databricks Genie++ Omni-Analytics Platform + + + + + +
+
+

Databricks Genie++ Omni-Analytics Platform

+
+ +
+ A multi-agent AI engine that answers questions, forecasts trends, classifies data, summarizes records, translates languages, and recommends next-best actions. For any questions or feedback, please reach out to nitin.aggarwal@databricks.com +
+ + + + + +
+
+ Advanced Questions + + + +
+
+
+
+ + + + + +
+
+
+
+ + +
+
+ Suggested Questions + + + +
+
+
+
+ Loading questions based on your data schema... +
+
+
+
+ + +
+
+ πŸ“„ Document Analyzer + + + +
+
+ +
+ + + +
+ + +
+ + +
+
+
+ +
+
+ + +
+
+ +
+
+
+ + + + \ No newline at end of file diff --git a/agent_genie/tracking.py b/agent_genie/tracking.py new file mode 100644 index 00000000..20a403f5 --- /dev/null +++ b/agent_genie/tracking.py @@ -0,0 +1,584 @@ +import requests +import json +import uuid +import logging +from datetime import datetime +from dotenv import load_dotenv +import os + +# Load environment variables +load_dotenv() + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger(__name__) + +# Get Databricks workspace URL and access token as fallbacks +fallback_workspace_url = os.getenv("WORKSPACE_URL") +fallback_access_token = os.getenv("ACCESS_TOKEN") + +def create_user_interaction_table(catalog_name="user", schema_name="nitin_aggarwal", workspace_url=None, access_token=None): + """ + Creates a table to track user interactions with the genie space + + Args: + catalog_name (str): The name of the catalog (default: "users") + schema_name (str): The name of the schema (default: "nitin_aggarwal") + workspace_url (str, optional): Databricks workspace URL. If not provided, uses fallback from environment. + access_token (str, optional): Databricks access token. If not provided, uses fallback from environment. 
+ + Returns: + dict: Result of table creation operation + """ + try: + # Use provided values or fallback to environment variables + current_workspace_url = workspace_url if workspace_url else fallback_workspace_url + current_access_token = access_token if access_token else fallback_access_token + + if not current_workspace_url or not current_access_token: + return { + "success": False, + "error": "Workspace URL and Access Token are required" + } + + # Ensure workspace URL ends with a slash + if not current_workspace_url.endswith('/'): + current_workspace_url = current_workspace_url + '/' + + # Endpoint for SQL statements + endpoint = f"{current_workspace_url}api/2.0/sql/statements/" + + # SQL to create the user interaction tracking table + create_table_sql = f""" + CREATE TABLE IF NOT EXISTS {catalog_name}.{schema_name}.user_interactions ( + interaction_id STRING, + timestamp TIMESTAMP, + user_email STRING, + user_question STRING, + genie_space_id STRING, + ai_function_type STRING, + query_classification STRING, + conversation_id STRING, + required_columns STRING, + workspace_url STRING, + response_type STRING, + is_helpful BOOLEAN, + feedback_reason STRING + ) + USING DELTA + COMMENT 'Tracks user interactions with the Databricks Genie space including feedback' + """ + + # Payload for the SQL statement + payload = { + "warehouse_id": "", # FIXME: empty warehouse_id — the SQL Statement Execution API requires a valid warehouse_id, so this request will fail until one is supplied (pass it in as a parameter or read it from the environment) + "catalog": catalog_name, + "schema": schema_name, + "statement": create_table_sql + } + + # Headers for authentication + headers = { + "Authorization": f"Bearer {current_access_token}", + "Content-Type": "application/json" + } + + # Execute the SQL statement + response = requests.post(endpoint, headers=headers, json=payload) + + if response.status_code == 200: + logger.info(f"User interactions table created successfully in {catalog_name}.{schema_name}") + return { + "success": True, + "message": f"Table {catalog_name}.{schema_name}.user_interactions created successfully",
"response": response.json() + } + else: + logger.error(f"Error creating table: {response.text}") + return { + "success": False, + "error": f"Failed to create table: {response.text}" + } + + except Exception as e: + logger.exception("Exception occurred while creating user interactions table") + return { + "success": False, + "error": str(e) + } + +def log_user_interaction(user_question, genie_space_id, ai_function_type=None, query_classification=None, + conversation_id=None, required_columns=None, response_type=None, + is_helpful=None, feedback_reason=None, user_email=None, + catalog_name="users_trial", schema_name="nitin_aggarwal", + workspace_url=None, access_token=None): + """ + Logs a user interaction to the tracking table + + Args: + user_question (str): The user's question + genie_space_id (str): The genie space ID being used + ai_function_type (str, optional): The AI function type used (e.g., ai_classify, ai_forecast) + query_classification (str, optional): Query classification (Normal SQL, Predictive SQL, etc.) + conversation_id (str, optional): Conversation ID for tracking sessions + required_columns (list, optional): List of required columns + response_type (str, optional): Type of response (table, text, etc.) 
+ is_helpful (bool, optional): Whether the user found the response helpful + feedback_reason (str, optional): Reason provided when answer wasn't helpful + user_email (str, optional): Email address of the user + catalog_name (str): The name of the catalog (default: "users_trial") + schema_name (str): The name of the schema (default: "nitin_aggarwal") + workspace_url (str, optional): Databricks workspace URL + access_token (str, optional): Databricks access token + + Returns: + dict: Result of the logging operation + """ + try: + # Use provided values or fallback to environment variables + current_workspace_url = workspace_url if workspace_url else fallback_workspace_url + current_access_token = access_token if access_token else fallback_access_token + + if not current_workspace_url or not current_access_token: + return { + "success": False, + "error": "Workspace URL and Access Token are required" + } + + # Ensure workspace URL ends with a slash + if not current_workspace_url.endswith('/'): + current_workspace_url = current_workspace_url + '/' + + # Generate unique interaction ID + interaction_id = str(uuid.uuid4()) + + # Current timestamp + current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + # Convert required_columns list to JSON string if provided + required_columns_json = json.dumps(required_columns) if required_columns else None + + # Escape single quotes in the question and email for SQL + escaped_question = user_question.replace("'", "''") if user_question else "" + escaped_user_email = user_email.replace("'", "''") if user_email else "" + + # Handle feedback_reason properly for SQL + if feedback_reason is None: + feedback_reason_sql = "NULL" + else: + escaped_feedback_reason = feedback_reason.replace("'", "''") + feedback_reason_sql = f"'{escaped_feedback_reason}'" + + # Endpoint for SQL statements + endpoint = f"{current_workspace_url}api/2.0/sql/statements/" + + # SQL to insert the interaction record + insert_sql = f""" + INSERT INTO 
{catalog_name}.{schema_name}.user_interactions + (interaction_id, timestamp, user_email, user_question, genie_space_id, ai_function_type, + query_classification, conversation_id, required_columns, workspace_url, response_type, + is_helpful, feedback_reason) + VALUES ( + '{interaction_id}', + '{current_timestamp}', + '{escaped_user_email}', + '{escaped_question}', + '{genie_space_id}', + '{ai_function_type or ""}', + '{query_classification or ""}', + '{conversation_id or ""}', + '{required_columns_json or ""}', + '{current_workspace_url}', + '{response_type or ""}', + {str(is_helpful).lower() if is_helpful is not None else 'null'}, + {feedback_reason_sql} + ) + """ + + # Payload for the SQL statement + payload = { + "warehouse_id": "63bf73769cfb22e3", # Using the same warehouse ID from the example + "catalog": catalog_name, + "schema": schema_name, + "statement": insert_sql + } + + # Headers for authentication + headers = { + "Authorization": f"Bearer {current_access_token}", + "Content-Type": "application/json" + } + + # Execute the SQL statement + response = requests.post(endpoint, headers=headers, json=payload) + + if response.status_code == 200: + logger.info(f"User interaction logged successfully. 
ID: {interaction_id}") + return { + "success": True, + "message": "User interaction logged successfully", + "interaction_id": interaction_id, + "response": response.json() + } + else: + logger.error(f"Error logging user interaction: {response.text}") + return { + "success": False, + "error": f"Failed to log interaction: {response.text}" + } + + except Exception as e: + logger.exception("Exception occurred while logging user interaction") + return { + "success": False, + "error": str(e) + } + +def update_user_feedback(interaction_id, is_helpful, feedback_reason=None, + catalog_name="users_trial", schema_name="nitin_aggarwal", + workspace_url=None, access_token=None): + """ + Updates user feedback for an existing interaction + + Args: + interaction_id (str): The interaction ID to update feedback for + is_helpful (bool): Whether the user found the response helpful + feedback_reason (str, optional): Reason provided when answer wasn't helpful + catalog_name (str): The name of the catalog (default: "users_trial") + schema_name (str): The name of the schema (default: "nitin_aggarwal") + workspace_url (str, optional): Databricks workspace URL. If not provided, uses fallback from environment. + access_token (str, optional): Databricks access token. If not provided, uses fallback from environment. 
+ + Returns: + dict: Result of the update operation + """ + try: + # Use provided values or fallback to environment variables + current_workspace_url = workspace_url if workspace_url else fallback_workspace_url + current_access_token = access_token if access_token else fallback_access_token + + if not current_workspace_url or not current_access_token: + return { + "success": False, + "error": "Workspace URL and Access Token are required" + } + + # Ensure workspace URL ends with a slash + if not current_workspace_url.endswith('/'): + current_workspace_url = current_workspace_url + '/' + + # Handle feedback_reason properly for SQL + if feedback_reason is None: + # Use NULL for database + feedback_reason_sql = "NULL" + else: + # Escape single quotes for SQL + escaped_feedback_reason = feedback_reason.replace("'", "''") + feedback_reason_sql = f"'{escaped_feedback_reason}'" + + # Endpoint for SQL statements + endpoint = f"{current_workspace_url}api/2.0/sql/statements/" + + # SQL to update the feedback for the interaction + update_sql = f""" + UPDATE {catalog_name}.{schema_name}.user_interactions + SET is_helpful = {str(is_helpful).lower()}, + feedback_reason = {feedback_reason_sql} + WHERE interaction_id = '{interaction_id}' + """ + + # Payload for the SQL statement + payload = { + "warehouse_id": "63bf73769cfb22e3", # Using the same warehouse ID from the example + "catalog": catalog_name, + "schema": schema_name, + "statement": update_sql + } + + # Headers for authentication + headers = { + "Authorization": f"Bearer {current_access_token}", + "Content-Type": "application/json" + } + + # Execute the SQL statement + response = requests.post(endpoint, headers=headers, json=payload) + + if response.status_code == 200: + logger.info(f"User feedback updated successfully for interaction: {interaction_id}") + return { + "success": True, + "message": "User feedback updated successfully", + "interaction_id": interaction_id, + "response": response.json() + } + else: + 
logger.error(f"Error updating feedback: {response.text}") + return { + "success": False, + "error": f"Failed to update feedback: {response.text}" + } + + except Exception as e: + logger.exception("Exception occurred while updating user feedback") + return { + "success": False, + "error": str(e) + } + +def log_user_feedback(interaction_id, is_helpful, feedback_reason=None, user_query=None, + genie_response_type=None, catalog_name="users_trial", schema_name="nitin_aggarwal", + workspace_url=None, access_token=None): + """ + Updates user feedback for a specific interaction (wrapper function for backward compatibility) + + Args: + interaction_id (str): The interaction ID this feedback relates to + is_helpful (bool): Whether the user found the response helpful + feedback_reason (str, optional): Reason provided when answer wasn't helpful + user_query (str, optional): The original user query (ignored - kept for compatibility) + genie_response_type (str, optional): Type of response (ignored - kept for compatibility) + catalog_name (str): The name of the catalog (default: "users_trial") + schema_name (str): The name of the schema (default: "nitin_aggarwal") + workspace_url (str, optional): Databricks workspace URL + access_token (str, optional): Databricks access token + + Returns: + dict: Result of the update operation + """ + # Call the update_user_feedback function + result = update_user_feedback( + interaction_id=interaction_id, + is_helpful=is_helpful, + feedback_reason=feedback_reason, + catalog_name=catalog_name, + schema_name=schema_name, + workspace_url=workspace_url, + access_token=access_token + ) + + # Add feedback_id to response for backward compatibility + if result.get("success"): + result["feedback_id"] = interaction_id # Use interaction_id as feedback_id for compatibility + + return result + +def get_feedback_stats(catalog_name="users_trial", schema_name="nitin_aggarwal", + workspace_url=None, access_token=None, days=7): + """ + Get statistics about user 
feedback from the tracking table + + Args: + catalog_name (str): The name of the catalog (default: "users_trial") + schema_name (str): The name of the schema (default: "nitin_aggarwal") + workspace_url (str, optional): Databricks workspace URL + access_token (str, optional): Databricks access token + days (int): Number of days to look back for statistics (default: 7) + + Returns: + dict: Statistics about user feedback + """ + try: + # Use provided values or fallback to environment variables + current_workspace_url = workspace_url if workspace_url else fallback_workspace_url + current_access_token = access_token if access_token else fallback_access_token + + if not current_workspace_url or not current_access_token: + return { + "success": False, + "error": "Workspace URL and Access Token are required" + } + + # Ensure workspace URL ends with a slash + if not current_workspace_url.endswith('/'): + current_workspace_url = current_workspace_url + '/' + + # Endpoint for SQL statements + endpoint = f"{current_workspace_url}api/2.0/sql/statements/" + + # SQL to get feedback statistics + stats_sql = f""" + SELECT + COUNT(CASE WHEN is_helpful IS NOT NULL THEN 1 END) as total_feedback, + SUM(CASE WHEN is_helpful = true THEN 1 ELSE 0 END) as helpful_count, + SUM(CASE WHEN is_helpful = false THEN 1 ELSE 0 END) as not_helpful_count, + ROUND(AVG(CASE WHEN is_helpful = true THEN 1.0 ELSE 0.0 END) * 100, 2) as helpfulness_percentage, + response_type, + COUNT(CASE WHEN is_helpful IS NOT NULL THEN 1 END) as feedback_by_type + FROM {catalog_name}.{schema_name}.user_interactions + WHERE timestamp >= CURRENT_DATE - INTERVAL {days} DAYS + AND is_helpful IS NOT NULL + GROUP BY response_type + ORDER BY feedback_by_type DESC + """ + + # Payload for the SQL statement + payload = { + "warehouse_id": "63bf73769cfb22e3", # Using the same warehouse ID from the example + "catalog": catalog_name, + "schema": schema_name, + "statement": stats_sql + } + + # Headers for authentication + headers = { 
+ "Authorization": f"Bearer {current_access_token}", + "Content-Type": "application/json" + } + + # Execute the SQL statement + response = requests.post(endpoint, headers=headers, json=payload) + + if response.status_code == 200: + logger.info("Feedback statistics retrieved successfully") + return { + "success": True, + "message": "Feedback statistics retrieved successfully", + "response": response.json() + } + else: + logger.error(f"Error retrieving feedback statistics: {response.text}") + return { + "success": False, + "error": f"Failed to retrieve feedback statistics: {response.text}" + } + + except Exception as e: + logger.exception("Exception occurred while retrieving feedback statistics") + return { + "success": False, + "error": str(e) + } + +def get_interaction_stats(catalog_name="users_trial", schema_name="nitin_aggarwal", + workspace_url=None, access_token=None, days=7): + """ + Get statistics about user interactions from the tracking table + + Args: + catalog_name (str): The name of the catalog (default: "users_trial") + schema_name (str): The name of the schema (default: "nitin_aggarwal") + workspace_url (str, optional): Databricks workspace URL + access_token (str, optional): Databricks access token + days (int): Number of days to look back for statistics (default: 7) + + Returns: + dict: Statistics about user interactions + """ + try: + # Use provided values or fallback to environment variables + current_workspace_url = workspace_url if workspace_url else fallback_workspace_url + current_access_token = access_token if access_token else fallback_access_token + + if not current_workspace_url or not current_access_token: + return { + "success": False, + "error": "Workspace URL and Access Token are required" + } + + # Ensure workspace URL ends with a slash + if not current_workspace_url.endswith('/'): + current_workspace_url = current_workspace_url + '/' + + # Endpoint for SQL statements + endpoint = f"{current_workspace_url}api/2.0/sql/statements/" + + # 
SQL to get interaction statistics + stats_sql = f""" + SELECT + COUNT(*) as total_interactions, + COUNT(DISTINCT genie_space_id) as unique_spaces, + COUNT(DISTINCT conversation_id) as unique_conversations, + ai_function_type, + query_classification, + COUNT(*) as count + FROM {catalog_name}.{schema_name}.user_interactions + WHERE timestamp >= CURRENT_DATE - INTERVAL {days} DAYS + GROUP BY ai_function_type, query_classification + ORDER BY count DESC + """ + + # Payload for the SQL statement + payload = { + "warehouse_id": "63bf73769cfb22e3", # Using the same warehouse ID from the example + "catalog": catalog_name, + "schema": schema_name, + "statement": stats_sql + } + + # Headers for authentication + headers = { + "Authorization": f"Bearer {current_access_token}", + "Content-Type": "application/json" + } + + # Execute the SQL statement + response = requests.post(endpoint, headers=headers, json=payload) + + if response.status_code == 200: + logger.info("Interaction statistics retrieved successfully") + return { + "success": True, + "message": "Statistics retrieved successfully", + "response": response.json() + } + else: + logger.error(f"Error retrieving statistics: {response.text}") + return { + "success": False, + "error": f"Failed to retrieve statistics: {response.text}" + } + + except Exception as e: + logger.exception("Exception occurred while retrieving interaction statistics") + return { + "success": False, + "error": str(e) + } + +# Example usage and testing +if __name__ == "__main__": + # Create the interaction table (now includes feedback columns) + result = create_user_interaction_table() + print("Interaction table creation result:", result) + + # Log a sample interaction + if result.get("success"): + log_result = log_user_interaction( + user_question="What is the total number of patients?", + genie_space_id="01f02f3e242515679535b61b717a4d5e", + ai_function_type="Normal SQL", + query_classification="Normal SQL", + conversation_id="test-conversation-1", + 
required_columns=["patient_id", "first_name", "last_name"], + response_type="table" + ) + print("Log interaction result:", log_result) + + # Update feedback for the interaction + if log_result.get("success"): + feedback_update_result = update_user_feedback( + interaction_id=log_result.get("interaction_id"), + is_helpful=True, + feedback_reason=None + ) + print("Update feedback result:", feedback_update_result) + + # Test the wrapper function as well + feedback_log_result = log_user_feedback( + interaction_id=log_result.get("interaction_id"), + is_helpful=False, + feedback_reason="The answer was not detailed enough" + ) + print("Log feedback result (wrapper):", feedback_log_result) + + # Get interaction statistics + stats_result = get_interaction_stats() + print("Interaction statistics result:", stats_result) + + # Get feedback statistics + feedback_stats_result = get_feedback_stats() + print("Feedback statistics result:", feedback_stats_result) \ No newline at end of file diff --git a/manifest.mf b/manifest.mf new file mode 100644 index 00000000..f125fa6b --- /dev/null +++ b/manifest.mf @@ -0,0 +1 @@ +{"version":"Manifest","guid":"5bb57c63-a3e4-485e-997b-bee1089d3251","origId":-1,"name":"manifest.mf"} \ No newline at end of file From f9c359d17bb51a7bceb1924aa0712314b1f6b834 Mon Sep 17 00:00:00 2001 From: Nitin Aggarwal Date: Tue, 16 Sep 2025 17:20:42 -0400 Subject: [PATCH 02/12] Create manifest.yaml --- agent_genie/manifest.yaml | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 agent_genie/manifest.yaml diff --git a/agent_genie/manifest.yaml b/agent_genie/manifest.yaml new file mode 100644 index 00000000..1977ef8f --- /dev/null +++ b/agent_genie/manifest.yaml @@ -0,0 +1,26 @@ +version: 1 +name: "agent-genie" +description: "A sample Databricks application demonstrating various resource types and permissions" +resource_specs: + - name: "serving-endpoint" + description: "Machine learning model serving endpoint" + 
serving_endpoint_spec: + permission: "CAN_QUERY" + - name: "genie-space" + description: "Databricks Genie Space for multi-agent apps" + genie_space_spec: + permission: "CAN_EDIT" # use CAN_USE if your workspace doesn't support CAN_EDIT + - name: "tavily-api-key" + description: "Tavily API key for external knowledge search" + secret_spec: + permission: "READ" + - name: "data-volume" + description: "Unity Catalog volume for data storage" + uc_securable_spec: + securable_type: "VOLUME" + permission: "READ_VOLUME" + +user_api_scopes: + - "sql" + - "serving.serving-endpoints" + - "dashboards.genie" From 72cced71749d74d9268c72794490d322188efd2a Mon Sep 17 00:00:00 2001 From: Nitin Aggarwal Date: Thu, 18 Sep 2025 23:40:49 -0400 Subject: [PATCH 03/12] Created README.md --- agent_genie/README.md | 155 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 155 insertions(+) create mode 100644 agent_genie/README.md diff --git a/agent_genie/README.md b/agent_genie/README.md new file mode 100644 index 00000000..6a145691 --- /dev/null +++ b/agent_genie/README.md @@ -0,0 +1,155 @@ +# Conversational Agent App + +**Language:** Python +**Author:** Nitin Aggarwal +**Date:** 2025-09-19 + +## Conversational Agent App + +This repository demonstrates how to integrate Databricks' AI/BI Genie Conversation APIs into custom Databricks Apps applications, allowing users to interact with their structured data using natural language. + +You can also click the Generate insights button and generate deep analysis and trends of your data. + +--- + +## Overview +This app is a Python-based application (using FastAPI) featuring a chat interface powered by Databricks Genie Conversation APIs, built specifically to run as a custom Databricks App. This integration showcases how to leverage Databricks' platform capabilities to create interactive data applications with minimal infrastructure overhead. 
+ +The Databricks Genie Conversation APIs enable you to embed AI/BI Genie capabilities into any application, allowing users to: +- Ask questions about their data in natural language +- Get SQL-powered insights without writing code +- Follow up with contextual questions in a conversation thread + +--- + +## Key Features +- **Powered by Databricks Apps**: Deploy and run directly from your Databricks workspace with built-in security and scaling +- **Zero Infrastructure Management**: Leverage Databricks Apps to handle hosting, scaling, and security +- **Workspace Integration**: Access your data assets and models directly from your Databricks workspace +- **Natural Language Data Queries**: Ask questions about your data in plain English +- **Stateful Conversations**: Maintain context for follow-up questions +- **OBO (On-Behalf-Of) Authentication**: User credentials passthrough for fine-grained access control + +--- + +## Example Use Case +This app shows how to create a simple interface that connects to the Genie API, allowing users to: +1. Start a conversation with a question about their supply chain data +2. View generated SQL and results +3. Ask follow-up questions that maintain context + +--- + +## Installation & Usage + +### Prerequisites +- Python 3.9+ +- Databricks workspace access with Genie APIs enabled +- `pip` package manager + +### Setup Locally +1. Clone the repository: + ```bash + git clone + cd conversational-agent-app/agent-genie/agent_genie + ``` + +2. Create and activate a virtual environment: + ```bash + python3 -m venv venv + source venv/bin/activate # On Windows: venv\Scripts\activate + ``` + +3. Install dependencies: + ```bash + pip install -r requirements.txt + ``` + +4. Run the app locally: + ```bash + python app.py + ``` + +5. Open your browser at [http://localhost:8000](http://localhost:8000) to interact with the chat interface. + +### Running in Databricks +Refer to the [Deploying to Databricks Apps](#deploying-to-databricks-apps) section below. 
+ +--- + +## Project Structure +``` +agent-genie/ +β”‚ +β”œβ”€β”€ agent_genie/ +β”‚ β”œβ”€β”€ app.py # Main entry point for the Dash/Flask app +β”‚ β”œβ”€β”€ helper.py # Utility functions +β”‚ β”œβ”€β”€ table_extraction.py # Table data parsing and extraction logic +β”‚ β”œβ”€β”€ tracking.py # Event logging and tracking +β”‚ β”œβ”€β”€ manual_ai_content.py # Predefined AI responses/content +β”‚ β”œβ”€β”€ requirements.txt # Python dependencies +β”‚ β”œβ”€β”€ templates/ +β”‚ β”‚ └── index.html # Frontend UI template +β”‚ β”œβ”€β”€ app.yaml # App configuration +β”‚ β”œβ”€β”€ databricks.yml # Databricks-specific config +β”‚ └── ... +β”‚ +β”œβ”€β”€ manifest.yaml # High-level manifest +└── manifest.mf # App metadata +``` + +--- + +## Deploying to Databricks Apps +The app can be installed through Databricks Marketplace. If you prefer to clone and deploy it manually, please refer to these instructions: + +1. Clone the repository to your Databricks workspace using Git Folder: + - Navigate to the Workspace section in the sidebar. + - Click on the 'Create' button, select the 'Git Folder' option and follow the prompts to clone the repository. + +2. Create an app with a serving endpoint resource: + - Navigate to the Compute section in the sidebar. + - Go to the Apps tab and click the **Create app** button. Fill in the necessary fields and click **Next: Configuration**. + - To reuse an existing app, click the link to your app in the Name column to go to the detail page, then click **Edit**. + - In the **App resources** section, click **+ Add resource** and select **Serving endpoint**. Choose a chat endpoint, grant `CAN_QUERY` permission and name it `serving_endpoint`. + - Select Genie Space, grant `CAN_RUN` permission and name it `genie_space`. + +3. Deploy the app using the Databricks Apps interface: + - Go to the detail page of the app. + - Click **Deploy** and select the folder `conversational-agent-app` from the created Git folder. + - Click **Select**, then **Deploy**. 
+ - Review the configuration and click **Deploy**. + +For more details, refer to the official Databricks documentation. + +--- + +## Troubleshooting +- After installing the app from Marketplace, check the **Authorization page** for API scope details. +- When you open the URL link for the first time, ensure you see the OBO scope authorization page with all four scopes: + - `serving.serving-endpoints` + - `dashboards.genie` + - `files.files` + - `sql` + +If you cloned the Git repo directly, note that `serving.serving-endpoints` may not yet appear in the scope list in the UI. You will need to use the Databricks API or CLI to manually grant this scope. + +Example CLI update: +```bash +databricks account custom-app-integration update '65d90ec2-54ba-4fcb-a85d-eac774235aea' --json '{"scopes": ["openid", "profile", "email", "all-apis", "offline_access", "serving.serving-endpoints"]}' +``` + +Other common fixes: +- Clear browser cookies or try incognito mode if scopes don’t refresh +- Ensure users have proper access to underlying resources: + - **Tables**: `USE CATALOG`, `USE SCHEMA`, `SELECT` permissions + - **Genie Space**: `CAN_RUN` permission + - **SQL Warehouse**: `CAN_USE` permission + - **Model Serving Endpoint**: `CAN_QUERY` permission + +--- + +## Resources +- [Databricks Genie Documentation](https://docs.databricks.com/) +- [Conversation APIs Documentation](https://docs.databricks.com/) +- [Databricks Apps Documentation](https://docs.databricks.com/) From 0e5d157b6a653cc05dd61e9619ac156d07689dce Mon Sep 17 00:00:00 2001 From: Nitin Aggarwal Date: Fri, 19 Sep 2025 00:10:37 -0400 Subject: [PATCH 04/12] removed volumes spec and change Tavily key to optional and textbox type manifest.yaml --- agent_genie/manifest.yaml | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/agent_genie/manifest.yaml b/agent_genie/manifest.yaml index 1977ef8f..3e4eebc7 100644 --- a/agent_genie/manifest.yaml +++ b/agent_genie/manifest.yaml @@ -1,24 +1,23 @@ 
version: 1 -name: "agent-genie" +name: "agent-genie++" description: "A sample Databricks application demonstrating various resource types and permissions" + resource_specs: - name: "serving-endpoint" description: "Machine learning model serving endpoint" serving_endpoint_spec: permission: "CAN_QUERY" + - name: "genie-space" description: "Databricks Genie Space for multi-agent apps" genie_space_spec: permission: "CAN_EDIT" # use CAN_USE if your workspace doesn't support CAN_EDIT + - name: "tavily-api-key" - description: "Tavily API key for external knowledge search" - secret_spec: - permission: "READ" - - name: "data-volume" - description: "Unity Catalog volume for data storage" - uc_securable_spec: - securable_type: "VOLUME" - permission: "READ_VOLUME" + description: "(Optional) For external search over internet, you may use Tavily API Key (https://docs.tavily.com/documentation/quickstart)" + input_spec: + type: "text" + required: false user_api_scopes: - "sql" From e15e4d2a2728485c1f8838f9b5caa25893732888 Mon Sep 17 00:00:00 2001 From: Nitin Aggarwal Date: Fri, 19 Sep 2025 00:18:45 -0400 Subject: [PATCH 05/12] added git clone --- agent_genie/README.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/agent_genie/README.md b/agent_genie/README.md index 6a145691..30fc8dca 100644 --- a/agent_genie/README.md +++ b/agent_genie/README.md @@ -50,8 +50,7 @@ This app shows how to create a simple interface that connects to the Genie API, ### Setup Locally 1. Clone the repository: ```bash - git clone - cd conversational-agent-app/agent-genie/agent_genie + git clone [](https://github.com/nitinaggarwal-databricks/sandbox/tree/0e5d157b6a653cc05dd61e9619ac156d07689dce/agent_genie) ``` 2. 
Create and activate a virtual environment: From dba06ba1f50999b4005ec508a85d790d168eb6df Mon Sep 17 00:00:00 2001 From: Nitin Aggarwal Date: Fri, 19 Sep 2025 00:31:32 -0400 Subject: [PATCH 06/12] removed ++ --- agent_genie/manifest.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agent_genie/manifest.yaml b/agent_genie/manifest.yaml index 3e4eebc7..609b1a84 100644 --- a/agent_genie/manifest.yaml +++ b/agent_genie/manifest.yaml @@ -1,5 +1,5 @@ version: 1 -name: "agent-genie++" +name: "agent-genie" description: "A sample Databricks application demonstrating various resource types and permissions" resource_specs: From 6425f64ffdea8a3857a935e5909f4d367701e266 Mon Sep 17 00:00:00 2001 From: Nitin Aggarwal Date: Fri, 19 Sep 2025 00:37:06 -0400 Subject: [PATCH 07/12] commented tavily --- agent_genie/manifest.yaml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/agent_genie/manifest.yaml b/agent_genie/manifest.yaml index 609b1a84..136f9a7e 100644 --- a/agent_genie/manifest.yaml +++ b/agent_genie/manifest.yaml @@ -13,11 +13,11 @@ resource_specs: genie_space_spec: permission: "CAN_EDIT" # use CAN_USE if your workspace doesn't support CAN_EDIT - - name: "tavily-api-key" - description: "(Optional) For external search over internet, you may use Tavily API Key (https://docs.tavily.com/documentation/quickstart)" - input_spec: - type: "text" - required: false + # - name: "tavily-api-key" + # description: "(Optional) For external search over internet, you may use Tavily API Key (https://docs.tavily.com/documentation/quickstart)" + # input_spec: + # type: "text" + # required: false user_api_scopes: - "sql" From 3deb177943169ced18099e313f2242b612a2da96 Mon Sep 17 00:00:00 2001 From: Nitin Aggarwal Date: Tue, 30 Sep 2025 13:13:30 -0400 Subject: [PATCH 08/12] Update manifest.yaml --- agent_genie/manifest.yaml | 6 ------ 1 file changed, 6 deletions(-) diff --git a/agent_genie/manifest.yaml b/agent_genie/manifest.yaml index 
136f9a7e..d7b5a475 100644 --- a/agent_genie/manifest.yaml +++ b/agent_genie/manifest.yaml @@ -13,12 +13,6 @@ resource_specs: genie_space_spec: permission: "CAN_EDIT" # use CAN_USE if your workspace doesn't support CAN_EDIT - # - name: "tavily-api-key" - # description: "(Optional) For external search over internet, you may use Tavily API Key (https://docs.tavily.com/documentation/quickstart)" - # input_spec: - # type: "text" - # required: false - user_api_scopes: - "sql" - "serving.serving-endpoints" From 3d63d1fcc2d231bf84a7a3f4d7bbbd9ae18fddfa Mon Sep 17 00:00:00 2001 From: Nitin Aggarwal Date: Tue, 30 Sep 2025 13:14:42 -0400 Subject: [PATCH 09/12] Update UI --- agent_genie/templates/index.html | 770 ++++++++++++++++++++++--------- 1 file changed, 561 insertions(+), 209 deletions(-) diff --git a/agent_genie/templates/index.html b/agent_genie/templates/index.html index e2d7ceaf..08f43d13 100644 --- a/agent_genie/templates/index.html +++ b/agent_genie/templates/index.html @@ -605,56 +605,108 @@ .tag-buttons { display: flex; - flex-wrap: wrap; - gap: 10px; + flex-direction: column; + gap: 12px; } .suggested-tags button { background: #e3eafc; border: none; border-radius: 20px; - padding: 10px 16px; + padding: 12px 20px; font-size: 14px; color: #1a73e8; - font-weight: 500; + font-weight: 400; cursor: pointer; transition: background 0.2s; + width: 100%; + text-align: left; + min-height: 48px; + display: flex; + align-items: center; + justify-content: flex-start; } .suggested-tags button:hover { background: #d1defb; } + /* Horizontal Container for Collapsible Elements */ + .collapsible-container { + display: grid; + grid-template-columns: repeat(3, 1fr); + gap: 20px; + width: 100%; + max-width: 1200px; + margin: 0 auto 20px auto; + align-items: start; + } + /* Fixed Questions Styles */ .fixed-questions { - background: #ffffff; - border: 1px solid #e0e0e0; - border-radius: 12px; - box-shadow: 0 4px 12px rgba(0, 0, 0, 0.06); + background: linear-gradient(135deg, #ffffff 
0%, #f8fbff 100%); + border: 1px solid #e0ecff; + border-radius: 16px; + box-shadow: 0 8px 32px rgba(26, 115, 232, 0.08); animation: fadeIn 0.5s ease-in-out; box-sizing: border-box; + transition: all 0.3s ease; + position: relative; + overflow: hidden; + height: 420px; + display: flex; + flex-direction: column; + } + + .fixed-questions::before { + content: ''; + position: absolute; + top: 0; + left: 0; + right: 0; + height: 2px; + background: linear-gradient(90deg, #1a73e8, #4285f4, #34a853, #ea4335); + opacity: 0; + transition: opacity 0.3s ease; + } + + .fixed-questions:hover::before { + opacity: 1; + } + + .fixed-questions:hover { + transform: translateY(-2px); + box-shadow: 0 12px 48px rgba(26, 115, 232, 0.12); + border-color: #1a73e8; } .fixed-questions-header { - padding: 18px 20px; + padding: 20px 24px; font-weight: 600; cursor: pointer; display: flex; justify-content: space-between; align-items: center; - background: #f8fbff; - border-radius: 12px 12px 0 0; - border-bottom: 1px solid #e0e0e0; - transition: background-color 0.2s ease; + background: linear-gradient(135deg, #f8fbff 0%, #e3f2fd 100%); + border-radius: 16px 16px 0 0; + border-bottom: 1px solid #e0ecff; + transition: all 0.3s ease; + position: relative; + font-size: 15px; + color: #1565c0; + flex-shrink: 0; + min-height: 60px; } .fixed-questions-header:hover { - background: #ddeeff; + background: linear-gradient(135deg, #e3f2fd 0%, #bbdefb 100%); + color: #0d47a1; } .collapse-icon { - transition: transform 0.3s ease; + transition: transform 0.4s cubic-bezier(0.4, 0, 0.2, 1); color: #1a73e8; + filter: drop-shadow(0 2px 4px rgba(26, 115, 232, 0.2)); } .collapse-icon.collapsed { @@ -662,42 +714,91 @@ } .fixed-questions-content { - max-height: 300px; - overflow: hidden; - transition: max-height 0.3s ease-out, padding 0.3s ease-out; - padding: 18px 20px; + flex: 1; + overflow-y: auto; + overflow-x: hidden; + transition: all 0.4s cubic-bezier(0.4, 0, 0.2, 1); + padding: 20px 24px; + background: 
linear-gradient(135deg, #ffffff 0%, #fafbff 100%); + border-radius: 0 0 16px 16px; } .fixed-questions-content.collapsed { - max-height: 0; - padding: 0 20px; + flex: 0; + overflow: hidden; + padding: 0 24px; + opacity: 0; + } + + /* Custom scrollbar for content areas */ + .fixed-questions-content::-webkit-scrollbar { + width: 6px; + } + + .fixed-questions-content::-webkit-scrollbar-track { + background: rgba(0, 0, 0, 0.05); + border-radius: 3px; + } + + .fixed-questions-content::-webkit-scrollbar-thumb { + background: linear-gradient(135deg, #1a73e8, #4285f4); + border-radius: 3px; + transition: background 0.3s ease; + } + + .fixed-questions-content::-webkit-scrollbar-thumb:hover { + background: linear-gradient(135deg, #125fc3, #1a73e8); } .fixed-questions .tag-buttons { display: flex; - flex-wrap: wrap; - gap: 10px; + flex-direction: column; + gap: 12px; } .fixed-questions button { - background: #ffffff; - border: 2px solid #b8daf8; - border-radius: 20px; - padding: 10px 16px; + background: linear-gradient(135deg, #ffffff 0%, #f8fbff 100%); + border: 2px solid #e3f2fd; + border-radius: 24px; + padding: 12px 20px; font-size: 14px; - color: #1a73e8; - font-weight: 500; + color: #1565c0; + font-weight: 400; cursor: pointer; - transition: all 0.2s; - box-shadow: 0 2px 4px rgba(26, 115, 232, 0.1); - margin: 4px 0; + transition: all 0.3s cubic-bezier(0.4, 0, 0.2, 1); + box-shadow: 0 4px 12px rgba(26, 115, 232, 0.08); + margin: 0; + position: relative; + overflow: hidden; + width: 100%; + text-align: left; + min-height: 48px; + display: flex; + align-items: center; + justify-content: flex-start; + } + + .fixed-questions button::before { + content: ''; + position: absolute; + top: 0; + left: -100%; + width: 100%; + height: 100%; + background: linear-gradient(90deg, transparent, rgba(255, 255, 255, 0.4), transparent); + transition: left 0.5s ease; + } + + .fixed-questions button:hover::before { + left: 100%; } .fixed-questions button:hover { - background: #e3eafc; + 
background: linear-gradient(135deg, #e3f2fd 0%, #bbdefb 100%); border-color: #1a73e8; - transform: translateY(-1px); - box-shadow: 0 4px 8px rgba(26, 115, 232, 0.2); + transform: translateY(-2px); + box-shadow: 0 8px 24px rgba(26, 115, 232, 0.15); + color: #0d47a1; } @keyframes fadeIn { @@ -823,7 +924,16 @@ } /* Mobile tweaks */ - @media (max-width: 600px) { + @media (max-width: 768px) { + .collapsible-container { + grid-template-columns: 1fr; + gap: 12px; + } + + .fixed-questions { + height: 350px; + } + .header-section { flex-direction: column; align-items: flex-start; @@ -854,14 +964,16 @@ .tag-buttons { flex-direction: column; - gap: 8px; + gap: 10px; } .suggested-tags button { width: 100%; + min-height: 44px; + font-size: 13px; + padding: 10px 16px; } - .feedback-buttons { flex-direction: column; } @@ -873,12 +985,42 @@ .fixed-questions .tag-buttons { flex-direction: column; - gap: 8px; + gap: 10px; } .fixed-questions button { width: 100%; text-align: left; + min-height: 44px; + font-size: 13px; + padding: 10px 16px; + } + + .fixed-questions-header { + padding: 16px 20px; + font-size: 14px; + } + + .fixed-questions-content { + padding: 16px 20px; + } + + .fixed-questions-content.collapsed { + padding: 0 20px; + } + } + + @media (max-width: 1024px) and (min-width: 769px) { + .collapsible-container { + grid-template-columns: repeat(3, 1fr); + gap: 16px; + } + } + + @media (max-width: 900px) and (min-width: 769px) { + .collapsible-container { + grid-template-columns: repeat(2, 1fr); + gap: 16px; } } @@ -937,6 +1079,31 @@ flex-wrap: wrap; gap: 8px; margin-bottom: 12px; + max-height: 200px; + overflow-y: auto; + padding: 8px; + background: rgba(255, 255, 255, 0.5); + border-radius: 8px; + border: 1px solid #e3f2fd; + } + + /* Custom scrollbar for schema columns */ + .schema-columns::-webkit-scrollbar { + width: 6px; + } + + .schema-columns::-webkit-scrollbar-track { + background: rgba(0, 0, 0, 0.05); + border-radius: 3px; + } + + 
.schema-columns::-webkit-scrollbar-thumb { + background: linear-gradient(135deg, #1a73e8, #4285f4); + border-radius: 3px; + } + + .schema-columns::-webkit-scrollbar-thumb:hover { + background: linear-gradient(135deg, #125fc3, #1a73e8); } .schema-column { @@ -962,6 +1129,27 @@ background: #f8f9fa; border-radius: 8px; border-left: 4px solid #28a745; + max-height: 120px; + overflow-y: auto; + } + + /* Custom scrollbar for required columns info */ + .required-columns-info::-webkit-scrollbar { + width: 6px; + } + + .required-columns-info::-webkit-scrollbar-track { + background: rgba(0, 0, 0, 0.05); + border-radius: 3px; + } + + .required-columns-info::-webkit-scrollbar-thumb { + background: linear-gradient(135deg, #28a745, #20c997); + border-radius: 3px; + } + + .required-columns-info::-webkit-scrollbar-thumb:hover { + background: linear-gradient(135deg, #20c997, #17a2b8); } .required-columns-title { @@ -970,6 +1158,185 @@ margin-bottom: 8px; font-size: 14px; } + + /* Document Analyzer Styles */ + .upload-section { + margin-bottom: 20px; + } + + .file-input-wrapper { + position: relative; + margin-bottom: 16px; + } + + .file-input { + position: absolute; + opacity: 0; + width: 100%; + height: 100%; + cursor: pointer; + z-index: 2; + } + + .file-input-display { + display: flex; + align-items: center; + padding: 16px 20px; + border: 2px dashed #e3f2fd; + border-radius: 12px; + background: linear-gradient(135deg, #ffffff 0%, #f8fbff 100%); + transition: all 0.3s ease; + cursor: pointer; + min-height: 60px; + } + + .file-input-display:hover { + border-color: #1a73e8; + background: linear-gradient(135deg, #f8fbff 0%, #e3f2fd 100%); + transform: translateY(-1px); + box-shadow: 0 4px 12px rgba(26, 115, 232, 0.1); + } + + .file-icon { + font-size: 24px; + margin-right: 12px; + opacity: 0.7; + } + + .file-text { + display: flex; + flex-direction: column; + flex: 1; + } + + .file-label { + font-weight: 500; + color: #1565c0; + font-size: 14px; + margin-bottom: 2px; + } + + 
.file-hint { + font-size: 12px; + color: #666; + font-style: italic; + } + + .upload-button { + width: 100%; + padding: 12px 20px; + background: linear-gradient(135deg, #1a73e8 0%, #4285f4 100%); + color: white; + border: none; + border-radius: 24px; + font-size: 14px; + font-weight: 600; + cursor: pointer; + transition: all 0.3s cubic-bezier(0.4, 0, 0.2, 1); + display: flex; + align-items: center; + justify-content: center; + gap: 8px; + box-shadow: 0 4px 12px rgba(26, 115, 232, 0.2); + margin-bottom: 12px; + } + + .upload-button:hover { + background: linear-gradient(135deg, #125fc3 0%, #1a73e8 100%); + transform: translateY(-2px); + box-shadow: 0 8px 24px rgba(26, 115, 232, 0.3); + } + + .upload-button:disabled { + background: #ccc; + cursor: not-allowed; + transform: none; + box-shadow: none; + } + + .upload-icon { + font-size: 16px; + } + + .upload-status { + font-size: 13px; + font-weight: 500; + padding: 8px 12px; + border-radius: 8px; + text-align: center; + min-height: 20px; + transition: all 0.3s ease; + } + + .upload-status:not(:empty) { + margin-bottom: 12px; + } + + .settings-section { + border-top: 1px solid #e3f2fd; + padding-top: 16px; + } + + .checkbox-wrapper { + display: flex; + align-items: center; + } + + .custom-checkbox { + position: absolute; + opacity: 0; + cursor: pointer; + } + + .checkbox-label { + display: flex; + align-items: center; + cursor: pointer; + font-size: 14px; + color: #5f6368; + font-weight: 500; + user-select: none; + } + + .checkmark { + width: 20px; + height: 20px; + border: 2px solid #e3f2fd; + border-radius: 4px; + margin-right: 12px; + display: flex; + align-items: center; + justify-content: center; + transition: all 0.3s ease; + background: #ffffff; + } + + .checkmark::after { + content: 'βœ“'; + color: white; + font-size: 12px; + font-weight: bold; + opacity: 0; + transition: opacity 0.2s ease; + } + + .custom-checkbox:checked + .checkbox-label .checkmark { + background: linear-gradient(135deg, #1a73e8 0%, #4285f4 
100%); + border-color: #1a73e8; + } + + .custom-checkbox:checked + .checkbox-label .checkmark::after { + opacity: 1; + } + + .checkbox-label:hover .checkmark { + border-color: #1a73e8; + background: #f8fbff; + } + + .custom-checkbox:checked + .checkbox-label:hover .checkmark { + background: linear-gradient(135deg, #125fc3 0%, #1a73e8 100%); + } @@ -984,92 +1351,96 @@

Databricks Genie++ Omni-Analytics Platform

A multi-agent AI engine that answers questions, forecasts trends, classifies data, summarizes records, translates languages, and recommends next-best actions. For any questions or feedback, please reach out to nitin.aggarwal@databricks.com - -