Skip to content

Commit 096f6ef

Browse files
authored
materialize 1 week's worth of stop time update metrics (#4104)
* materialize 1 week's worth of stop time update metrics * use view and filter, point at reference not prod
1 parent b095528 commit 096f6ef

File tree

4 files changed

+296
-0
lines changed

4 files changed

+296
-0
lines changed
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
{{
2+
config(
3+
materialized='incremental',
4+
incremental_strategy='insert_overwrite',
5+
partition_by={
6+
'field': 'service_date',
7+
'data_type': 'date',
8+
'granularity': 'day'
9+
}, cluster_by=['service_date', 'base64_url']
10+
)
11+
}}
12+
13+
14+
WITH fct_stop_time_updates AS (
15+
SELECT * FROM {{ ref('fct_stop_time_updates_week') }}
16+
-- TODO: these have duplicate rows down to the stop level, maybe should exclude
17+
WHERE gtfs_dataset_name NOT IN (
18+
'Bay Area 511 Regional TripUpdates',
19+
'BART TripUpdates',
20+
'Bay Area 511 Muni TripUpdates',
21+
'Unitrans Trip Updates'
22+
) AND service_date >= '2025-06-22' AND service_date <= '2025-06-28'
23+
),
24+
25+
stop_arrivals AS (
26+
SELECT DISTINCT
27+
gtfs_dataset_key,
28+
gtfs_dataset_name,
29+
base64_url,
30+
schedule_base64_url,
31+
service_date,
32+
trip_id,
33+
stop_id,
34+
stop_sequence,
35+
trip_start_date,
36+
trip_start_time,
37+
trip_direction_id,
38+
trip_route_id,
39+
trip_schedule_relationship,
40+
41+
-- last arrival and departure as UTC
42+
DATETIME(TIMESTAMP_SECONDS(LAST_VALUE(arrival_time IGNORE NULLS) OVER(PARTITION BY base64_url, service_date, trip_id, trip_start_date, trip_start_time, stop_id, stop_sequence ORDER BY COALESCE(trip_update_timestamp, header_timestamp) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING))) AS last_trip_updates_arrival,
43+
DATETIME(TIMESTAMP_SECONDS(LAST_VALUE(departure_time IGNORE NULLS) OVER(PARTITION BY base64_url, service_date, trip_id, trip_start_date, trip_start_time, stop_id, stop_sequence ORDER BY COALESCE(trip_update_timestamp, header_timestamp) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING))) AS last_trip_updates_departure,
44+
-- last arrival and departure as Pacific
45+
DATETIME(TIMESTAMP_SECONDS(LAST_VALUE(arrival_time IGNORE NULLS) OVER(PARTITION BY base64_url, service_date, trip_id, trip_start_date, trip_start_time, stop_id, stop_sequence ORDER BY COALESCE(trip_update_timestamp, header_timestamp) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)), "America/Los_Angeles") AS last_trip_updates_arrival_pacific,
46+
DATETIME(TIMESTAMP_SECONDS(LAST_VALUE(departure_time IGNORE NULLS) OVER(PARTITION BY base64_url, service_date, trip_id, trip_start_date, trip_start_time, stop_id, stop_sequence ORDER BY COALESCE(trip_update_timestamp, header_timestamp) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)), "America/Los_Angeles") AS last_trip_updates_departure_pacific,
47+
48+
FROM fct_stop_time_updates
49+
),
50+
51+
fct_stop_time_arrivals AS (
52+
SELECT
53+
stop_arrivals.*,
54+
-- usually one of these columns is null, but we want to use it to compare against _extract_ts
55+
COALESCE(last_trip_updates_arrival_pacific, last_trip_updates_departure_pacific) AS actual_arrival_pacific,
56+
COALESCE(last_trip_updates_arrival, last_trip_updates_departure) AS actual_arrival,
57+
FROM stop_arrivals
58+
)
59+
60+
SELECT * FROM fct_stop_time_arrivals
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
{{
2+
config(
3+
materialized='table',
4+
partition_by={
5+
'field': 'service_date',
6+
'data_type': 'date',
7+
'granularity': 'day'
8+
}, cluster_by=['service_date', 'base64_url']
9+
)
10+
}}
11+
12+
13+
WITH fct_stop_time_updates AS (
14+
SELECT *
15+
FROM {{ ref('fct_stop_time_updates_with_arrivals_week') }}
16+
WHERE service_date >= '2025-06-22' AND service_date <= '2025-06-28'
17+
),
18+
19+
fct_tu_summaries AS (
20+
SELECT DISTINCT
21+
trip_instance_key,
22+
service_date,
23+
base64_url,
24+
schedule_base64_url,
25+
trip_id
26+
FROM {{ ref('fct_trip_updates_summaries') }}
27+
WHERE service_date >= '2025-06-22' AND service_date <= '2025-06-28'
28+
),
29+
30+
prediction_difference AS (
31+
SELECT
32+
base64_url,
33+
service_date,
34+
trip_id,
35+
stop_id,
36+
stop_sequence,
37+
DATETIME(_extract_ts) AS _extract_ts,
38+
arrival_time,
39+
actual_arrival,
40+
extract_hour,
41+
extract_minute,
42+
DATETIME_DIFF(actual_arrival, arrival_time, SECOND) AS prediction_seconds_difference,
43+
DATETIME_DIFF(actual_arrival, DATETIME(_extract_ts), MINUTE) as minutes_until_arrival,
44+
FROM fct_stop_time_updates
45+
WHERE DATETIME(_extract_ts) <= actual_arrival
46+
-- filter out the times we ask for predictions after bus has arrived
47+
),
48+
49+
minute_bins AS (
50+
SELECT
51+
base64_url,
52+
service_date,
53+
trip_id,
54+
stop_id,
55+
stop_sequence,
56+
extract_hour,
57+
extract_minute,
58+
59+
-- wobble metric: https://github.com/cal-itp/data-analyses/blob/main/rt_predictions/03_prediction_inconsistency.ipynb
60+
MAX(arrival_time) - MIN(arrival_time) AS prediction_spread_seconds,
61+
62+
-- prediction accuracy metric: https://github.com/cal-itp/data-analyses/blob/main/rt_predictions/04_reliable_prediction_accuracy.ipynb
63+
AVG(prediction_seconds_difference) AS prediction_error,
64+
AVG(minutes_until_arrival) AS minutes_until_arrival,
65+
66+
-- stop time update completeness metric: https://github.com/cal-itp/data-analyses/blob/main/rt_predictions/01_update_completeness.ipynb
67+
COUNT(*) AS n_predictions_minute,
68+
69+
FROM prediction_difference
70+
-- filter out predictions more than 30 minutes before bus arrives at stop
71+
WHERE ABS(minutes_until_arrival) <= 30
72+
GROUP BY base64_url, service_date, trip_id, stop_id, stop_sequence, extract_hour, extract_minute
73+
),
74+
75+
derive_metrics AS (
76+
SELECT
77+
base64_url,
78+
service_date,
79+
trip_id,
80+
stop_id,
81+
stop_sequence,
82+
83+
-- 04_reliable_prediction_accuracy.ipynb
84+
prediction_error,
85+
minutes_until_arrival,
86+
CASE
87+
WHEN (prediction_error >= -60 * LN(minutes_until_arrival +1.3)
88+
AND prediction_error <= 60* LN(minutes_until_arrival +1.5)) THEN 1
89+
ELSE 0
90+
END AS is_accurate,
91+
92+
-- 01_update_completeness.ipynb
93+
-- double check this, it's supposed to be fresh update, using header/vehicle_timestamp
94+
n_predictions_minute,
95+
CASE
96+
WHEN n_predictions_minute >= 2 THEN 1
97+
ELSE 0
98+
END AS is_complete,
99+
100+
-- 03_prediction_inconsistency.ipynb.ipynb
101+
-- wobble: expected change means the prediction shortens with each passing minute?
102+
-- can this be just the prediction spread, in minutes, averaged over all the minutes?
103+
prediction_spread_seconds / 60 AS prediction_spread_minutes,
104+
FROM minute_bins
105+
),
106+
107+
stop_time_metrics AS (
108+
-- TODO: can this table be combined with other CTEs?
109+
SELECT
110+
base64_url,
111+
service_date,
112+
trip_id,
113+
stop_id,
114+
stop_sequence,
115+
116+
-- 04_reliable_prediction_accuracy
117+
AVG(prediction_error) AS avg_prediction_error_sec,
118+
SUM(is_accurate) AS n_accurate_minutes,
119+
120+
-- 01_update_completeness.ipynb
121+
SUM(is_complete) AS n_complete_minutes,
122+
COUNT(*) AS n_minute_bins,
123+
124+
-- 03_prediction_inconsistency.ipynb
125+
SUM(prediction_spread_minutes) / COUNT(*) AS avg_prediction_spread, -- wobble
126+
127+
-- other derived metrics from this prediction window of 30 minutes prior
128+
SUM(n_predictions_minute) AS n_predictions,
129+
130+
FROM derive_metrics
131+
GROUP BY base64_url, service_date, trip_id, stop_id, stop_sequence
132+
),
133+
134+
fct_stop_time_metrics AS (
135+
SELECT
136+
stop_time_metrics.*,
137+
fct_tu_summaries.trip_instance_key,
138+
fct_tu_summaries.schedule_base64_url
139+
FROM stop_time_metrics
140+
LEFT JOIN fct_tu_summaries -- inner join has left us with zero rows before, is this because of incremental settings?
141+
ON stop_time_metrics.service_date = fct_tu_summaries.service_date
142+
AND stop_time_metrics.base64_url = fct_tu_summaries.base64_url
143+
AND stop_time_metrics.trip_id = fct_tu_summaries.trip_id
144+
)
145+
146+
SELECT * FROM fct_stop_time_metrics
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
{{
2+
config(
3+
materialized='incremental',
4+
partition_by={
5+
'field': 'service_date',
6+
'data_type': 'date',
7+
'granularity': 'day'
8+
}, cluster_by=['service_date', 'base64_url']
9+
)
10+
}}
11+
12+
WITH fct_stop_time_updates_filtered AS (
13+
SELECT *
14+
FROM {{ ref('fct_stop_time_updates') }}
15+
-- add extra date boundaries to grab relevant service_dates
16+
WHERE dt >= '2025-06-21' AND dt <= '2025-06-29'
17+
)
18+
19+
SELECT * FROM fct_stop_time_updates_filtered
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
{{
2+
config(
3+
materialized='incremental',
4+
incremental_strategy='insert_overwrite',
5+
partition_by={
6+
'field': 'service_date',
7+
'data_type': 'date',
8+
'granularity': 'day'
9+
}, cluster_by=['service_date', 'base64_url']
10+
)
11+
}}
12+
13+
14+
WITH fct_stop_time_updates AS (
15+
SELECT
16+
base64_url,
17+
service_date,
18+
trip_id,
19+
trip_start_date,
20+
trip_start_time,
21+
stop_id,
22+
stop_sequence,
23+
_extract_ts, -- this is UTC
24+
--trip_update_timestamp,
25+
--header_timestamp,
26+
arrival_time,
27+
departure_time,
28+
FROM {{ ref('fct_stop_time_updates_week') }}
29+
WHERE service_date >= '2025-06-22' AND service_date <= '2025-06-28'
30+
),
31+
32+
fct_stop_arrivals AS (
33+
SELECT DISTINCT
34+
base64_url,
35+
service_date,
36+
trip_id,
37+
stop_id,
38+
stop_sequence,
39+
actual_arrival_pacific,
40+
actual_arrival,
41+
42+
FROM {{ ref('fct_stop_time_arrivals_week') }}
43+
WHERE service_date >= '2025-06-22' AND service_date <= '2025-06-28'
44+
),
45+
46+
stop_times_with_arrivals AS (
47+
SELECT
48+
tu.base64_url,
49+
tu.service_date,
50+
tu.trip_id,
51+
tu.trip_start_date,
52+
tu.trip_start_time,
53+
tu.stop_id,
54+
tu.stop_sequence,
55+
tu._extract_ts,
56+
EXTRACT(HOUR FROM tu._extract_ts) AS extract_hour,
57+
EXTRACT(MINUTE FROM tu._extract_ts) AS extract_minute,
58+
DATETIME(TIMESTAMP_SECONDS(tu.arrival_time)) AS arrival_time, -- turn posix time into UTC
59+
DATETIME(TIMESTAMP_SECONDS(tu.departure_time)) AS departure_time, -- turn posix time into UTC
60+
61+
arrivals.actual_arrival_pacific,
62+
arrivals.actual_arrival,
63+
64+
FROM fct_stop_time_updates as tu
65+
INNER JOIN fct_stop_arrivals as arrivals
66+
USING (base64_url, service_date, trip_id, stop_id, stop_sequence)
67+
-- removed the trip_start_date/time from this and it merged better?
68+
-- with trip_start_date/time, somehow the merge dropped all the rows (incremental tables loaded locally?)
69+
)
70+
71+
SELECT * FROM stop_times_with_arrivals

0 commit comments

Comments
 (0)