Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion templates/live-heartrate-leaderboard/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,5 @@ wheels
*.egg-info/
.installed.cfg
*.egg
MANIFEST
MANIFEST
.cursor
16 changes: 13 additions & 3 deletions templates/live-heartrate-leaderboard/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ source .venv/bin/activate```
4. Run Moose
```moose dev```

5. Start workflows
5. Generate mock ANT+ Data
In another terminal run:
```moose workflow run generate_data```

Expand All @@ -26,6 +26,16 @@ In another terminal run:
In another terminal:
```streamlit run app/streamlit_app.py```

## SOS
## Additional - Connect your own Heart Rate Device (Apple Watch)

```docker stop $(docker ps -aq)```
Connect your own Apple Watch to the system. This has only been tested on the AppleWatch SE! Please feel free to add your device, following the example in `app/scripts/apple_watch_hr_client`

1. Download the Echo App on your IPhone/Android device since the Apple Watch doesn't nativley support broadcasting HR data over BLE
2. Update the name in the code to match your device name (or a substring) `app/scripts/apple_watch_hr_client/apple_heart_rate_client`
3. Update the "user table" in the `mock-user-db.json` to match your details.
4. Run your `moose dev` server and then run `moose workflow run apple_watch_hr_client`


To terminate the (temporal) workflow, run:

`moose workflow terminate apple_watch_hr_client`
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,8 @@ def run(client: MooseClient, params: LeaderboardQueryParams) -> LeaderboardRespo
round(countIf(hr_value >= 160 AND hr_value < 180) / count() * 100, 1) as zone4_percentage,
round(countIf(hr_value >= 180) / count() * 100, 1) as zone5_percentage
FROM unified_hr_packet
WHERE hr_timestamp_seconds >= (
SELECT MAX(hr_timestamp_seconds) - toInt32({time_window_seconds})
FROM unified_hr_packet
WHERE processed_timestamp >= (
toDateTime64(now(), 3) - toIntervalSecond(toInt32({time_window_seconds}))
)
GROUP BY user_name
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,8 @@ def run(client: MooseClient, params: QueryParams) -> HeartRateStats:
(hr_timestamp_seconds - lagInFrame(hr_timestamp_seconds, 1, hr_timestamp_seconds - 1) OVER (PARTITION BY user_name ORDER BY hr_timestamp_seconds)))/60 as calories_burned
FROM unified_hr_packet
WHERE user_name = {user_name}
AND hr_timestamp_seconds >= (
SELECT MAX(hr_timestamp_seconds) - {window_seconds}
FROM unified_hr_packet
WHERE user_name = {user_name}
AND processed_timestamp >= (
toDateTime64(now(), 3) - toIntervalSecond(toInt32({window_seconds}))
)
)
SELECT
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from pydantic import BaseModel
from moose_lib import Key
from typing import List

class AppleWatchHRPacket(BaseModel):
device_id: Key[str]
heart_rate_data: int
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from pydantic import BaseModel
from moose_lib import Key
from typing import Optional

class RawAntHRPacket(BaseModel):
device_id: Key[int]
packet_count: int
ant_hr_packet: list[int]
ant_hr_packet: list[int]
timestamp: Optional[float] = None
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
from moose_lib import Key
from pydantic import BaseModel
from datetime import datetime
from typing import Optional

class UnifiedHRPacket(BaseModel):
user_id: Key[int]
user_name: str
device_id: int
hr_timestamp_seconds: float
hr_value: float
rr_interval_ms: float
rr_interval_ms: Optional[float]
processed_timestamp: datetime
# hr_max: float
# hr_zone: int
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from app.datamodels.AppleWatchHRPacket import AppleWatchHRPacket
from app.datamodels.UnifiedHRPacket import UnifiedHRPacket
from typing import Optional
from datetime import datetime, timezone
from pathlib import Path
import json
from moose_lib import Logger, Key

def load_device_dict():
json_path = Path(__file__).parents[2] / 'mock-user-db.json'
logger = Logger(action="SF")
# logger.info(f'Starting streaming function and loading mock user db from {json_path}')

with open(json_path) as f:
device_dict = json.load(f)
# Filter to only include devices marked as live bluetooth devices
device_dict = {k: v for k, v in device_dict.items() if v.get('live_bt_device') == "True"}

return device_dict

all_bluetooth_device_dict = load_device_dict()

logger = Logger(action="SF")
# logger.info(f'all_bluetooth_device_dict: {all_bluetooth_device_dict}')

# Capture the moment this module is first imported as the streaming start time (UTC)
stream_start_time = datetime.now(timezone.utc)
# logger.info(f"Apple Watch streaming function start time: {stream_start_time.isoformat()}")

def apple_watch_to_unified(source: AppleWatchHRPacket) -> UnifiedHRPacket:
device_id_str = str(source.device_id)
device_dict = all_bluetooth_device_dict[device_id_str]
logger.info(f"device_dict: {device_dict}")
user_name = device_dict.get('user_name')
user_id = device_dict.get('user_id') # This should already be an integer

# Time elapsed (in seconds) since the streaming function/module was first loaded
elapsed_seconds = (datetime.now(timezone.utc) - stream_start_time).total_seconds()

return UnifiedHRPacket(
user_id=user_id, # UnifiedHRPacket will handle the Key[int] conversion
user_name=user_name,
device_id=user_id, # Convert to plain int
hr_timestamp_seconds=elapsed_seconds,
hr_value=source.heart_rate_data,
rr_interval_ms=0, # not included in apple watch
processed_timestamp=datetime.now(timezone.utc),
)
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
def load_device_dict():
json_path = Path(__file__).parents[2] / 'mock-user-db.json'
logger = Logger(action="SF")
logger.info(f'Starting streaming function and loading mock user db from {json_path}')
# logger.info(f'Starting streaming function and loading mock user db from {json_path}')

with open(json_path) as f:
device_dict = json.load(f)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
def load_device_dict():
json_path = Path(__file__).parents[2] / 'mock-user-db.json'
logger = Logger(action="SF")
logger.info(f'Starting streaming function and loading mock user db from {json_path}')
# logger.info(f'Starting streaming function and loading mock user db from {json_path}')

with open(json_path) as f:
device_dict = json.load(f)
Expand Down
5 changes: 1 addition & 4 deletions templates/live-heartrate-leaderboard/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@
from moose_lib import StreamingFunction
from moose_lib import ConsumptionApi

from app.pipelines.pipelines import rawAntHRPipeline, processedAntHRPipeline, unifiedHRPipeline, bluetoothHRPipeline

# Instatiated materialized views for in DB processing
from app.views.aggregated_per_second import aggregateHeartRateSummaryPerSecondMV
from app.pipelines.pipelines import rawAntHRPipeline, processedAntHRPipeline, unifiedHRPipeline, bluetoothHRPipeline, appleWatchHRPipeline

# Instantiate APIs
from app.apis.get_leaderboard import get_leaderboard_api
from app.apis.get_user_live_heart_rate_stats import get_user_live_heart_rate_stats

17 changes: 16 additions & 1 deletion templates/live-heartrate-leaderboard/app/pipelines/pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
from app.datamodels.ProcessedAntHRPacket import ProcessedAntHRPacket
from app.datamodels.RawAntHRPacket import RawAntHRPacket
from app.datamodels.BluetoothHRPacket import BluetoothHRPacket
from app.datamodels.AppleWatchHRPacket import AppleWatchHRPacket

# Instantiate functions for kafka stream processing
from app.functions.applewatch_to_unified_packet import apple_watch_to_unified
from app.functions.raw_ant_to_processed_ant_packet import RawAntHRPacket__ProcessedAntHRPacket
from app.functions.processed_ant_to_unified_packet import processedAntHRPacket__UNIFIED_HR_PACKET
from app.functions.bluetooth_to_unified_packet import bluetoothHRPacket__UNIFIED_HRM_MODEL
Expand Down Expand Up @@ -37,6 +39,13 @@
stream=True,
table=True
))
# Create an ingest pipeline for HR data streaming from iPhone Echo system & Apple Watch to System via Bluetooth

appleWatchHRPipeline = IngestPipeline[AppleWatchHRPacket]("apple_health_watch_packet", IngestPipelineConfig(
ingest=True,
stream=True,
table=True
))

# Transform RawAntHRPacket to ProcessedAntHRPacket in stream
rawAntHRPipeline.get_stream().add_transform(
Expand All @@ -52,4 +61,10 @@
bluetoothHRPipeline.get_stream().add_transform(
destination=unifiedHRPipeline.get_stream(),
transformation=bluetoothHRPacket__UNIFIED_HRM_MODEL
)
)


appleWatchHRPipeline.get_stream().add_transform(
destination=unifiedHRPipeline.get_stream(),
transformation=apple_watch_to_unified
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
#!/usr/bin/env python3
# Requirements: bleak, requests
import asyncio
from bleak import BleakScanner, BleakClient
import requests
from moose_lib import task, Logger

HEART_RATE_CHAR_UUID = "00002a37-0000-1000-8000-00805f9b34fb"
INGEST_URL = "http://localhost:4000/ingest/apple_health_watch_packet"

BLE_DEVICE_NAME = "iPhone"
DEBUG_BLE_MODE = False

def parse_heart_rate(data):
# Standard BLE Heart Rate Measurement characteristic parsing
# See: https://www.bluetooth.com/specifications/specs/heart-rate-profile-1-0/
flag = data[0]
hr_format = flag & 0x01
if hr_format == 0:
hr_value = data[1]
else:
hr_value = int.from_bytes(data[1:3], byteorder='little')
return hr_value

@task
async def apple_watch_hr_client():
logger = Logger("apple_watch_hr_client")
logger.info("Scanning for Apple Watch devices...")
devices_dict = await BleakScanner.discover(return_adv=True)
if not devices_dict:
logger.error("No BLE devices found.")
return {
"task": "apple_watch_hr_client",
"data": {
"device_id": None,
"error": "No iPhone found"
}
}

device = None
for address, (ble_device, adv_data) in devices_dict.items():
logger.info(f"Device name: {ble_device.name} ({address})")
if ble_device.name and BLE_DEVICE_NAME.lower().strip() in ble_device.name.lower().strip():
logger.info(f"Found {BLE_DEVICE_NAME}: {ble_device.name} ({ble_device.address})")
device = ble_device

if not device:
logger.error(f"No {BLE_DEVICE_NAME} found from BleakScanner")
return {
"task": "apple_watch_hr_client",
"data": {
"device_id": None,
"error": "No iPhone found"
}
}

logger.info(f"Connecting to: {device.name} ({device.address})")

async with BleakClient(device) as client:
logger.info(f"Connected: {client.is_connected}")
# Get all services
if DEBUG_BLE_MODE:
services = await client.get_services()
logger.info("Services:")
for service in services:
logger.info(f"Service: {service.uuid}")
for char in service.characteristics:
logger.info(f" Characteristic: {char.uuid}")
logger.info(f" Properties: {char.properties}")
logger.info(f" Handle: {char.handle}")
if "read" in char.properties:
try:
value = await client.read_gatt_char(char.uuid)
logger.info(f" Value: {value}")
except Exception as e:
logger.info(f" Could not read value: {e}")

def hr_callback(sender, data):
try:
heart_rate = parse_heart_rate(data)
payload = {
"device_id": device.address,
"heart_rate_data": heart_rate
}
response = requests.post(INGEST_URL, json=payload)
if response.status_code == 200:
logger.info(f"Sent: {payload}")
else:
logger.error(f"Failed to send data: {response.status_code} {response.text}")
except Exception as e:
logger.error(f"Error in callback: {e}")

logger.info(f"Subscribing to heart rate notifications on {HEART_RATE_CHAR_UUID}...")
try:
await client.start_notify(HEART_RATE_CHAR_UUID, hr_callback)
except Exception as e:
logger.error(f"Error subscribing to heart rate notifications: {e}")
return
logger.info("Subscribed! Waiting for data...")
try:
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
logger.info("Disconnecting...")
await client.stop_notify(HEART_RATE_CHAR_UUID)
logger.info("Disconnected.")

return {
"task": "apple_watch_hr_client",
"data": {
"device_id": device.address
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
name = 'apple_watch_hr_client'
schedule = ''
retries = 3
timeout = '1h'
tasks = ['apple_watch_hr_client']
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,11 @@ def generate_mock_ant_hr_data():
json_data = json.dumps(ant_packet)

try:
logger.info(f"Sending JSON: {json_data}")
# logger.info(f"Sending JSON: {json_data}")
response = requests.post(url, data=json_data, headers=headers)
if response.status_code == 200:
logger.info(f"Successfully sent packet: {ant_packet}")
# logger.info(f"Successfully sent packet: {ant_packet}")
pass
else:
print(f"Failed to send packet: {response.status_code}, {response.text}")
except requests.exceptions.RequestException as e:
Expand Down
Loading
Loading