Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion templates/live-heartrate-leaderboard/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,5 @@ wheels
*.egg-info/
.installed.cfg
*.egg
MANIFEST
MANIFEST
.cursor
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,8 @@ def run(client: MooseClient, params: LeaderboardQueryParams) -> LeaderboardRespo
round(countIf(hr_value >= 160 AND hr_value < 180) / count() * 100, 1) as zone4_percentage,
round(countIf(hr_value >= 180) / count() * 100, 1) as zone5_percentage
FROM unified_hr_packet
WHERE hr_timestamp_seconds >= (
SELECT MAX(hr_timestamp_seconds) - toInt32({time_window_seconds})
FROM unified_hr_packet
WHERE processed_timestamp >= (
toDateTime64(now(), 3) - toIntervalSecond(toInt32({time_window_seconds}))
)
GROUP BY user_name
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,8 @@ def run(client: MooseClient, params: QueryParams) -> HeartRateStats:
(hr_timestamp_seconds - lagInFrame(hr_timestamp_seconds, 1, hr_timestamp_seconds - 1) OVER (PARTITION BY user_name ORDER BY hr_timestamp_seconds)))/60 as calories_burned
FROM unified_hr_packet
WHERE user_name = {user_name}
AND hr_timestamp_seconds >= (
SELECT MAX(hr_timestamp_seconds) - {window_seconds}
FROM unified_hr_packet
WHERE user_name = {user_name}
AND processed_timestamp >= (
toDateTime64(now(), 3) - toIntervalSecond(toInt32({window_seconds}))
)
)
SELECT
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from pydantic import BaseModel
from moose_lib import Key
from typing import List

class AppleWatchHRPacket(BaseModel):
device_id: Key[str]
heart_rate_data: int
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from pydantic import BaseModel
from moose_lib import Key
from typing import Optional

class RawAntHRPacket(BaseModel):
device_id: Key[int]
packet_count: int
ant_hr_packet: list[int]
ant_hr_packet: list[int]
timestamp: Optional[float] = None
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
from moose_lib import Key
from pydantic import BaseModel
from datetime import datetime
from typing import Optional

class UnifiedHRPacket(BaseModel):
user_id: Key[int]
user_name: str
device_id: int
hr_timestamp_seconds: float
hr_value: float
rr_interval_ms: float
rr_interval_ms: Optional[float]
processed_timestamp: datetime
# hr_max: float
# hr_zone: int
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from app.datamodels.AppleWatchHRPacket import AppleWatchHRPacket
from app.datamodels.UnifiedHRPacket import UnifiedHRPacket
from typing import Optional
from datetime import datetime, timezone
from pathlib import Path
import json
from moose_lib import Logger, Key

def load_device_dict():
json_path = Path(__file__).parents[2] / 'mock-user-db.json'
logger = Logger(action="SF")
# logger.info(f'Starting streaming function and loading mock user db from {json_path}')

with open(json_path) as f:
device_dict = json.load(f)
# Filter to only include devices marked as live bluetooth devices
device_dict = {k: v for k, v in device_dict.items() if v.get('live_bt_device') == "True"}

return device_dict

all_bluetooth_device_dict = load_device_dict()

logger = Logger(action="SF")
# logger.info(f'all_bluetooth_device_dict: {all_bluetooth_device_dict}')

# Capture the moment this module is first imported as the streaming start time (UTC)
stream_start_time = datetime.now(timezone.utc)
# logger.info(f"Apple Watch streaming function start time: {stream_start_time.isoformat()}")

def apple_watch_to_unified(source: AppleWatchHRPacket) -> UnifiedHRPacket:
device_id_str = str(source.device_id)
device_dict = all_bluetooth_device_dict[device_id_str]
logger.info(f"device_dict: {device_dict}")
user_name = device_dict.get('user_name')
user_id = device_dict.get('user_id') # This should already be an integer

# Time elapsed (in seconds) since the streaming function/module was first loaded
elapsed_seconds = (datetime.now(timezone.utc) - stream_start_time).total_seconds()

return UnifiedHRPacket(
user_id=user_id, # UnifiedHRPacket will handle the Key[int] conversion
user_name=user_name,
device_id=user_id, # Convert to plain int
hr_timestamp_seconds=elapsed_seconds,
hr_value=source.heart_rate_data,
rr_interval_ms=0, # not included in apple watch
processed_timestamp=datetime.now(timezone.utc),
)
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
def load_device_dict():
json_path = Path(__file__).parents[2] / 'mock-user-db.json'
logger = Logger(action="SF")
logger.info(f'Starting streaming function and loading mock user db from {json_path}')
# logger.info(f'Starting streaming function and loading mock user db from {json_path}')

with open(json_path) as f:
device_dict = json.load(f)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
def load_device_dict():
json_path = Path(__file__).parents[2] / 'mock-user-db.json'
logger = Logger(action="SF")
logger.info(f'Starting streaming function and loading mock user db from {json_path}')
# logger.info(f'Starting streaming function and loading mock user db from {json_path}')

with open(json_path) as f:
device_dict = json.load(f)
Expand Down
5 changes: 1 addition & 4 deletions templates/live-heartrate-leaderboard/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@
from moose_lib import StreamingFunction
from moose_lib import ConsumptionApi

from app.pipelines.pipelines import rawAntHRPipeline, processedAntHRPipeline, unifiedHRPipeline, bluetoothHRPipeline

