diff --git a/templates/live-heartrate-leaderboard/.gitignore b/templates/live-heartrate-leaderboard/.gitignore index 5216c527de..8c252d858b 100644 --- a/templates/live-heartrate-leaderboard/.gitignore +++ b/templates/live-heartrate-leaderboard/.gitignore @@ -38,4 +38,5 @@ wheels *.egg-info/ .installed.cfg *.egg -MANIFEST \ No newline at end of file +MANIFEST +.cursor diff --git a/templates/live-heartrate-leaderboard/README.md b/templates/live-heartrate-leaderboard/README.md index 1cb64a56bb..1be1552374 100644 --- a/templates/live-heartrate-leaderboard/README.md +++ b/templates/live-heartrate-leaderboard/README.md @@ -13,7 +13,7 @@ source .venv/bin/activate``` 4. Run Moose ```moose dev``` -5. Start workflows +5. Generate mock ANT+ Data In another terminal run: ```moose workflow run generate_data``` @@ -26,6 +26,16 @@ In another terminal run: In another terminal: ```streamlit run app/streamlit_app.py``` -## SOS +## Additional - Connect your own Heart Rate Device (Apple Watch) -```docker stop $(docker ps -aq)``` +Connect your own Apple Watch to the system. This has only been tested on the AppleWatch SE! Please feel free to add your device, following the example in `app/scripts/apple_watch_hr_client` + +1. Download the Echo App on your IPhone/Android device since the Apple Watch doesn't nativley support broadcasting HR data over BLE +2. Update the name in the code to match your device name (or a substring) `app/scripts/apple_watch_hr_client/apple_heart_rate_client` +3. Update the "user table" in the `mock-user-db.json` to match your details. +4. Run your `moose dev` server and then run `moose workflow run apple_watch_hr_client` + + +To terminate the (temporal) workflow, run: + +`moose workflow terminate apple_watch_hr_client` diff --git a/templates/live-heartrate-leaderboard/app/apis/get_leaderboard.py b/templates/live-heartrate-leaderboard/app/apis/get_leaderboard.py index 9759e3c58d..020bdb4bc6 100644 --- a/templates/live-heartrate-leaderboard/app/apis/get_leaderboard.py +++ b/templates/live-heartrate-leaderboard/app/apis/get_leaderboard.py @@ -74,9 +74,8 @@ def run(client: MooseClient, params: LeaderboardQueryParams) -> LeaderboardRespo round(countIf(hr_value >= 160 AND hr_value < 180) / count() * 100, 1) as zone4_percentage, round(countIf(hr_value >= 180) / count() * 100, 1) as zone5_percentage FROM unified_hr_packet - WHERE hr_timestamp_seconds >= ( - SELECT MAX(hr_timestamp_seconds) - toInt32({time_window_seconds}) - FROM unified_hr_packet + WHERE processed_timestamp >= ( + toDateTime64(now(), 3) - toIntervalSecond(toInt32({time_window_seconds})) ) GROUP BY user_name ) diff --git a/templates/live-heartrate-leaderboard/app/apis/get_user_live_heart_rate_stats.py b/templates/live-heartrate-leaderboard/app/apis/get_user_live_heart_rate_stats.py index f4826fa60c..37beac15e7 100644 --- a/templates/live-heartrate-leaderboard/app/apis/get_user_live_heart_rate_stats.py +++ b/templates/live-heartrate-leaderboard/app/apis/get_user_live_heart_rate_stats.py @@ -61,10 +61,8 @@ def run(client: MooseClient, params: QueryParams) -> HeartRateStats: (hr_timestamp_seconds - lagInFrame(hr_timestamp_seconds, 1, hr_timestamp_seconds - 1) OVER (PARTITION BY user_name ORDER BY hr_timestamp_seconds)))/60 as calories_burned FROM unified_hr_packet WHERE user_name = {user_name} - AND hr_timestamp_seconds >= ( - SELECT MAX(hr_timestamp_seconds) - {window_seconds} - FROM unified_hr_packet - WHERE user_name = {user_name} + AND processed_timestamp >= ( + toDateTime64(now(), 3) - toIntervalSecond(toInt32({window_seconds})) ) ) SELECT diff --git a/templates/live-heartrate-leaderboard/app/datamodels/AppleWatchHRPacket.py b/templates/live-heartrate-leaderboard/app/datamodels/AppleWatchHRPacket.py new file mode 100644 index 0000000000..3adf27eb4d --- /dev/null +++ b/templates/live-heartrate-leaderboard/app/datamodels/AppleWatchHRPacket.py @@ -0,0 +1,7 @@ +from pydantic import BaseModel +from moose_lib import Key +from typing import List + +class AppleWatchHRPacket(BaseModel): + device_id: Key[str] + heart_rate_data: int \ No newline at end of file diff --git a/templates/live-heartrate-leaderboard/app/datamodels/RawAntHRPacket.py b/templates/live-heartrate-leaderboard/app/datamodels/RawAntHRPacket.py index e5c6a01c26..c11710904a 100644 --- a/templates/live-heartrate-leaderboard/app/datamodels/RawAntHRPacket.py +++ b/templates/live-heartrate-leaderboard/app/datamodels/RawAntHRPacket.py @@ -1,7 +1,9 @@ from pydantic import BaseModel from moose_lib import Key +from typing import Optional class RawAntHRPacket(BaseModel): device_id: Key[int] packet_count: int - ant_hr_packet: list[int] \ No newline at end of file + ant_hr_packet: list[int] + timestamp: Optional[float] = None \ No newline at end of file diff --git a/templates/live-heartrate-leaderboard/app/datamodels/UnifiedHRPacket.py b/templates/live-heartrate-leaderboard/app/datamodels/UnifiedHRPacket.py index 81da1fb273..23e14b822f 100644 --- a/templates/live-heartrate-leaderboard/app/datamodels/UnifiedHRPacket.py +++ b/templates/live-heartrate-leaderboard/app/datamodels/UnifiedHRPacket.py @@ -1,6 +1,7 @@ from moose_lib import Key from pydantic import BaseModel from datetime import datetime +from typing import Optional class UnifiedHRPacket(BaseModel): user_id: Key[int] @@ -8,7 +9,7 @@ class UnifiedHRPacket(BaseModel): device_id: int hr_timestamp_seconds: float hr_value: float - rr_interval_ms: float + rr_interval_ms: Optional[float] processed_timestamp: datetime # hr_max: float # hr_zone: int \ No newline at end of file diff --git a/templates/live-heartrate-leaderboard/app/functions/applewatch_to_unified_packet.py b/templates/live-heartrate-leaderboard/app/functions/applewatch_to_unified_packet.py new file mode 100644 index 0000000000..67baf6dd35 --- /dev/null +++ b/templates/live-heartrate-leaderboard/app/functions/applewatch_to_unified_packet.py @@ -0,0 +1,48 @@ +from app.datamodels.AppleWatchHRPacket import AppleWatchHRPacket +from app.datamodels.UnifiedHRPacket import UnifiedHRPacket +from typing import Optional +from datetime import datetime, timezone +from pathlib import Path +import json +from moose_lib import Logger, Key + +def load_device_dict(): + json_path = Path(__file__).parents[2] / 'mock-user-db.json' + logger = Logger(action="SF") + # logger.info(f'Starting streaming function and loading mock user db from {json_path}') + + with open(json_path) as f: + device_dict = json.load(f) + # Filter to only include devices marked as live bluetooth devices + device_dict = {k: v for k, v in device_dict.items() if v.get('live_bt_device') == "True"} + + return device_dict + +all_bluetooth_device_dict = load_device_dict() + +logger = Logger(action="SF") +# logger.info(f'all_bluetooth_device_dict: {all_bluetooth_device_dict}') + +# Capture the moment this module is first imported as the streaming start time (UTC) +stream_start_time = datetime.now(timezone.utc) +# logger.info(f"Apple Watch streaming function start time: {stream_start_time.isoformat()}") + +def apple_watch_to_unified(source: AppleWatchHRPacket) -> UnifiedHRPacket: + device_id_str = str(source.device_id) + device_dict = all_bluetooth_device_dict[device_id_str] + logger.info(f"device_dict: {device_dict}") + user_name = device_dict.get('user_name') + user_id = device_dict.get('user_id') # This should already be an integer + + # Time elapsed (in seconds) since the streaming function/module was first loaded + elapsed_seconds = (datetime.now(timezone.utc) - stream_start_time).total_seconds() + + return UnifiedHRPacket( + user_id=user_id, # UnifiedHRPacket will handle the Key[int] conversion + user_name=user_name, + device_id=user_id, # Convert to plain int + hr_timestamp_seconds=elapsed_seconds, + hr_value=source.heart_rate_data, + rr_interval_ms=0, # not included in apple watch + processed_timestamp=datetime.now(timezone.utc), + ) diff --git a/templates/live-heartrate-leaderboard/app/functions/bluetooth_to_unified_packet.py b/templates/live-heartrate-leaderboard/app/functions/bluetooth_to_unified_packet.py index e177ae86a0..56fa0d84df 100644 --- a/templates/live-heartrate-leaderboard/app/functions/bluetooth_to_unified_packet.py +++ b/templates/live-heartrate-leaderboard/app/functions/bluetooth_to_unified_packet.py @@ -9,7 +9,7 @@ def load_device_dict(): json_path = Path(__file__).parents[2] / 'mock-user-db.json' logger = Logger(action="SF") - logger.info(f'Starting streaming function and loading mock user db from {json_path}') + # logger.info(f'Starting streaming function and loading mock user db from {json_path}') with open(json_path) as f: device_dict = json.load(f) diff --git a/templates/live-heartrate-leaderboard/app/functions/processed_ant_to_unified_packet.py b/templates/live-heartrate-leaderboard/app/functions/processed_ant_to_unified_packet.py index 4f54892eb1..ff30bd1f85 100644 --- a/templates/live-heartrate-leaderboard/app/functions/processed_ant_to_unified_packet.py +++ b/templates/live-heartrate-leaderboard/app/functions/processed_ant_to_unified_packet.py @@ -11,7 +11,7 @@ def load_device_dict(): json_path = Path(__file__).parents[2] / 'mock-user-db.json' logger = Logger(action="SF") - logger.info(f'Starting streaming function and loading mock user db from {json_path}') + # logger.info(f'Starting streaming function and loading mock user db from {json_path}') with open(json_path) as f: device_dict = json.load(f) diff --git a/templates/live-heartrate-leaderboard/app/main.py b/templates/live-heartrate-leaderboard/app/main.py index a9b35fe645..040c799d84 100644 --- a/templates/live-heartrate-leaderboard/app/main.py +++ b/templates/live-heartrate-leaderboard/app/main.py @@ -3,12 +3,9 @@ from moose_lib import StreamingFunction from moose_lib import ConsumptionApi -from app.pipelines.pipelines import rawAntHRPipeline, processedAntHRPipeline, unifiedHRPipeline, bluetoothHRPipeline -# Instatiated materialized views for in DB processing -from app.views.aggregated_per_second import aggregateHeartRateSummaryPerSecondMV +from app.pipelines.pipelines import rawAntHRPipeline, processedAntHRPipeline, unifiedHRPipeline, bluetoothHRPipeline, appleWatchHRPipeline -# Instantiate APIs from app.apis.get_leaderboard import get_leaderboard_api from app.apis.get_user_live_heart_rate_stats import get_user_live_heart_rate_stats diff --git a/templates/live-heartrate-leaderboard/app/pipelines/pipelines.py b/templates/live-heartrate-leaderboard/app/pipelines/pipelines.py index 8cb1fa39a5..432206607c 100644 --- a/templates/live-heartrate-leaderboard/app/pipelines/pipelines.py +++ b/templates/live-heartrate-leaderboard/app/pipelines/pipelines.py @@ -5,8 +5,10 @@ from app.datamodels.ProcessedAntHRPacket import ProcessedAntHRPacket from app.datamodels.RawAntHRPacket import RawAntHRPacket from app.datamodels.BluetoothHRPacket import BluetoothHRPacket +from app.datamodels.AppleWatchHRPacket import AppleWatchHRPacket # Instantiate functions for kafka stream processing +from app.functions.applewatch_to_unified_packet import apple_watch_to_unified from app.functions.raw_ant_to_processed_ant_packet import RawAntHRPacket__ProcessedAntHRPacket from app.functions.processed_ant_to_unified_packet import processedAntHRPacket__UNIFIED_HR_PACKET from app.functions.bluetooth_to_unified_packet import bluetoothHRPacket__UNIFIED_HRM_MODEL @@ -37,6 +39,13 @@ stream=True, table=True )) +# Create an ingest pipeline for HR data streaming from iPhone Echo system & Apple Watch to System via Bluetooth + +appleWatchHRPipeline = IngestPipeline[AppleWatchHRPacket]("apple_health_watch_packet", IngestPipelineConfig( + ingest=True, + stream=True, + table=True +)) # Transform RawAntHRPacket to ProcessedAntHRPacket in stream rawAntHRPipeline.get_stream().add_transform( @@ -52,4 +61,10 @@ bluetoothHRPipeline.get_stream().add_transform( destination=unifiedHRPipeline.get_stream(), transformation=bluetoothHRPacket__UNIFIED_HRM_MODEL -) \ No newline at end of file +) + + +appleWatchHRPipeline.get_stream().add_transform( + destination=unifiedHRPipeline.get_stream(), + transformation=apple_watch_to_unified +) diff --git a/templates/live-heartrate-leaderboard/app/scripts/apple_watch_hr_client/apple_heart_rate_client.py b/templates/live-heartrate-leaderboard/app/scripts/apple_watch_hr_client/apple_heart_rate_client.py new file mode 100644 index 0000000000..1743b7b8c4 --- /dev/null +++ b/templates/live-heartrate-leaderboard/app/scripts/apple_watch_hr_client/apple_heart_rate_client.py @@ -0,0 +1,113 @@ +#!/usr/bin/env python3 +# Requirements: bleak, requests +import asyncio +from bleak import BleakScanner, BleakClient +import requests +from moose_lib import task, Logger + +HEART_RATE_CHAR_UUID = "00002a37-0000-1000-8000-00805f9b34fb" +INGEST_URL = "http://localhost:4000/ingest/apple_health_watch_packet" + +BLE_DEVICE_NAME = "iPhone" +DEBUG_BLE_MODE = False + +def parse_heart_rate(data): + # Standard BLE Heart Rate Measurement characteristic parsing + # See: https://www.bluetooth.com/specifications/specs/heart-rate-profile-1-0/ + flag = data[0] + hr_format = flag & 0x01 + if hr_format == 0: + hr_value = data[1] + else: + hr_value = int.from_bytes(data[1:3], byteorder='little') + return hr_value + +@task +async def apple_watch_hr_client(): + logger = Logger("apple_watch_hr_client") + logger.info("Scanning for Apple Watch devices...") + devices_dict = await BleakScanner.discover(return_adv=True) + if not devices_dict: + logger.error("No BLE devices found.") + return { + "task": "apple_watch_hr_client", + "data": { + "device_id": None, + "error": "No iPhone found" + } + } + + device = None + for address, (ble_device, adv_data) in devices_dict.items(): + logger.info(f"Device name: {ble_device.name} ({address})") + if ble_device.name and BLE_DEVICE_NAME.lower().strip() in ble_device.name.lower().strip(): + logger.info(f"Found {BLE_DEVICE_NAME}: {ble_device.name} ({ble_device.address})") + device = ble_device + + if not device: + logger.error(f"No {BLE_DEVICE_NAME} found from BleakScanner") + return { + "task": "apple_watch_hr_client", + "data": { + "device_id": None, + "error": "No iPhone found" + } + } + + logger.info(f"Connecting to: {device.name} ({device.address})") + + async with BleakClient(device) as client: + logger.info(f"Connected: {client.is_connected}") + # Get all services + if DEBUG_BLE_MODE: + services = await client.get_services() + logger.info("Services:") + for service in services: + logger.info(f"Service: {service.uuid}") + for char in service.characteristics: + logger.info(f" Characteristic: {char.uuid}") + logger.info(f" Properties: {char.properties}") + logger.info(f" Handle: {char.handle}") + if "read" in char.properties: + try: + value = await client.read_gatt_char(char.uuid) + logger.info(f" Value: {value}") + except Exception as e: + logger.info(f" Could not read value: {e}") + + def hr_callback(sender, data): + try: + heart_rate = parse_heart_rate(data) + payload = { + "device_id": device.address, + "heart_rate_data": heart_rate + } + response = requests.post(INGEST_URL, json=payload) + if response.status_code == 200: + logger.info(f"Sent: {payload}") + else: + logger.error(f"Failed to send data: {response.status_code} {response.text}") + except Exception as e: + logger.error(f"Error in callback: {e}") + + logger.info(f"Subscribing to heart rate notifications on {HEART_RATE_CHAR_UUID}...") + try: + await client.start_notify(HEART_RATE_CHAR_UUID, hr_callback) + except Exception as e: + logger.error(f"Error subscribing to heart rate notifications: {e}") + return + logger.info("Subscribed! Waiting for data...") + try: + while True: + await asyncio.sleep(1) + except KeyboardInterrupt: + logger.info("Disconnecting...") + await client.stop_notify(HEART_RATE_CHAR_UUID) + logger.info("Disconnected.") + + return { + "task": "apple_watch_hr_client", + "data": { + "device_id": device.address + } + } \ No newline at end of file diff --git a/templates/live-heartrate-leaderboard/app/scripts/apple_watch_hr_client/config.toml b/templates/live-heartrate-leaderboard/app/scripts/apple_watch_hr_client/config.toml new file mode 100644 index 0000000000..daadd8d3a3 --- /dev/null +++ b/templates/live-heartrate-leaderboard/app/scripts/apple_watch_hr_client/config.toml @@ -0,0 +1,5 @@ +name = 'apple_watch_hr_client' +schedule = '' +retries = 3 +timeout = '1h' +tasks = ['apple_watch_hr_client'] diff --git a/templates/live-heartrate-leaderboard/app/scripts/generate_data/1.generate_mock_ant_hr_data.py b/templates/live-heartrate-leaderboard/app/scripts/generate_data/1.generate_mock_ant_hr_data.py index 7f45d01a2e..e777331877 100644 --- a/templates/live-heartrate-leaderboard/app/scripts/generate_data/1.generate_mock_ant_hr_data.py +++ b/templates/live-heartrate-leaderboard/app/scripts/generate_data/1.generate_mock_ant_hr_data.py @@ -78,10 +78,11 @@ def generate_mock_ant_hr_data(): json_data = json.dumps(ant_packet) try: - logger.info(f"Sending JSON: {json_data}") + # logger.info(f"Sending JSON: {json_data}") response = requests.post(url, data=json_data, headers=headers) if response.status_code == 200: - logger.info(f"Successfully sent packet: {ant_packet}") + # logger.info(f"Successfully sent packet: {ant_packet}") + pass else: print(f"Failed to send packet: {response.status_code}, {response.text}") except requests.exceptions.RequestException as e: diff --git a/templates/live-heartrate-leaderboard/app/streamlit_app.py b/templates/live-heartrate-leaderboard/app/streamlit_app.py index f762cee0f7..3338c72e84 100644 --- a/templates/live-heartrate-leaderboard/app/streamlit_app.py +++ b/templates/live-heartrate-leaderboard/app/streamlit_app.py @@ -5,6 +5,26 @@ import pandas as pd from datetime import datetime, timezone import numpy as np +import json + +# Load user database once at module level +_USER_DB_CACHE = None + +def load_user_db(): + """Load user database from JSON file and cache it.""" + global _USER_DB_CACHE + if _USER_DB_CACHE is None: + try: + with open("mock-user-db.json", "r") as f: + _USER_DB_CACHE = json.load(f) + except Exception as e: + # Use Streamlit's error display only if we are in a Streamlit context + try: + st.error(f"Failed to load user database: {str(e)}") + except Exception: + pass + _USER_DB_CACHE = {} + return _USER_DB_CACHE # Page config st.set_page_config( @@ -13,55 +33,6 @@ layout="wide" ) -# Modern animated banner -st.markdown(""" - - -
-""", unsafe_allow_html=True) - # Initialize session state and constants if 'selected_user' not in st.session_state: st.session_state.selected_user = None @@ -78,6 +49,29 @@ # Constants LEADERBOARD_TIME_WINDOW = 300 # 5 minutes in seconds +# Ensure a default selected user (top of leaderboard) when the app loads +def ensure_default_selected_user(): + if st.session_state.selected_user is None: + try: + resp = requests.get( + f"http://localhost:4000/consumption/getLeaderboard?time_window_seconds={LEADERBOARD_TIME_WINDOW}&limit=1" + ) + if resp.status_code == 200 and resp.json().get("entries"): + st.session_state.selected_user = resp.json()["entries"][0]["user_name"] + except Exception: + pass + +ensure_default_selected_user() + +# Helper: fetch profile image for a given user name from cached user database +def get_profile_image(user_name: str): + """Return the profile image URL for the provided user name or None if not found.""" + user_db = load_user_db() + for user in user_db.values(): + if user.get("user_name") == user_name: + return user.get("profile_image") + return None + # Function to update the live graph def update_live_graph(): try: @@ -119,43 +113,71 @@ def update_live_graph(): # Function to update the leaderboard def update_leaderboard(): + """Return HTML table (with avatars) + row count.""" try: - response = requests.get(f"http://localhost:4000/consumption/getLeaderboard?time_window_seconds={LEADERBOARD_TIME_WINDOW}&limit=10") + response = requests.get( + f"http://localhost:4000/consumption/getLeaderboard?time_window_seconds={LEADERBOARD_TIME_WINDOW}&limit=100" + ) if response.status_code == 200: data = response.json()["entries"] df = pd.DataFrame(data) - - # Display only relevant columns - display_cols = ['rank', 'user_name', 'avg_heart_rate', 'avg_power', 'total_calories'] + + # Relevant columns for display (include max stats) + display_cols = [ + "rank", + "user_name", + "avg_heart_rate", + "max_heart_rate", + "avg_power", + "max_power", + "total_calories", + ] df_display = df[display_cols].copy() - - # Add styling to highlight selected user + + # Round numeric columns to 1 decimal place (excluding rank) + numeric_cols = [ + "avg_heart_rate", + "max_heart_rate", + "avg_power", + "max_power", + "total_calories", + ] + df_display[numeric_cols] = df_display[numeric_cols].round(1) + + # Inject avatar HTML in a new first column and re-order + df_display.insert( + 0, + "avatar", + df_display["user_name"].apply( + lambda u: f"