
Commit ca73041

docs(pems_data): reference for data sources

1 parent d16e55c

5 files changed: +120 additions, -33 deletions
Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
+# Data sources
+
+The data source components are responsible for the actual reading of data (the "how"). The design uses an abstract interface, `IDataSource`, to define a standard contract for any data source, making it easy to swap and compose implementations.
+
+::: pems_data.sources.IDataSource
+
+::: pems_data.sources.s3.S3DataSource
+
+::: pems_data.sources.cache.CachingDataSource
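
As a sketch of the composition this page describes (not part of this commit; the no-argument `Cache()` construction and the object key are assumptions):

```python
from pems_data.cache import Cache
from pems_data.sources import IDataSource
from pems_data.sources.cache import CachingDataSource
from pems_data.sources.s3 import S3DataSource

# any IDataSource can be wrapped; callers depend only on the contract
source: IDataSource = CachingDataSource(data_source=S3DataSource(), cache=Cache())

# hypothetical key: reads through the cache, falling back to S3 on a miss
df = source.read("hypothetical/key.parquet")
```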
Lines changed: 8 additions & 8 deletions
@@ -1,22 +1,22 @@
 from abc import ABC, abstractmethod
+from typing import Any
 
 import pandas as pd
 
 
 class IDataSource(ABC):
-    """
-    An abstract interface for a generic data source.
-    """
+    """An abstract interface for a generic data source."""
 
     @abstractmethod
-    def read(self, identifier: str, **kwargs) -> pd.DataFrame:
+    def read(self, identifier: str, **kwargs: dict[str, Any]) -> pd.DataFrame:
         """
         Reads data identified by a generic identifier from the source.
 
         Args:
-            identifier (str): The unique identifier for the data, e.g.,
-                an S3 key, a database table name, etc.
-            **kwargs: Additional arguments for the underlying read operation,
-                such as 'columns' or 'filters'.
+            identifier (str): The unique identifier for the data, e.g., an S3 key, a database table name, etc.
+            **kwargs (dict[str, Any]): Additional arguments for the underlying read operation, such as 'columns' or 'filters'.
+
+        Returns:
+            value (pandas.DataFrame): A DataFrame of data from the source for the given identifier.
         """
         raise NotImplementedError  # pragma: no cover
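
For context on the contract above: a concrete source only has to implement `read`. A minimal hypothetical implementation (not in this commit) that satisfies the interface by treating the identifier as a local CSV path:

```python
from typing import Any

import pandas as pd

from pems_data.sources import IDataSource


class CsvFileDataSource(IDataSource):
    """Hypothetical example: the identifier is a local CSV file path."""

    def read(self, identifier: str, **kwargs: dict[str, Any]) -> pd.DataFrame:
        # extra kwargs (e.g. usecols=...) pass straight through to pandas
        return pd.read_csv(identifier, **kwargs)
```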
Lines changed: 42 additions & 12 deletions
@@ -1,33 +1,63 @@
+from typing import Any
 import pandas as pd
 
 from pems_data.cache import Cache
 from pems_data.sources import IDataSource
 
 
 class CachingDataSource(IDataSource):
-    """
-    A DataSource decorator that adds a caching layer to another data source.
-    """
+    """A data source decorator that adds a caching layer to another data source."""
+
+    @property
+    def cache(self) -> Cache:
+        """
+        Returns:
+            value (pems_data.cache.Cache): This data source's underlying Cache instance.
+        """
+        return self._cache
+
+    @property
+    def data_source(self) -> IDataSource:
+        """
+        Returns:
+            value (pems_data.sources.IDataSource): This data source's underlying data source instance.
+        """
+        return self._data_source
 
     def __init__(self, data_source: IDataSource, cache: Cache):
-        self.cache = cache
-        self.data_source = data_source
-
-    def read(self, identifier: str, **kwargs) -> pd.DataFrame:
-        # get cache options from kwargs
-        cache_opts = kwargs.pop("cache_opts", {})
+        """Initialize a new CachingDataSource.
+
+        Args:
+            data_source (pems_data.sources.IDataSource): The underlying data source to use for cache misses
+            cache (pems_data.cache.Cache): The underlying cache to use for get/set operations
+        """
+        self._cache = cache
+        self._data_source = data_source
+
+    def read(self, identifier: str, cache_opts: dict[str, Any] = {}, **kwargs: dict[str, Any]) -> pd.DataFrame:
+        """
+        Reads data identified by a generic identifier from the source. Tries the cache first, setting on a miss.
+
+        Args:
+            identifier (str): The unique identifier for the data, e.g., an S3 key, a database table name, etc.
+            cache_opts (dict[str, Any]): A dictionary of options for configuring caching of the data
+            **kwargs (dict[str, Any]): Additional arguments for the underlying read operation, such as 'columns' or 'filters'
+
+        Returns:
+            value (pandas.DataFrame): A DataFrame of data read from the cache (or the source), for the given identifier.
+        """
         # use cache key from options, fallback to identifier
         cache_key = cache_opts.get("key", identifier)
         ttl = cache_opts.get("ttl")
 
         # try to get df from cache
-        cached_df = self.cache.get_df(cache_key)
+        cached_df = self._cache.get_df(cache_key)
         if cached_df is not None:
             return cached_df
 
         # on miss, call the wrapped source
-        df = self.data_source.read(identifier, **kwargs)
+        df = self._data_source.read(identifier, **kwargs)
         # store the result in the cache
-        self.cache.set_df(cache_key, df, ttl=ttl)
+        self._cache.set_df(cache_key, df, ttl=ttl)
 
         return df
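
A usage sketch for the cache-aside flow implemented above; the no-argument `Cache()` construction, the cache key, and the TTL units are assumptions for illustration:

```python
from pems_data.cache import Cache
from pems_data.sources.cache import CachingDataSource
from pems_data.sources.s3 import S3DataSource

source = CachingDataSource(data_source=S3DataSource(), cache=Cache())
opts = {"key": "stations-meta", "ttl": 3600}  # hypothetical key; TTL units depend on Cache

# first read misses the cache, delegates to S3, then stores the result under "stations-meta"
df = source.read("hypothetical/stations.parquet", cache_opts=opts)

# an identical second read is served from the cache; S3 is not touched
df = source.read("hypothetical/stations.parquet", cache_opts=opts)
```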
Lines changed: 60 additions & 12 deletions
@@ -1,5 +1,6 @@
 import os
 import re
+from typing import Any, Callable
 
 import boto3
 import pandas as pd
@@ -8,17 +9,51 @@
 
 
 class S3DataSource(IDataSource):
-    default_bucket = os.environ.get("S3_BUCKET_NAME", "caltrans-pems-prd-us-west-2-marts")
+    """A data source for fetching data from an S3 bucket."""
+
+    @property
+    def default_bucket(self) -> str:
+        """
+        Returns:
+            value (str): The value from the `S3_BUCKET_NAME` environment variable, or the Caltrans PeMS prod mart bucket name.
+        """
+        return os.environ.get("S3_BUCKET_NAME", "caltrans-pems-prd-us-west-2-marts")
+
+    @property
+    def name(self) -> str:
+        """
+        Returns:
+            value (str): The name of this bucket instance.
+        """
+        return self._name
 
     def __init__(self, name: str = None):
-        self.name = name or self.default_bucket
+        """Initialize a new S3DataSource.
+
+        Args:
+            name (str): (Optional) The name of the S3 bucket to source from.
+        """
         self._client = boto3.client("s3")
+        self._name = name or self.default_bucket
 
-    def get_prefixes(self, filter_pattern: re.Pattern = re.compile(".+"), initial_prefix: str = "", match_func=None) -> list:
+    def get_prefixes(
+        self,
+        filter_pattern: re.Pattern = re.compile(".+"),
+        initial_prefix: str = "",
+        match_func: Callable[[re.Match], str] = None,
+    ) -> list:
         """
-        Lists available filter options by inspecting S3 prefixes. Optionally filter by an initial prefix.
+        Lists available object prefixes, optionally filtered by an initial prefix.
 
         When a match is found, if match_func exists, add its result to the output list. Otherwise add the entire match.
+
+        Args:
+            filter_pattern (re.Pattern): A regular expression used to match object prefixes
+            initial_prefix (str): The initial prefix to start the search from
+            match_func (Callable[[re.Match], str]): A callable used to extract data from prefix matches
+
+        Returns:
+            value (list): A sorted list of unique prefixes that matched the pattern.
         """
 
         s3_keys = self._client.list_objects(Bucket=self.name, Prefix=initial_prefix)
@@ -36,20 +71,33 @@ def get_prefixes(self, filter_pattern: re.Pattern = re.compile(".+"), initial_pr
 
         return sorted(result)
 
-    def read(self, *args: str, path=None, columns=None, filters=None, **kwargs) -> pd.DataFrame:
-        """Reads data from the S3 path into a pandas DataFrame. Extra kwargs are pass along to `pandas.read_parquet()`.
+    def read(
+        self, *args: str, path: str = None, columns: list = None, filters: list = None, **kwargs: dict[str, Any]
+    ) -> pd.DataFrame:
+        """Reads data from the S3 path into a pandas DataFrame. Extra kwargs are passed along to `pandas.read_parquet()`.
 
         Args:
-            *args (str): One or more path relative path components for the data file.
-            path (str): The absolute S3 URL path to a data file. Using `path` overrides any relative path components provided.
-            columns (list[str]): If not None, only these columns will be read from the file.
-            filters (list[tuple] | list[list[tuple]]): To filter out data. Filter syntax: `[[(column, op, val), ...],...]`.
+            *args (tuple[str]): One or more relative path components for the data file
+            path (str): The absolute S3 URL path to a data file; using `path` overrides any relative path components provided
+            columns (list[str]): If not None, only these columns will be read from the file
+            filters (list[tuple] | list[list[tuple]]): To filter out data. Filter syntax: `[[(column, op, val), ...],...]`
+            **kwargs (dict[str, Any]): Extra kwargs to pass to `pandas.read_parquet()`
+
+        Returns:
+            value (pandas.DataFrame): A DataFrame of data read from the source path.
         """
         path = path or self.url(*args)
         return pd.read_parquet(path, columns=columns, filters=filters, **kwargs)
 
-    def url(self, *args):
-        """Build an absolute S3 URL to this bucket, with optional path segments."""
+    def url(self, *args: str) -> str:
+        """Build an absolute S3 URL to this bucket, with optional path segments.
+
+        Args:
+            *args (tuple[str]): The components of the S3 path.
+
+        Returns:
+            value (str): An absolute `s3://` URL for this bucket and the path.
+        """
         parts = [f"s3://{self.name}"]
         parts.extend(args)
         return "/".join(parts)

tests/pytest/pems_data/sources/test_s3.py

Lines changed: 1 addition & 1 deletion
@@ -19,7 +19,7 @@ def test_name_custom(self):
         assert S3DataSource("name").name == "name"
 
     def test_name_default(self):
-        assert S3DataSource().name == S3DataSource.default_bucket
+        assert S3DataSource().name == S3DataSource().default_bucket
 
     def test_get_prefixes__default(self, data_source: S3DataSource, mock_s3):
         result = data_source.get_prefixes()
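
Since `default_bucket` is now a property that reads the environment on each access (hence the `S3DataSource().default_bucket` change above), the override path is testable too; a sketch (not in this commit) using pytest's `monkeypatch`:

```python
from pems_data.sources.s3 import S3DataSource


def test_default_bucket_env_override(monkeypatch):
    # the property reads S3_BUCKET_NAME at access time, so setenv takes effect
    monkeypatch.setenv("S3_BUCKET_NAME", "my-test-bucket")
    assert S3DataSource().default_bucket == "my-test-bucket"
    assert S3DataSource().name == "my-test-bucket"
```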
