-from pathlib import Path
-
-import os
+import re
+import boto3
 import pandas as pd
 import streamlit as st
 
+S3_BUCKET = "caltrans-pems-prd-us-west-2-marts"
+STATIONS_METADATA_KEY = "geo/current_stations.parquet"
+DATA_PREFIX = "imputation/detector_imputed_agg_five_minutes"
+
+
+@st.cache_data(ttl=3600)  # Cache for 1 hour
+def load_station_metadata(district_number: str) -> pd.DataFrame:
+    """Loads metadata for all stations in the selected District from S3."""
+
+    filters = [("DISTRICT", "=", district_number)]
+
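+    # The filters argument is pushed down to the Parquet engine (pyarrow),
+    # so only row groups/partitions matching the district are read from S3
+    # rather than the whole table.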
+    return pd.read_parquet(
+        f"s3://{S3_BUCKET}/{STATIONS_METADATA_KEY}",
+        columns=[
+            "STATION_ID",
+            "NAME",
+            "PHYSICAL_LANES",
+            "STATE_POSTMILE",
+            "ABSOLUTE_POSTMILE",
+            "LATITUDE",
+            "LONGITUDE",
+            "LENGTH",
+            "STATION_TYPE",
+            "DISTRICT",
+            "FREEWAY",
+            "DIRECTION",
+            "COUNTY_NAME",
+            "CITY_NAME",
+        ],
+        filters=filters,
+    )
 
-APP_DIR = Path(__file__).parent
 
+@st.cache_data(ttl=3600)  # Cache for 1 hour
+def get_available_days() -> list[int]:
+    """
+    Lists available days by inspecting S3 prefixes.
+    """
 
-@st.cache_data()
-def fetch_data():
-    all_files = os.listdir(APP_DIR)
-    # Get station metadata files
-    station_files = [f for f in all_files if "_text_meta_" in f and f.endswith(".txt")]
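+    # Object keys follow Hive-style partitioning (e.g. ...day=17/...), so the
+    # available days can be recovered from the keys themselves.
+    # Note: list_objects returns at most 1000 keys per response; a paginator
+    # (or list_objects_v2) would be needed if the prefix grows beyond that.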
+    s3 = boto3.client("s3")
+    s3_keys = s3.list_objects(Bucket=S3_BUCKET, Prefix=DATA_PREFIX)
 
-    df_list = []
-    for f in station_files:
-        file_path = os.path.join(APP_DIR, f)
-        df = pd.read_csv(file_path, delimiter="\t")
-        df_list.append(df)
-    # Combine into a single DataFrame
-    df_all = pd.concat(df_list, ignore_index=True)
+    days = set()
 
-    data = df_all.dropna(subset=["Latitude", "Longitude"])
-    data.loc[:, "State_PM"] = data["State_PM"].astype(str)
-    data.loc[:, "User_ID_1"] = data["User_ID_1"].astype(str)
-    return data
+    for item in s3_keys["Contents"]:
+        s3_path = item["Key"]
+        # Find "day=", then capture one or more digits that immediately follow it
+        match = re.search(r"day=(\d+)", s3_path)
+        if match:
+            # Add the integer value of the first captured group to the set
+            days.add(int(match.group(1)))
 
+    return sorted(days)
 
-query_params = st.query_params
-district_number = query_params.get("district_number", "")
-district_number = int(district_number) if district_number else district_number  # Ensure district_number is an integer
 
-st.set_page_config(layout="wide")
+def load_station_data(station_id: str) -> pd.DataFrame:
+    """
+    Loads the five-minute aggregated detector data for a single station from S3.
+    """
 
-df = fetch_data()
-if district_number:
-    # filter to just the current district
-    df = df[df["District"] == district_number]
-    st.title(f"District {district_number} Station Viewer")
-else:
-    st.title("Districts Station Viewer")
+    filters = [("STATION_ID", "=", station_id)]
 
-left_col, center_col, right_col = st.columns([1, 2, 2])
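+    # Only the station is filtered here; every available day under DATA_PREFIX
+    # is read for that station (the Days selection in main() is not applied yet).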
+    return pd.read_parquet(
+        f"s3://{S3_BUCKET}/{DATA_PREFIX}",
+        columns=[
+            "STATION_ID",
+            "LANE",
+            "SAMPLE_TIMESTAMP",
+            "VOLUME_SUM",
+            "SPEED_FIVE_MINS",
+            "OCCUPANCY_AVG",
+        ],
+        filters=filters,
+    )
 
-with left_col:
-    # Create filters
-    id_options = ["All"] + sorted(df["ID"].dropna().unique().tolist())
-    selected_id = st.selectbox("Select Station", id_options)
 
-    fwy_options = ["All"] + sorted(df["Fwy"].dropna().unique().tolist())
-    selected_fwy = st.selectbox("Select Freeway", fwy_options)
+# --- STREAMLIT APP ---
 
-    dir_options = ["All"] + sorted(df["Dir"].dropna().unique().tolist())
-    selected_dir = st.selectbox("Select Direction", dir_options)
 
-    type_options = ["All"] + sorted(df["Type"].dropna().unique().tolist())
-    selected_type = st.selectbox("Select Type", type_options)
+def main():
+    query_params = st.query_params
+    district_number = query_params.get("district_number", "")
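+    # Query parameters arrive as strings (e.g. "?district_number=3"); the value
+    # is passed unchanged to the DISTRICT filter in load_station_metadata.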
 
-# Apply filters
-filtered_df = df.copy()
+    df_station_metadata = load_station_metadata(district_number)
+    st.dataframe(df_station_metadata, use_container_width=True)
 
-if selected_id != "All":
-    filtered_df = filtered_df[filtered_df["ID"] == selected_id]
+    station = st.selectbox(
+        "Station",
+        df_station_metadata["STATION_ID"],
+    )
 
-if selected_fwy != "All":
-    filtered_df = filtered_df[filtered_df["Fwy"] == selected_fwy]
+    days = st.multiselect("Days", get_available_days())
 
-if selected_dir != "All":
-    filtered_df = filtered_df[filtered_df["Dir"] == selected_dir]
+    station_data_button = st.button("Load Station Data", type="primary")
 
-if selected_type != "All":
-    filtered_df = filtered_df[filtered_df["Type"] == selected_type]
+    if station_data_button:
+        df_station_data = load_station_data(station)
+        st.dataframe(df_station_data, use_container_width=True)
 
-with center_col:
-    # Show filtered data
-    st.write(f"**Stations:** {filtered_df.shape[0]:,.0f}")
-    st.write(f"**Directional distance:** {filtered_df["Length"].sum():,.1f} mi")
-    st.dataframe(filtered_df, use_container_width=True)
 
-with right_col:
-    # Rename columns to match Streamlit's expected format
-    map_df = filtered_df.rename(columns={"Latitude": "latitude", "Longitude": "longitude"})
-    st.map(map_df[["latitude", "longitude"]])
+if __name__ == "__main__":
+    main()
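
Note: the days selected in main() are not yet passed to load_station_data(). A minimal sketch of how that selection could be folded into the same predicate pushdown, assuming the dataset exposes the day=NN partition seen in the S3 keys as a column named "day" (an assumption, not confirmed by this diff):

def load_station_data(station_id: str, days: list[int]) -> pd.DataFrame:
    # Hypothetical extension: restrict the read to the chosen day partitions.
    filters = [("STATION_ID", "=", station_id)]
    if days:
        filters.append(("day", "in", days))  # "day" column name is assumed
    return pd.read_parquet(
        f"s3://{S3_BUCKET}/{DATA_PREFIX}",
        columns=[
            "STATION_ID",
            "LANE",
            "SAMPLE_TIMESTAMP",
            "VOLUME_SUM",
            "SPEED_FIVE_MINS",
            "OCCUPANCY_AVG",
        ],
        filters=filters,
    )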