From 48360105b456330257fabe43ea951223cc95f446 Mon Sep 17 00:00:00 2001 From: armanjindal Date: Thu, 22 May 2025 18:59:47 -0700 Subject: [PATCH 1/5] improvements to HR demo for HN video --- .../app/apis/get_leaderboard.py | 5 +- .../apis/get_user_live_heart_rate_stats.py | 6 +- .../app/datamodels/AppleWatchHRPacket.py | 9 + .../app/datamodels/UnifiedHRPacket.py | 3 +- .../functions/applewatch_to_unified_packet.py | 48 +++ .../functions/bluetooth_to_unified_packet.py | 2 +- .../processed_ant_to_unified_packet.py | 2 +- .../live-heartrate-leaderboard/app/main.py | 5 +- .../app/pipelines/pipelines.py | 17 +- .../1.generate_mock_ant_hr_data.py | 5 +- .../app/streamlit_app.py | 257 ++++++++++------ .../mock-user-db.json | 18 +- .../streamlit_app.py | 283 ------------------ 13 files changed, 261 insertions(+), 399 deletions(-) create mode 100644 templates/live-heartrate-leaderboard/app/datamodels/AppleWatchHRPacket.py create mode 100644 templates/live-heartrate-leaderboard/app/functions/applewatch_to_unified_packet.py delete mode 100644 templates/live-heartrate-leaderboard/streamlit_app.py diff --git a/templates/live-heartrate-leaderboard/app/apis/get_leaderboard.py b/templates/live-heartrate-leaderboard/app/apis/get_leaderboard.py index 9759e3c58d..020bdb4bc6 100644 --- a/templates/live-heartrate-leaderboard/app/apis/get_leaderboard.py +++ b/templates/live-heartrate-leaderboard/app/apis/get_leaderboard.py @@ -74,9 +74,8 @@ def run(client: MooseClient, params: LeaderboardQueryParams) -> LeaderboardRespo round(countIf(hr_value >= 160 AND hr_value < 180) / count() * 100, 1) as zone4_percentage, round(countIf(hr_value >= 180) / count() * 100, 1) as zone5_percentage FROM unified_hr_packet - WHERE hr_timestamp_seconds >= ( - SELECT MAX(hr_timestamp_seconds) - toInt32({time_window_seconds}) - FROM unified_hr_packet + WHERE processed_timestamp >= ( + toDateTime64(now(), 3) - toIntervalSecond(toInt32({time_window_seconds})) ) GROUP BY user_name ) diff --git a/templates/live-heartrate-leaderboard/app/apis/get_user_live_heart_rate_stats.py b/templates/live-heartrate-leaderboard/app/apis/get_user_live_heart_rate_stats.py index f4826fa60c..37beac15e7 100644 --- a/templates/live-heartrate-leaderboard/app/apis/get_user_live_heart_rate_stats.py +++ b/templates/live-heartrate-leaderboard/app/apis/get_user_live_heart_rate_stats.py @@ -61,10 +61,8 @@ def run(client: MooseClient, params: QueryParams) -> HeartRateStats: (hr_timestamp_seconds - lagInFrame(hr_timestamp_seconds, 1, hr_timestamp_seconds - 1) OVER (PARTITION BY user_name ORDER BY hr_timestamp_seconds)))/60 as calories_burned FROM unified_hr_packet WHERE user_name = {user_name} - AND hr_timestamp_seconds >= ( - SELECT MAX(hr_timestamp_seconds) - {window_seconds} - FROM unified_hr_packet - WHERE user_name = {user_name} + AND processed_timestamp >= ( + toDateTime64(now(), 3) - toIntervalSecond(toInt32({window_seconds})) ) ) SELECT diff --git a/templates/live-heartrate-leaderboard/app/datamodels/AppleWatchHRPacket.py b/templates/live-heartrate-leaderboard/app/datamodels/AppleWatchHRPacket.py new file mode 100644 index 0000000000..afa5d3de3b --- /dev/null +++ b/templates/live-heartrate-leaderboard/app/datamodels/AppleWatchHRPacket.py @@ -0,0 +1,9 @@ +from pydantic import BaseModel +from moose_lib import Key +from typing import List + +class AppleWatchHRPacket(BaseModel): + device_id: Key[int] + heart_rate_data: int + # timestamp_ns: float + # rr_interval_ms: int \ No newline at end of file diff --git a/templates/live-heartrate-leaderboard/app/datamodels/UnifiedHRPacket.py b/templates/live-heartrate-leaderboard/app/datamodels/UnifiedHRPacket.py index 81da1fb273..23e14b822f 100644 --- a/templates/live-heartrate-leaderboard/app/datamodels/UnifiedHRPacket.py +++ b/templates/live-heartrate-leaderboard/app/datamodels/UnifiedHRPacket.py @@ -1,6 +1,7 @@ from moose_lib import Key from pydantic import BaseModel from datetime import datetime +from typing import Optional class UnifiedHRPacket(BaseModel): user_id: Key[int] @@ -8,7 +9,7 @@ class UnifiedHRPacket(BaseModel): device_id: int hr_timestamp_seconds: float hr_value: float - rr_interval_ms: float + rr_interval_ms: Optional[float] processed_timestamp: datetime # hr_max: float # hr_zone: int \ No newline at end of file diff --git a/templates/live-heartrate-leaderboard/app/functions/applewatch_to_unified_packet.py b/templates/live-heartrate-leaderboard/app/functions/applewatch_to_unified_packet.py new file mode 100644 index 0000000000..b1addca5a5 --- /dev/null +++ b/templates/live-heartrate-leaderboard/app/functions/applewatch_to_unified_packet.py @@ -0,0 +1,48 @@ +from app.datamodels.AppleWatchHRPacket import AppleWatchHRPacket +from app.datamodels.UnifiedHRPacket import UnifiedHRPacket +from typing import Optional +from datetime import datetime, timezone +from pathlib import Path +import json +from moose_lib import Logger, Key + +def load_device_dict(): + json_path = Path(__file__).parents[2] / 'mock-user-db.json' + logger = Logger(action="SF") + # logger.info(f'Starting streaming function and loading mock user db from {json_path}') + + with open(json_path) as f: + device_dict = json.load(f) + # Filter to only include devices marked as live bluetooth devices + device_dict = {k: v for k, v in device_dict.items() if v.get('live_bt_device') == "True"} + + return device_dict + +all_bluetooth_device_dict = load_device_dict() + +logger = Logger(action="SF") +# logger.info(f'all_bluetooth_device_dict: {all_bluetooth_device_dict}') + +# Capture the moment this module is first imported as the streaming start time (UTC) +stream_start_time = datetime.now(timezone.utc) +# logger.info(f"Apple Watch streaming function start time: {stream_start_time.isoformat()}") + +def apple_watch_to_unified(source: AppleWatchHRPacket) -> UnifiedHRPacket: + device_id = source.device_id # This is a Key[int] + device_id_str = str(device_id) + device_dict = all_bluetooth_device_dict[device_id_str] + user_name = device_dict.get('user_name') + user_id = device_dict.get('user_id') # This should already be an integer + + # Time elapsed (in seconds) since the streaming function/module was first loaded + elapsed_seconds = (datetime.now(timezone.utc) - stream_start_time).total_seconds() + + return UnifiedHRPacket( + user_id=user_id, # UnifiedHRPacket will handle the Key[int] conversion + user_name=user_name, + device_id=int(device_id_str), # Convert to plain int + hr_timestamp_seconds=elapsed_seconds, + hr_value=source.heart_rate_data, + rr_interval_ms=0, # not included in apple watch + processed_timestamp=datetime.now(timezone.utc), + ) diff --git a/templates/live-heartrate-leaderboard/app/functions/bluetooth_to_unified_packet.py b/templates/live-heartrate-leaderboard/app/functions/bluetooth_to_unified_packet.py index e177ae86a0..56fa0d84df 100644 --- a/templates/live-heartrate-leaderboard/app/functions/bluetooth_to_unified_packet.py +++ b/templates/live-heartrate-leaderboard/app/functions/bluetooth_to_unified_packet.py @@ -9,7 +9,7 @@ def load_device_dict(): json_path = Path(__file__).parents[2] / 'mock-user-db.json' logger = Logger(action="SF") - logger.info(f'Starting streaming function and loading mock user db from {json_path}') + # logger.info(f'Starting streaming function and loading mock user db from {json_path}') with open(json_path) as f: device_dict = json.load(f) diff --git a/templates/live-heartrate-leaderboard/app/functions/processed_ant_to_unified_packet.py b/templates/live-heartrate-leaderboard/app/functions/processed_ant_to_unified_packet.py index 4f54892eb1..ff30bd1f85 100644 --- a/templates/live-heartrate-leaderboard/app/functions/processed_ant_to_unified_packet.py +++ b/templates/live-heartrate-leaderboard/app/functions/processed_ant_to_unified_packet.py @@ -11,7 +11,7 @@ def load_device_dict(): json_path = Path(__file__).parents[2] / 'mock-user-db.json' logger = Logger(action="SF") - logger.info(f'Starting streaming function and loading mock user db from {json_path}') + # logger.info(f'Starting streaming function and loading mock user db from {json_path}') with open(json_path) as f: device_dict = json.load(f) diff --git a/templates/live-heartrate-leaderboard/app/main.py b/templates/live-heartrate-leaderboard/app/main.py index a9b35fe645..040c799d84 100644 --- a/templates/live-heartrate-leaderboard/app/main.py +++ b/templates/live-heartrate-leaderboard/app/main.py @@ -3,12 +3,9 @@ from moose_lib import StreamingFunction from moose_lib import ConsumptionApi -from app.pipelines.pipelines import rawAntHRPipeline, processedAntHRPipeline, unifiedHRPipeline, bluetoothHRPipeline -# Instatiated materialized views for in DB processing -from app.views.aggregated_per_second import aggregateHeartRateSummaryPerSecondMV +from app.pipelines.pipelines import rawAntHRPipeline, processedAntHRPipeline, unifiedHRPipeline, bluetoothHRPipeline, appleWatchHRPipeline -# Instantiate APIs from app.apis.get_leaderboard import get_leaderboard_api from app.apis.get_user_live_heart_rate_stats import get_user_live_heart_rate_stats diff --git a/templates/live-heartrate-leaderboard/app/pipelines/pipelines.py b/templates/live-heartrate-leaderboard/app/pipelines/pipelines.py index 8cb1fa39a5..64f83d70c3 100644 --- a/templates/live-heartrate-leaderboard/app/pipelines/pipelines.py +++ b/templates/live-heartrate-leaderboard/app/pipelines/pipelines.py @@ -5,8 +5,10 @@ from app.datamodels.ProcessedAntHRPacket import ProcessedAntHRPacket from app.datamodels.RawAntHRPacket import RawAntHRPacket from app.datamodels.BluetoothHRPacket import BluetoothHRPacket +from app.datamodels.AppleWatchHRPacket import AppleWatchHRPacket # Instantiate functions for kafka stream processing +from app.functions.applewatch_to_unified_packet import apple_watch_to_unified from app.functions.raw_ant_to_processed_ant_packet import RawAntHRPacket__ProcessedAntHRPacket from app.functions.processed_ant_to_unified_packet import processedAntHRPacket__UNIFIED_HR_PACKET from app.functions.bluetooth_to_unified_packet import bluetoothHRPacket__UNIFIED_HRM_MODEL @@ -37,6 +39,13 @@ stream=True, table=True )) +# Create a ingest pipeline for Appple Health + +appleWatchHRPipeline = IngestPipeline[AppleWatchHRPacket]("apple_health_watch_packet", IngestPipelineConfig( + ingest=True, + stream=True, + table=True +)) # Transform RawAntHRPacket to ProcessedAntHRPacket in stream rawAntHRPipeline.get_stream().add_transform( @@ -52,4 +61,10 @@ bluetoothHRPipeline.get_stream().add_transform( destination=unifiedHRPipeline.get_stream(), transformation=bluetoothHRPacket__UNIFIED_HRM_MODEL -) \ No newline at end of file +) + + +appleWatchHRPipeline.get_stream().add_transform( + destination=unifiedHRPipeline.get_stream(), + transformation=apple_watch_to_unified +) diff --git a/templates/live-heartrate-leaderboard/app/scripts/generate_data/1.generate_mock_ant_hr_data.py b/templates/live-heartrate-leaderboard/app/scripts/generate_data/1.generate_mock_ant_hr_data.py index 7f45d01a2e..e777331877 100644 --- a/templates/live-heartrate-leaderboard/app/scripts/generate_data/1.generate_mock_ant_hr_data.py +++ b/templates/live-heartrate-leaderboard/app/scripts/generate_data/1.generate_mock_ant_hr_data.py @@ -78,10 +78,11 @@ def generate_mock_ant_hr_data(): json_data = json.dumps(ant_packet) try: - logger.info(f"Sending JSON: {json_data}") + # logger.info(f"Sending JSON: {json_data}") response = requests.post(url, data=json_data, headers=headers) if response.status_code == 200: - logger.info(f"Successfully sent packet: {ant_packet}") + # logger.info(f"Successfully sent packet: {ant_packet}") + pass else: print(f"Failed to send packet: {response.status_code}, {response.text}") except requests.exceptions.RequestException as e: diff --git a/templates/live-heartrate-leaderboard/app/streamlit_app.py b/templates/live-heartrate-leaderboard/app/streamlit_app.py index f762cee0f7..c4640cf717 100644 --- a/templates/live-heartrate-leaderboard/app/streamlit_app.py +++ b/templates/live-heartrate-leaderboard/app/streamlit_app.py @@ -5,6 +5,7 @@ import pandas as pd from datetime import datetime, timezone import numpy as np +import json # Page config st.set_page_config( @@ -14,53 +15,53 @@ ) # Modern animated banner -st.markdown(""" - - -
- Go from prototype to production
- \n Made with ❤️ MOOSE X SINGLESTONE \n - Learn More: docs.fiveonefour.com/moose - Download: bash -i <(curl -fsSL https://fiveonefour.com/install.sh) moose) -
-""", unsafe_allow_html=True) +# st.markdown(""" +# + +#
+# Go from prototype to production
+# \n Made with ❤️ MOOSE X SINGLESTONE \n +# Learn More: docs.fiveonefour.com/moose +# Download: bash -i <(curl -fsSL https://fiveonefour.com/install.sh) moose) +#
+# """, unsafe_allow_html=True) # Initialize session state and constants if 'selected_user' not in st.session_state: @@ -78,6 +79,37 @@ # Constants LEADERBOARD_TIME_WINDOW = 300 # 5 minutes in seconds +# Ensure a default selected user (top of leaderboard) when the app loads +def ensure_default_selected_user(): + if st.session_state.selected_user is None: + try: + resp = requests.get( + f"http://localhost:4000/consumption/getLeaderboard?time_window_seconds={LEADERBOARD_TIME_WINDOW}&limit=1" + ) + if resp.status_code == 200 and resp.json().get("entries"): + st.session_state.selected_user = resp.json()["entries"][0]["user_name"] + except Exception: + pass + +ensure_default_selected_user() + +# Helper: fetch profile image for a given user name from mock-user-db.json +def get_profile_image(user_name: str): + """Return the profile image URL for the provided user name or None if not found.""" + try: + with open("mock-user-db.json", "r") as f: + user_db = json.load(f) + for user in user_db.values(): + if user.get("user_name") == user_name: + return user.get("profile_image") + except Exception as e: + # Use Streamlit's error display only if we are in a Streamlit context + try: + st.error(f"Failed to load user image: {str(e)}") + except Exception: + pass + return None + # Function to update the live graph def update_live_graph(): try: @@ -119,43 +151,71 @@ def update_live_graph(): # Function to update the leaderboard def update_leaderboard(): + """Return HTML table (with avatars) + row count.""" try: - response = requests.get(f"http://localhost:4000/consumption/getLeaderboard?time_window_seconds={LEADERBOARD_TIME_WINDOW}&limit=10") + response = requests.get( + f"http://localhost:4000/consumption/getLeaderboard?time_window_seconds={LEADERBOARD_TIME_WINDOW}&limit=100" + ) if response.status_code == 200: data = response.json()["entries"] df = pd.DataFrame(data) - - # Display only relevant columns - display_cols = ['rank', 'user_name', 'avg_heart_rate', 'avg_power', 'total_calories'] + + # Relevant columns for display (include max stats) + display_cols = [ + "rank", + "user_name", + "avg_heart_rate", + "max_heart_rate", + "avg_power", + "max_power", + "total_calories", + ] df_display = df[display_cols].copy() - - # Add styling to highlight selected user + + # Round numeric columns to 1 decimal place (excluding rank) + numeric_cols = [ + "avg_heart_rate", + "max_heart_rate", + "avg_power", + "max_power", + "total_calories", + ] + df_display[numeric_cols] = df_display[numeric_cols].round(1) + + # Inject avatar HTML in a new first column and re-order + df_display.insert( + 0, + "avatar", + df_display["user_name"].apply( + lambda u: f"" + ), + ) + + # Row-highlighting for the current selected user def highlight_selected_user(row): - if row['user_name'] == st.session_state.selected_user: - return ['background-color: #FF4B4B30'] * len(row) - return [''] * len(row) - - styled_df = df_display.style.apply(highlight_selected_user, axis=1) - return styled_df - except Exception as e: - st.error(f"Failed to update leaderboard: {str(e)}") - return None + if row["user_name"] == st.session_state.selected_user: + return ["background-color: #FF4B4B30"] * len(row) + return [""] * len(row) -# Title and user selection in the same row -title_col, select_col = st.columns([1, 1]) -with title_col: - st.title("❤️ Live Heart Rate Monitor") + fmt_dict = {col: "{:.1f}" for col in numeric_cols} -with select_col: - try: - response = requests.get(f"http://localhost:4000/consumption/getLeaderboard?time_window_seconds={LEADERBOARD_TIME_WINDOW}&limit=100") - if response.status_code == 200: - users = [entry["user_name"] for entry in response.json()["entries"]] - # Add some vertical space to align with title - st.write("") - selected_user = st.selectbox("Select User", users, label_visibility="collapsed") - if selected_user != st.session_state.selected_user: - st.session_state.selected_user = selected_user + styled = ( + df_display.style.hide(axis="index") + .format(fmt_dict) # type: ignore[arg-type] + .apply(highlight_selected_user, axis=1) + ) + + # Convert to raw HTML so avatar tags render and widen table + html_table = styled.to_html(escape=False).replace( + "", + unsafe_allow_html=True, + ) + # Create and update graph if not st.session_state.hr_data.empty: fig = go.Figure() @@ -256,15 +329,25 @@ def highlight_selected_user(row): else: st.info("No data available yet. Please wait for data to appear.") -# Leaderboard Section -st.header("🏆 Leaderboard") -leaderboard_df = update_leaderboard() -if leaderboard_df is not None: - # Calculate height based on number of rows (approximately 35px per row plus 35px for header) - num_rows = len(leaderboard_df.data) - table_height = (num_rows + 1) * 35 - st.dataframe(leaderboard_df, use_container_width=True, height=table_height) + +# Leaderboard returns HTML string + row count +leaderboard_result = update_leaderboard() +if leaderboard_result is not None: + leaderboard_html, num_rows = leaderboard_result + # If we have many rows, constrain the height; otherwise display full table without scrollbar + ROW_HEIGHT_PX = 35 # approximate row height incl. header padding + MAX_VISIBLE_ROWS = 10 # only add scrolling if more than this many rows + + if num_rows > MAX_VISIBLE_ROWS: + table_height = (MAX_VISIBLE_ROWS + 1) * ROW_HEIGHT_PX # header + visible rows + st.markdown( + f"
{leaderboard_html}
", + unsafe_allow_html=True, + ) + else: + # No overflow restriction – full height so no scrolling needed for small tables + st.markdown(leaderboard_html, unsafe_allow_html=True) else: st.info("Loading leaderboard data...") diff --git a/templates/live-heartrate-leaderboard/mock-user-db.json b/templates/live-heartrate-leaderboard/mock-user-db.json index 2bd7633a0e..574f981d54 100644 --- a/templates/live-heartrate-leaderboard/mock-user-db.json +++ b/templates/live-heartrate-leaderboard/mock-user-db.json @@ -1,11 +1,4 @@ { - "1111": { - "user_id": 1, - "live_bt_device": "True", - "user_name": "David", - "profile_image": "https://ca.slack-edge.com/T06ETRB50-U07SBJAF0HG-24682c3e529b-512" - }, - "12345": { "user_id": 0, "user_name": "Joj", @@ -13,16 +6,17 @@ }, "12346": { "user_id": 2, - "user_name": "Tim", - "profile_image": "https://ca.slack-edge.com/T062WLJGPMW-U062A4HPG4C-f7dfbbfa5065-512" + "user_name": "Chris", + "profile_image": "https://ca.slack-edge.com/T062WLJGPMW-U06DCH974P9-73f93b803491-192" }, "12347": { "user_id": 3, - "user_name": "Alex", - "profile_image": "https://ca.slack-edge.com/T062WLJGPMW-U0624P1RC0M-a52dbc57c8a8-512" + "user_name": "Isa", + "profile_image": "https://ca.slack-edge.com/T062WLJGPMW-U0820SCK9HN-8054493448fa-512" }, "12348": { - "user_id": 4, + "user_id": 1, + "live_bt_device": "True", "user_name": "Arman", "profile_image": "https://ca.slack-edge.com/T062WLJGPMW-U079W7WJ8MV-31c7dff99db9-512" }, diff --git a/templates/live-heartrate-leaderboard/streamlit_app.py b/templates/live-heartrate-leaderboard/streamlit_app.py deleted file mode 100644 index f762cee0f7..0000000000 --- a/templates/live-heartrate-leaderboard/streamlit_app.py +++ /dev/null @@ -1,283 +0,0 @@ -import streamlit as st -import requests -import time -import plotly.graph_objects as go -import pandas as pd -from datetime import datetime, timezone -import numpy as np - -# Page config -st.set_page_config( - page_title="Live Heart Rate Monitor", - page_icon="❤️", - layout="wide" -) - -# Modern animated banner -st.markdown(""" - - -
- Go from prototype to production
- \n Made with ❤️ MOOSE X SINGLESTONE \n - Learn More: docs.fiveonefour.com/moose - Download: bash -i <(curl -fsSL https://fiveonefour.com/install.sh) moose) -
-""", unsafe_allow_html=True) - -# Initialize session state and constants -if 'selected_user' not in st.session_state: - st.session_state.selected_user = None -if 'hr_data' not in st.session_state: - # Initialize with explicit dtypes to ensure consistency - st.session_state.hr_data = pd.DataFrame({ - 'timestamp': pd.Series(dtype='datetime64[ns, UTC]'), - 'heart_rate': pd.Series(dtype='float64'), - 'hr_zone': pd.Series(dtype='int64'), - 'estimated_power': pd.Series(dtype='float64'), - 'cumulative_calories_burned': pd.Series(dtype='float64') - }) - -# Constants -LEADERBOARD_TIME_WINDOW = 300 # 5 minutes in seconds - -# Function to update the live graph -def update_live_graph(): - try: - if st.session_state.selected_user: - response = requests.get( - f"http://localhost:4000/consumption/getUserLiveHeartRateStats?user_name={st.session_state.selected_user}&window_seconds=60" - ) - if response.status_code == 200: - data = response.json() - if data: - # Convert data to DataFrame - new_data = pd.DataFrame([{ - 'timestamp': datetime.fromisoformat(d['processed_timestamp'].replace('Z', '+00:00')), - 'heart_rate': d['heart_rate'], - 'hr_zone': d['hr_zone'], - 'estimated_power': d['estimated_power'], - 'cumulative_calories_burned': d['cumulative_calories_burned'] - } for d in data]) - - # Update session state data - if st.session_state.hr_data.empty: - st.session_state.hr_data = new_data - else: - st.session_state.hr_data = pd.concat([st.session_state.hr_data, new_data], ignore_index=True) - - # Drop duplicates and sort by timestamp - st.session_state.hr_data = st.session_state.hr_data.drop_duplicates(subset=['timestamp']) - st.session_state.hr_data = st.session_state.hr_data.sort_values('timestamp') - - # Keep only last 60 seconds of data - cutoff_time = datetime.now(timezone.utc) - pd.Timedelta(seconds=60) - st.session_state.hr_data = st.session_state.hr_data[st.session_state.hr_data['timestamp'] > cutoff_time] - - # Return the most recent data point - return st.session_state.hr_data.iloc[-1] if not st.session_state.hr_data.empty else None - except Exception as e: - st.error(f"Failed to update graph: {str(e)}") - return None - -# Function to update the leaderboard -def update_leaderboard(): - try: - response = requests.get(f"http://localhost:4000/consumption/getLeaderboard?time_window_seconds={LEADERBOARD_TIME_WINDOW}&limit=10") - if response.status_code == 200: - data = response.json()["entries"] - df = pd.DataFrame(data) - - # Display only relevant columns - display_cols = ['rank', 'user_name', 'avg_heart_rate', 'avg_power', 'total_calories'] - df_display = df[display_cols].copy() - - # Add styling to highlight selected user - def highlight_selected_user(row): - if row['user_name'] == st.session_state.selected_user: - return ['background-color: #FF4B4B30'] * len(row) - return [''] * len(row) - - styled_df = df_display.style.apply(highlight_selected_user, axis=1) - return styled_df - except Exception as e: - st.error(f"Failed to update leaderboard: {str(e)}") - return None - -# Title and user selection in the same row -title_col, select_col = st.columns([1, 1]) -with title_col: - st.title("❤️ Live Heart Rate Monitor") - -with select_col: - try: - response = requests.get(f"http://localhost:4000/consumption/getLeaderboard?time_window_seconds={LEADERBOARD_TIME_WINDOW}&limit=100") - if response.status_code == 200: - users = [entry["user_name"] for entry in response.json()["entries"]] - # Add some vertical space to align with title - st.write("") - selected_user = st.selectbox("Select User", users, label_visibility="collapsed") - if selected_user != st.session_state.selected_user: - st.session_state.selected_user = selected_user - st.session_state.hr_data = pd.DataFrame({ - 'timestamp': pd.Series(dtype='datetime64[ns, UTC]'), - 'heart_rate': pd.Series(dtype='float64'), - 'hr_zone': pd.Series(dtype='int64'), - 'estimated_power': pd.Series(dtype='float64'), - 'cumulative_calories_burned': pd.Series(dtype='float64') - }) - except Exception as e: - st.error(f"Failed to fetch users: {str(e)}") - users = [] - -# Metrics row -metrics_cols = st.columns(4) - -# Update data and metrics -latest_data = update_live_graph() - -# Display metrics -if latest_data is not None: - metrics_cols[0].metric( - "Heart Rate", - f"{latest_data['heart_rate']} BPM", - delta=None - ) - metrics_cols[1].metric( - "Zone", - f"Zone {latest_data['hr_zone']}", - delta=None - ) - metrics_cols[2].metric( - "Power", - f"{latest_data['estimated_power']}W", - delta=None - ) - metrics_cols[3].metric( - "Calories", - f"{latest_data['cumulative_calories_burned']:.1f} kcal", - delta=None - ) - -# Create and update graph -if not st.session_state.hr_data.empty: - fig = go.Figure() - - # Add heart rate trace - fig.add_trace(go.Scatter( - x=st.session_state.hr_data['timestamp'], - y=st.session_state.hr_data['heart_rate'], - mode='lines+markers', - name='Heart Rate', - line=dict(color='#FF4B4B', width=2), - fill='tozeroy' - )) - - # Add zone lines - zone_colors = ['rgba(255,255,255,0.3)'] * 4 - zone_labels = ['Zone 1-2', 'Zone 2-3', 'Zone 3-4', 'Zone 4-5'] - zone_values = [120, 140, 160, 180] - - for value, label, color in zip(zone_values, zone_labels, zone_colors): - fig.add_hline( - y=value, - line_dash="dash", - line_color=color, - annotation_text=label, - annotation_position="right" - ) - - # Update layout - fig.update_layout( - xaxis_title="Time", - yaxis_title="Heart Rate (BPM)", - showlegend=False, - height=400, - margin=dict(l=0, r=0, t=20, b=0), - plot_bgcolor='rgba(0,0,0,0)', - paper_bgcolor='rgba(0,0,0,0)', - xaxis=dict( - showgrid=True, - gridcolor='rgba(255,255,255,0.1)', - range=[ - st.session_state.hr_data['timestamp'].min(), - st.session_state.hr_data['timestamp'].max() - ] - ), - yaxis=dict( - showgrid=True, - gridcolor='rgba(255,255,255,0.1)', - range=[ - max(0, st.session_state.hr_data['heart_rate'].min() - 10), - st.session_state.hr_data['heart_rate'].max() + 10 - ] - ) - ) - - # Display the graph - st.plotly_chart(fig, use_container_width=True) -else: - st.info("No data available yet. Please wait for data to appear.") - -# Leaderboard Section -st.header("🏆 Leaderboard") - -leaderboard_df = update_leaderboard() -if leaderboard_df is not None: - # Calculate height based on number of rows (approximately 35px per row plus 35px for header) - num_rows = len(leaderboard_df.data) - table_height = (num_rows + 1) * 35 - st.dataframe(leaderboard_df, use_container_width=True, height=table_height) -else: - st.info("Loading leaderboard data...") - -# Update frequency -time.sleep(1) -st.rerun() - -# Footer -st.markdown("---") -st.markdown(""" -
- Built with Streamlit ❤️ | Monitoring heart rates in real-time -
-""", unsafe_allow_html=True) - - From b0775da23b193cbec51dd085afcb92f63f680b51 Mon Sep 17 00:00:00 2001 From: armanjindal Date: Mon, 26 May 2025 17:10:34 -0700 Subject: [PATCH 2/5] added the simple apple watch BT client --- .../live-heartrate-leaderboard/.gitignore | 3 +- .../app/datamodels/AppleWatchHRPacket.py | 6 +- .../app/datamodels/RawAntHRPacket.py | 4 +- .../functions/applewatch_to_unified_packet.py | 6 +- .../apple_heart_rate_client.py | 83 +++++++++++++++++++ .../scripts/apple_watch_hr_client/config.toml | 5 ++ .../app/streamlit_app.py | 49 ----------- .../mock-user-db.json | 2 +- .../requirements.txt | 1 + 9 files changed, 100 insertions(+), 59 deletions(-) create mode 100644 templates/live-heartrate-leaderboard/app/scripts/apple_watch_hr_client/apple_heart_rate_client.py create mode 100644 templates/live-heartrate-leaderboard/app/scripts/apple_watch_hr_client/config.toml diff --git a/templates/live-heartrate-leaderboard/.gitignore b/templates/live-heartrate-leaderboard/.gitignore index 5216c527de..8c252d858b 100644 --- a/templates/live-heartrate-leaderboard/.gitignore +++ b/templates/live-heartrate-leaderboard/.gitignore @@ -38,4 +38,5 @@ wheels *.egg-info/ .installed.cfg *.egg -MANIFEST \ No newline at end of file +MANIFEST +.cursor diff --git a/templates/live-heartrate-leaderboard/app/datamodels/AppleWatchHRPacket.py b/templates/live-heartrate-leaderboard/app/datamodels/AppleWatchHRPacket.py index afa5d3de3b..3adf27eb4d 100644 --- a/templates/live-heartrate-leaderboard/app/datamodels/AppleWatchHRPacket.py +++ b/templates/live-heartrate-leaderboard/app/datamodels/AppleWatchHRPacket.py @@ -3,7 +3,5 @@ from typing import List class AppleWatchHRPacket(BaseModel): - device_id: Key[int] - heart_rate_data: int - # timestamp_ns: float - # rr_interval_ms: int \ No newline at end of file + device_id: Key[str] + heart_rate_data: int \ No newline at end of file diff --git a/templates/live-heartrate-leaderboard/app/datamodels/RawAntHRPacket.py b/templates/live-heartrate-leaderboard/app/datamodels/RawAntHRPacket.py index e5c6a01c26..c11710904a 100644 --- a/templates/live-heartrate-leaderboard/app/datamodels/RawAntHRPacket.py +++ b/templates/live-heartrate-leaderboard/app/datamodels/RawAntHRPacket.py @@ -1,7 +1,9 @@ from pydantic import BaseModel from moose_lib import Key +from typing import Optional class RawAntHRPacket(BaseModel): device_id: Key[int] packet_count: int - ant_hr_packet: list[int] \ No newline at end of file + ant_hr_packet: list[int] + timestamp: Optional[float] = None \ No newline at end of file diff --git a/templates/live-heartrate-leaderboard/app/functions/applewatch_to_unified_packet.py b/templates/live-heartrate-leaderboard/app/functions/applewatch_to_unified_packet.py index b1addca5a5..67baf6dd35 100644 --- a/templates/live-heartrate-leaderboard/app/functions/applewatch_to_unified_packet.py +++ b/templates/live-heartrate-leaderboard/app/functions/applewatch_to_unified_packet.py @@ -28,9 +28,9 @@ def load_device_dict(): # logger.info(f"Apple Watch streaming function start time: {stream_start_time.isoformat()}") def apple_watch_to_unified(source: AppleWatchHRPacket) -> UnifiedHRPacket: - device_id = source.device_id # This is a Key[int] - device_id_str = str(device_id) + device_id_str = str(source.device_id) device_dict = all_bluetooth_device_dict[device_id_str] + logger.info(f"device_dict: {device_dict}") user_name = device_dict.get('user_name') user_id = device_dict.get('user_id') # This should already be an integer @@ -40,7 +40,7 @@ def apple_watch_to_unified(source: AppleWatchHRPacket) -> UnifiedHRPacket: return UnifiedHRPacket( user_id=user_id, # UnifiedHRPacket will handle the Key[int] conversion user_name=user_name, - device_id=int(device_id_str), # Convert to plain int + device_id=user_id, # Convert to plain int hr_timestamp_seconds=elapsed_seconds, hr_value=source.heart_rate_data, rr_interval_ms=0, # not included in apple watch diff --git a/templates/live-heartrate-leaderboard/app/scripts/apple_watch_hr_client/apple_heart_rate_client.py b/templates/live-heartrate-leaderboard/app/scripts/apple_watch_hr_client/apple_heart_rate_client.py new file mode 100644 index 0000000000..af3a34d44f --- /dev/null +++ b/templates/live-heartrate-leaderboard/app/scripts/apple_watch_hr_client/apple_heart_rate_client.py @@ -0,0 +1,83 @@ +#!/usr/bin/env python3 +# Requirements: bleak, requests +import asyncio +from bleak import BleakScanner, BleakClient +import requests +from moose_lib import task, Logger + +HEART_RATE_CHAR_UUID = "00002a37-0000-1000-8000-00805f9b34fb" +INGEST_URL = "http://localhost:4000/ingest/apple_health_watch_packet" + +def parse_heart_rate(data): + # Standard BLE Heart Rate Measurement characteristic parsing + # See: https://www.bluetooth.com/specifications/specs/heart-rate-profile-1-0/ + flag = data[0] + hr_format = flag & 0x01 + if hr_format == 0: + hr_value = data[1] + else: + hr_value = int.from_bytes(data[1:3], byteorder='little') + return hr_value + +@task +async def apple_watch_hr_client(): + logger = Logger("apple_watch_hr_client") + logger.info("Scanning for Apple Watch devices...") + devices = await BleakScanner.discover() + if not devices: + logger.error("No BLE devices found.") + return + + device = None + for d in devices: + if d.name and "iphone" in d.name.lower(): + device = d + break + + if not device: + logger.error("No iPhone found. Available devices:") + for d in devices: + logger.info(f" {d.name} ({d.address})") + return + + logger.info(f"Connecting to: {device.name} ({device.address})") + + async with BleakClient(device) as client: + logger.info(f"Connected: {client.is_connected}") + + def hr_callback(sender, data): + try: + heart_rate = parse_heart_rate(data) + payload = { + "device_id": device.address, + "heart_rate_data": heart_rate + } + response = requests.post(INGEST_URL, json=payload) + if response.status_code == 200: + logger.info(f"Sent: {payload}") + else: + logger.error(f"Failed to send data: {response.status_code} {response.text}") + except Exception as e: + logger.error(f"Error in callback: {e}") + + logger.info(f"Subscribing to heart rate notifications on {HEART_RATE_CHAR_UUID}...") + try: + await client.start_notify(HEART_RATE_CHAR_UUID, hr_callback) + except Exception as e: + logger.error(f"Error subscribing to heart rate notifications: {e}") + return + logger.info("Subscribed! Waiting for data...") + try: + while True: + await asyncio.sleep(1) + except KeyboardInterrupt: + logger.info("Disconnecting...") + await client.stop_notify(HEART_RATE_CHAR_UUID) + logger.info("Disconnected.") + + return { + "task": "apple_watch_hr_client", + "data": { + "device_id": device.address + } + } \ No newline at end of file diff --git a/templates/live-heartrate-leaderboard/app/scripts/apple_watch_hr_client/config.toml b/templates/live-heartrate-leaderboard/app/scripts/apple_watch_hr_client/config.toml new file mode 100644 index 0000000000..daadd8d3a3 --- /dev/null +++ b/templates/live-heartrate-leaderboard/app/scripts/apple_watch_hr_client/config.toml @@ -0,0 +1,5 @@ +name = 'apple_watch_hr_client' +schedule = '' +retries = 3 +timeout = '1h' +tasks = ['apple_watch_hr_client'] diff --git a/templates/live-heartrate-leaderboard/app/streamlit_app.py b/templates/live-heartrate-leaderboard/app/streamlit_app.py index c4640cf717..450129bd18 100644 --- a/templates/live-heartrate-leaderboard/app/streamlit_app.py +++ b/templates/live-heartrate-leaderboard/app/streamlit_app.py @@ -14,55 +14,6 @@ layout="wide" ) -# Modern animated banner -# st.markdown(""" -# - -#
-# Go from prototype to production
-# \n Made with ❤️ MOOSE X SINGLESTONE \n -# Learn More: docs.fiveonefour.com/moose -# Download: bash -i <(curl -fsSL https://fiveonefour.com/install.sh) moose) -#
-# """, unsafe_allow_html=True) - # Initialize session state and constants if 'selected_user' not in st.session_state: st.session_state.selected_user = None diff --git a/templates/live-heartrate-leaderboard/mock-user-db.json b/templates/live-heartrate-leaderboard/mock-user-db.json index 574f981d54..d8d1acd7d2 100644 --- a/templates/live-heartrate-leaderboard/mock-user-db.json +++ b/templates/live-heartrate-leaderboard/mock-user-db.json @@ -14,7 +14,7 @@ "user_name": "Isa", "profile_image": "https://ca.slack-edge.com/T062WLJGPMW-U0820SCK9HN-8054493448fa-512" }, - "12348": { + "D01F2534-9BFD-4C9E-DB03-458C053A9BC7": { "user_id": 1, "live_bt_device": "True", "user_name": "Arman", diff --git a/templates/live-heartrate-leaderboard/requirements.txt b/templates/live-heartrate-leaderboard/requirements.txt index ae8e7828ea..9a3720b017 100644 --- a/templates/live-heartrate-leaderboard/requirements.txt +++ b/templates/live-heartrate-leaderboard/requirements.txt @@ -7,3 +7,4 @@ clickhouse-connect==0.7.16 requests>=2.31.0 moose-cli moose-lib +bleak>=0.22.3 \ No newline at end of file From 588b9dce18453b92f668d15dce7cb52636bab143 Mon Sep 17 00:00:00 2001 From: armanjindal Date: Mon, 2 Jun 2025 14:54:28 -0400 Subject: [PATCH 3/5] working bt client --- .../live-heartrate-leaderboard/README.md | 16 ++++-- .../app/pipelines/pipelines.py | 2 +- .../apple_heart_rate_client.py | 52 +++++++++++++++---- .../app/streamlit_app.py | 37 ++++++++----- 4 files changed, 79 insertions(+), 28 deletions(-) diff --git a/templates/live-heartrate-leaderboard/README.md b/templates/live-heartrate-leaderboard/README.md index 1cb64a56bb..442cd99758 100644 --- a/templates/live-heartrate-leaderboard/README.md +++ b/templates/live-heartrate-leaderboard/README.md @@ -13,7 +13,7 @@ source .venv/bin/activate``` 4. Run Moose ```moose dev``` -5. Start workflows +5. Generate mock ANT+ Data In another terminal run: ```moose workflow run generate_data``` @@ -26,6 +26,16 @@ In another terminal run: In another terminal: ```streamlit run app/streamlit_app.py``` -## SOS +## Additional - Connect your own Heart Rate Device (Apple Watch) -```docker stop $(docker ps -aq)``` +Connect your own Apple Watch to the system. This has only been tested on the AppleWatch SE! Please feel free to add your device, following the example in `app/scripts/apple_watch_hr_client` + +1. Download the Echo App on your IPhone/Android +2. Update the name in the code to match your device name (or a substring) `app/scripts/apple_watch_hr_client/apple_heart_rate_client` +3. Update the "user table" in the `mock-user-db.json` to match yours. +3. Run your `moose dev` server and then run `moose workflow run apple_watch_hr_client` + + +To terminate the (temporal) workflow, run: + +`moose workflow terminate apple_watch_hr_client` diff --git a/templates/live-heartrate-leaderboard/app/pipelines/pipelines.py b/templates/live-heartrate-leaderboard/app/pipelines/pipelines.py index 64f83d70c3..432206607c 100644 --- a/templates/live-heartrate-leaderboard/app/pipelines/pipelines.py +++ b/templates/live-heartrate-leaderboard/app/pipelines/pipelines.py @@ -39,7 +39,7 @@ stream=True, table=True )) -# Create a ingest pipeline for Appple Health +# Create an ingest pipeline for HR data streaming from iPhone Echo system & Apple Watch to System via Bluetooth appleWatchHRPipeline = IngestPipeline[AppleWatchHRPacket]("apple_health_watch_packet", IngestPipelineConfig( ingest=True, diff --git a/templates/live-heartrate-leaderboard/app/scripts/apple_watch_hr_client/apple_heart_rate_client.py b/templates/live-heartrate-leaderboard/app/scripts/apple_watch_hr_client/apple_heart_rate_client.py index af3a34d44f..1743b7b8c4 100644 --- a/templates/live-heartrate-leaderboard/app/scripts/apple_watch_hr_client/apple_heart_rate_client.py +++ b/templates/live-heartrate-leaderboard/app/scripts/apple_watch_hr_client/apple_heart_rate_client.py @@ -8,6 +8,9 @@ HEART_RATE_CHAR_UUID = "00002a37-0000-1000-8000-00805f9b34fb" INGEST_URL = "http://localhost:4000/ingest/apple_health_watch_packet" +BLE_DEVICE_NAME = "iPhone" +DEBUG_BLE_MODE = False + def parse_heart_rate(data): # Standard BLE Heart Rate Measurement characteristic parsing # See: https://www.bluetooth.com/specifications/specs/heart-rate-profile-1-0/ @@ -23,27 +26,54 @@ def parse_heart_rate(data): async def apple_watch_hr_client(): logger = Logger("apple_watch_hr_client") logger.info("Scanning for Apple Watch devices...") - devices = await BleakScanner.discover() - if not devices: + devices_dict = await BleakScanner.discover(return_adv=True) + if not devices_dict: logger.error("No BLE devices found.") - return + return { + "task": "apple_watch_hr_client", + "data": { + "device_id": None, + "error": "No iPhone found" + } + } device = None - for d in devices: - if d.name and "iphone" in d.name.lower(): - device = d - break + for address, (ble_device, adv_data) in devices_dict.items(): + logger.info(f"Device name: {ble_device.name} ({address})") + if ble_device.name and BLE_DEVICE_NAME.lower().strip() in ble_device.name.lower().strip(): + logger.info(f"Found {BLE_DEVICE_NAME}: {ble_device.name} ({ble_device.address})") + device = ble_device if not device: - logger.error("No iPhone found. Available devices:") - for d in devices: - logger.info(f" {d.name} ({d.address})") - return + logger.error(f"No {BLE_DEVICE_NAME} found from BleakScanner") + return { + "task": "apple_watch_hr_client", + "data": { + "device_id": None, + "error": "No iPhone found" + } + } logger.info(f"Connecting to: {device.name} ({device.address})") async with BleakClient(device) as client: logger.info(f"Connected: {client.is_connected}") + # Get all services + if DEBUG_BLE_MODE: + services = await client.get_services() + logger.info("Services:") + for service in services: + logger.info(f"Service: {service.uuid}") + for char in service.characteristics: + logger.info(f" Characteristic: {char.uuid}") + logger.info(f" Properties: {char.properties}") + logger.info(f" Handle: {char.handle}") + if "read" in char.properties: + try: + value = await client.read_gatt_char(char.uuid) + logger.info(f" Value: {value}") + except Exception as e: + logger.info(f" Could not read value: {e}") def hr_callback(sender, data): try: diff --git a/templates/live-heartrate-leaderboard/app/streamlit_app.py b/templates/live-heartrate-leaderboard/app/streamlit_app.py index 450129bd18..3338c72e84 100644 --- a/templates/live-heartrate-leaderboard/app/streamlit_app.py +++ b/templates/live-heartrate-leaderboard/app/streamlit_app.py @@ -7,6 +7,25 @@ import numpy as np import json +# Load user database once at module level +_USER_DB_CACHE = None + +def load_user_db(): + """Load user database from JSON file and cache it.""" + global _USER_DB_CACHE + if _USER_DB_CACHE is None: + try: + with open("mock-user-db.json", "r") as f: + _USER_DB_CACHE = json.load(f) + except Exception as e: + # Use Streamlit's error display only if we are in a Streamlit context + try: + st.error(f"Failed to load user database: {str(e)}") + except Exception: + pass + _USER_DB_CACHE = {} + return _USER_DB_CACHE + # Page config st.set_page_config( page_title="Live Heart Rate Monitor", @@ -44,21 +63,13 @@ def ensure_default_selected_user(): ensure_default_selected_user() -# Helper: fetch profile image for a given user name from mock-user-db.json +# Helper: fetch profile image for a given user name from cached user database def get_profile_image(user_name: str): """Return the profile image URL for the provided user name or None if not found.""" - try: - with open("mock-user-db.json", "r") as f: - user_db = json.load(f) - for user in user_db.values(): - if user.get("user_name") == user_name: - return user.get("profile_image") - except Exception as e: - # Use Streamlit's error display only if we are in a Streamlit context - try: - st.error(f"Failed to load user image: {str(e)}") - except Exception: - pass + user_db = load_user_db() + for user in user_db.values(): + if user.get("user_name") == user_name: + return user.get("profile_image") return None # Function to update the live graph From d2e81432dad6c5ebcf0c12b6369b2ea60edf013c Mon Sep 17 00:00:00 2001 From: armanjindal Date: Mon, 2 Jun 2025 14:55:14 -0400 Subject: [PATCH 4/5] fix readme --- templates/live-heartrate-leaderboard/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/templates/live-heartrate-leaderboard/README.md b/templates/live-heartrate-leaderboard/README.md index 442cd99758..0ac48a44c9 100644 --- a/templates/live-heartrate-leaderboard/README.md +++ b/templates/live-heartrate-leaderboard/README.md @@ -32,8 +32,8 @@ Connect your own Apple Watch to the system. This has only been tested on the App 1. Download the Echo App on your IPhone/Android 2. Update the name in the code to match your device name (or a substring) `app/scripts/apple_watch_hr_client/apple_heart_rate_client` -3. Update the "user table" in the `mock-user-db.json` to match yours. -3. Run your `moose dev` server and then run `moose workflow run apple_watch_hr_client` +3. Update the "user table" in the `mock-user-db.json` to match your details. +4. Run your `moose dev` server and then run `moose workflow run apple_watch_hr_client` To terminate the (temporal) workflow, run: From fcb1d35309d3f0201f9ff07167bceb87bef81a8f Mon Sep 17 00:00:00 2001 From: armanjindal Date: Mon, 2 Jun 2025 14:56:45 -0400 Subject: [PATCH 5/5] update readme again --- templates/live-heartrate-leaderboard/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/templates/live-heartrate-leaderboard/README.md b/templates/live-heartrate-leaderboard/README.md index 0ac48a44c9..1be1552374 100644 --- a/templates/live-heartrate-leaderboard/README.md +++ b/templates/live-heartrate-leaderboard/README.md @@ -30,7 +30,7 @@ In another terminal: Connect your own Apple Watch to the system. This has only been tested on the AppleWatch SE! Please feel free to add your device, following the example in `app/scripts/apple_watch_hr_client` -1. Download the Echo App on your IPhone/Android +1. Download the Echo App on your IPhone/Android device since the Apple Watch doesn't nativley support broadcasting HR data over BLE 2. Update the name in the code to match your device name (or a substring) `app/scripts/apple_watch_hr_client/apple_heart_rate_client` 3. Update the "user table" in the `mock-user-db.json` to match your details. 4. Run your `moose dev` server and then run `moose workflow run apple_watch_hr_client`