# Instatiated materialized views for in DB processing
from app.views.aggregated_per_second import aggregateHeartRateSummaryPerSecondMV
from app.pipelines.pipelines import rawAntHRPipeline, processedAntHRPipeline, unifiedHRPipeline, bluetoothHRPipeline, appleWatchHRPipeline

# Instantiate APIs
from app.apis.get_leaderboard import get_leaderboard_api
from app.apis.get_user_live_heart_rate_stats import get_user_live_heart_rate_stats

17 changes: 16 additions & 1 deletion templates/live-heartrate-leaderboard/app/pipelines/pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
from app.datamodels.ProcessedAntHRPacket import ProcessedAntHRPacket
from app.datamodels.RawAntHRPacket import RawAntHRPacket
from app.datamodels.BluetoothHRPacket import BluetoothHRPacket
from app.datamodels.AppleWatchHRPacket import AppleWatchHRPacket

# Instantiate functions for kafka stream processing
from app.functions.applewatch_to_unified_packet import apple_watch_to_unified
from app.functions.raw_ant_to_processed_ant_packet import RawAntHRPacket__ProcessedAntHRPacket
from app.functions.processed_ant_to_unified_packet import processedAntHRPacket__UNIFIED_HR_PACKET
from app.functions.bluetooth_to_unified_packet import bluetoothHRPacket__UNIFIED_HRM_MODEL
Expand Down Expand Up @@ -37,6 +39,13 @@
stream=True,
table=True
))
# Create a ingest pipeline for Appple Health
Copy link

Copilot AI May 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo in comment: correct "Appple" to "Apple".

Suggested change
# Create a ingest pipeline for Appple Health
# Create an ingest pipeline for Apple Health

Copilot uses AI. Check for mistakes.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch


appleWatchHRPipeline = IngestPipeline[AppleWatchHRPacket]("apple_health_watch_packet", IngestPipelineConfig(
ingest=True,
stream=True,
table=True
))

# Transform RawAntHRPacket to ProcessedAntHRPacket in stream
rawAntHRPipeline.get_stream().add_transform(
Expand All @@ -52,4 +61,10 @@
bluetoothHRPipeline.get_stream().add_transform(
destination=unifiedHRPipeline.get_stream(),
transformation=bluetoothHRPacket__UNIFIED_HRM_MODEL
)
)


appleWatchHRPipeline.get_stream().add_transform(
destination=unifiedHRPipeline.get_stream(),
transformation=apple_watch_to_unified
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
#!/usr/bin/env python3
# Requirements: bleak, requests
import asyncio
from bleak import BleakScanner, BleakClient
import requests
from moose_lib import task, Logger

HEART_RATE_CHAR_UUID = "00002a37-0000-1000-8000-00805f9b34fb"
INGEST_URL = "http://localhost:4000/ingest/apple_health_watch_packet"

def parse_heart_rate(data):
# Standard BLE Heart Rate Measurement characteristic parsing
# See: https://www.bluetooth.com/specifications/specs/heart-rate-profile-1-0/
flag = data[0]
hr_format = flag & 0x01
if hr_format == 0:
hr_value = data[1]
else:
hr_value = int.from_bytes(data[1:3], byteorder='little')
return hr_value

@task
async def apple_watch_hr_client():
logger = Logger("apple_watch_hr_client")
logger.info("Scanning for Apple Watch devices...")
devices = await BleakScanner.discover()
if not devices:
logger.error("No BLE devices found.")
return

device = None
for d in devices:
if d.name and "iphone" in d.name.lower():
device = d
break

if not device:
logger.error("No iPhone found. Available devices:")
for d in devices:
logger.info(f" {d.name} ({d.address})")
return

logger.info(f"Connecting to: {device.name} ({device.address})")

async with BleakClient(device) as client:
logger.info(f"Connected: {client.is_connected}")

def hr_callback(sender, data):
try:
heart_rate = parse_heart_rate(data)
payload = {
"device_id": device.address,
"heart_rate_data": heart_rate
}
response = requests.post(INGEST_URL, json=payload)
if response.status_code == 200:
logger.info(f"Sent: {payload}")
else:
logger.error(f"Failed to send data: {response.status_code} {response.text}")
except Exception as e:
logger.error(f"Error in callback: {e}")

logger.info(f"Subscribing to heart rate notifications on {HEART_RATE_CHAR_UUID}...")
try:
await client.start_notify(HEART_RATE_CHAR_UUID, hr_callback)
except Exception as e:
logger.error(f"Error subscribing to heart rate notifications: {e}")
return
logger.info("Subscribed! Waiting for data...")
try:
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
logger.info("Disconnecting...")
await client.stop_notify(HEART_RATE_CHAR_UUID)
logger.info("Disconnected.")

return {
"task": "apple_watch_hr_client",
"data": {
"device_id": device.address
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
name = 'apple_watch_hr_client'
schedule = ''
retries = 3
timeout = '1h'
tasks = ['apple_watch_hr_client']
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,11 @@ def generate_mock_ant_hr_data():
json_data = json.dumps(ant_packet)

try:
logger.info(f"Sending JSON: {json_data}")
# logger.info(f"Sending JSON: {json_data}")
response = requests.post(url, data=json_data, headers=headers)
if response.status_code == 200:
logger.info(f"Successfully sent packet: {ant_packet}")
# logger.info(f"Successfully sent packet: {ant_packet}")
pass
else:
print(f"Failed to send packet: {response.status_code}, {response.text}")
except requests.exceptions.RequestException as e:
Expand Down
Loading
Loading