Skip to content
This repository was archived by the owner on Sep 29, 2025. It is now read-only.

Commit 7453833

Browse files
committed
refactor(pems_data): more generic services subpackage
The concept of a Service here is for the "what" of accessing specific data for business logic. Refactor the StationsBucket to a StationsService. StationsService needs an IDataSource to read from (the "how").
1 parent d562b01 commit 7453833

File tree

5 files changed

+82
-76
lines changed

5 files changed

+82
-76
lines changed

pems_data/src/pems_data/services/__init__.py

Whitespace-only changes.

pems_data/src/pems_data/stations.py renamed to pems_data/src/pems_data/services/stations.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
import pandas as pd
22

3-
from pems_data.s3 import S3Bucket
3+
from pems_data.sources.base import IDataSource
44

55

6-
class StationsBucket(S3Bucket):
7-
"""Station-specific bucket data."""
6+
class StationsService:
7+
"""Manages fetching of station-related data."""
88

9-
imputation_detector_agg_5min = "imputation/detector_imputed_agg_five_minutes"
10-
metadata_file = "geo/current_stations.parquet"
9+
_imputation_detector_agg_5min = "imputation/detector_imputed_agg_five_minutes"
10+
_metadata_file = "geo/current_stations.parquet"
11+
12+
def __init__(self, data_source: IDataSource):
13+
self.data_source = data_source
1114

1215
def get_district_metadata(self, district_number: str) -> pd.DataFrame:
1316
"""Loads metadata for all stations in the selected District from S3."""
@@ -30,7 +33,7 @@ def get_district_metadata(self, district_number: str) -> pd.DataFrame:
3033
]
3134
filters = [("DISTRICT", "=", district_number)]
3235

33-
return self.read_parquet(self.metadata_file, columns=columns, filters=filters)
36+
return self.data_source.read(self._metadata_file, columns=columns, filters=filters)
3437

3538
def get_imputed_agg_5min(self, station_id: str) -> pd.DataFrame:
3639
"""Loads imputed aggregate 5 minute data for a specific station."""
@@ -45,4 +48,4 @@ def get_imputed_agg_5min(self, station_id: str) -> pd.DataFrame:
4548
]
4649
filters = [("STATION_ID", "=", station_id)]
4750

48-
return self.read_parquet(self.imputation_detector_agg_5min, columns=columns, filters=filters)
51+
return self.data_source.read(self._imputation_detector_agg_5min, columns=columns, filters=filters)

tests/pytest/pems_data/services/__init__.py

Whitespace-only changes.
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import pandas as pd
2+
import pytest
3+
4+
from pems_data.sources.base import IDataSource
5+
from pems_data.services.stations import StationsService
6+
7+
8+
class TestStationsService:
9+
@pytest.fixture
10+
def df(self):
11+
return pd.DataFrame({"STATION_ID": [1]})
12+
13+
@pytest.fixture
14+
def data_source(self, df, mocker):
15+
mock = mocker.Mock(spec=IDataSource)
16+
mock.read.return_value = df
17+
return mock
18+
19+
@pytest.fixture
20+
def service(self, data_source):
21+
return StationsService(data_source)
22+
23+
def test_imputation_detector_agg_5min(self):
24+
assert StationsService._imputation_detector_agg_5min == "imputation/detector_imputed_agg_five_minutes"
25+
26+
def test_metadata_file(self):
27+
assert StationsService._metadata_file == "geo/current_stations.parquet"
28+
29+
def test_get_district_metadata(self, service: StationsService, data_source: IDataSource, df):
30+
district_number = "7"
31+
result = service.get_district_metadata(district_number)
32+
33+
data_source.read.assert_called_once_with(
34+
StationsService._metadata_file,
35+
columns=[
36+
"STATION_ID",
37+
"NAME",
38+
"PHYSICAL_LANES",
39+
"STATE_POSTMILE",
40+
"ABSOLUTE_POSTMILE",
41+
"LATITUDE",
42+
"LONGITUDE",
43+
"LENGTH",
44+
"STATION_TYPE",
45+
"DISTRICT",
46+
"FREEWAY",
47+
"DIRECTION",
48+
"COUNTY_NAME",
49+
"CITY_NAME",
50+
],
51+
filters=[("DISTRICT", "=", district_number)],
52+
)
53+
pd.testing.assert_frame_equal(result, df)
54+
55+
def test_get_imputed_agg_5min(self, service: StationsService, data_source: IDataSource, df):
56+
station_id = "123"
57+
58+
result = service.get_imputed_agg_5min(station_id)
59+
60+
data_source.read.assert_called_once_with(
61+
StationsService._imputation_detector_agg_5min,
62+
columns=[
63+
"STATION_ID",
64+
"LANE",
65+
"SAMPLE_TIMESTAMP",
66+
"VOLUME_SUM",
67+
"SPEED_FIVE_MINS",
68+
"OCCUPANCY_AVG",
69+
],
70+
filters=[("STATION_ID", "=", station_id)],
71+
)
72+
pd.testing.assert_frame_equal(result, df)

tests/pytest/pems_data/test_stations.py

Lines changed: 0 additions & 69 deletions
This file was deleted.

0 commit comments

Comments
 (0)