
Commit 30bc510

feat(pems_data): initial S3 helper class
- get a fully-formed bucket URL
- get a list of object prefixes matching a pattern
- read a parquet file into a DataFrame
1 parent bd658a2 commit 30bc510

File tree

5 files changed, +127 -0 lines changed


pems_data/src/pems_data/__init__.py

Whitespace-only changes.

pems_data/src/pems_data/s3.py

Lines changed: 52 additions & 0 deletions
@@ -0,0 +1,52 @@
import re

import boto3
import pandas as pd


class S3Bucket:
    prod_marts = "caltrans-pems-prd-us-west-2-marts"

    def __init__(self, name: str = None):
        self.name = name or self.prod_marts

    def get_prefixes(self, filter_pattern: re.Pattern = re.compile(".+"), initial_prefix: str = "", match_func=None) -> list:
        """
        Lists available filter options by inspecting S3 prefixes. Optionally filter by an initial prefix.

        When a match is found, if match_func exists, add its result to the output list. Otherwise add the entire match.
        """
        s3 = boto3.client("s3")
        s3_keys = s3.list_objects(Bucket=self.name, Prefix=initial_prefix)

        result = set()

        for item in s3_keys["Contents"]:
            s3_path = item["Key"]
            match = re.search(filter_pattern, s3_path)
            if match:
                if match_func:
                    result.add(match_func(match))
                else:
                    result.add(match.group(0))

        return sorted(result)

    def read_parquet(self, *args, path=None, columns=None, filters=None, **kwargs) -> pd.DataFrame:
        """Reads data from the S3 path into a pandas DataFrame. Extra kwargs are passed along to `pandas.read_parquet()`.

        Args:
            *args (str): One or more relative path components for the data file.
            path (str): The absolute S3 URL path to a data file. Using `path` overrides any relative path components provided.
            columns (list[str]): If not None, only these columns will be read from the file.
            filters (list[tuple] | list[list[tuple]]): Row filters to apply. Filter syntax: `[[(column, op, val), ...], ...]`.
        """
        path = path or self.url(*args)
        return pd.read_parquet(path, columns=columns, filters=filters, **kwargs)

    def url(self, *args):
        """Build an absolute S3 URL to this bucket, with optional path segments."""
        parts = [f"s3://{self.name}"]
        parts.extend(args)
        return "/".join(parts)
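A minimal usage sketch of the class above, exercising the three capabilities from the commit message. The key layout, regex, column names, and filter values below are hypothetical placeholders, not real objects in the marts bucket:

import re

from pems_data.s3 import S3Bucket

# defaults to the production marts bucket
bucket = S3Bucket()

# 1. get a fully-formed bucket URL
url = bucket.url("some", "prefix", "data.parquet")
# -> "s3://caltrans-pems-prd-us-west-2-marts/some/prefix/data.parquet"

# 2. get a list of object prefixes matching a pattern; match_func reduces
#    each match to its first capture group before dedup and sort
districts = bucket.get_prefixes(
    filter_pattern=re.compile(r"district=(\d+)/"),  # hypothetical key layout
    initial_prefix="some/prefix/",
    match_func=lambda m: m.group(1),
)

# 3. read a parquet file into a DataFrame, selecting columns and rows
df = bucket.read_parquet(
    "some", "prefix", "data.parquet",
    columns=["station_id", "volume"],  # hypothetical columns
    filters=[("station_id", "=", "12345")],
)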

tests/pytest/pems_data/__init__.py

Whitespace-only changes.

tests/pytest/pems_data/conftest.py

Whitespace-only changes.

tests/pytest/pems_data/test_s3.py

Lines changed: 75 additions & 0 deletions
@@ -0,0 +1,75 @@
import re

import pytest

from pems_data.s3 import S3Bucket


class TestS3Bucket:

    @pytest.fixture
    def bucket(self) -> S3Bucket:
        return S3Bucket()

    @pytest.fixture(autouse=True)
    def mock_s3(self, mocker):
        s3 = mocker.patch("boto3.client").return_value
        s3.list_objects.return_value = {
            "Contents": [
                {"Key": "path1/file2.json"},
                {"Key": "path2/file1.json"},
                {"Key": "path1/file1.json"},
            ]
        }
        return s3

    @pytest.fixture(autouse=True)
    def mock_read_parquet(self, mocker):
        return mocker.patch("pandas.read_parquet")

    def test_name_custom(self):
        assert S3Bucket("name").name == "name"

    def test_name_default(self):
        assert S3Bucket().name == S3Bucket.prod_marts

    def test_get_prefixes__default(self, bucket: S3Bucket, mock_s3):
        result = bucket.get_prefixes()

        mock_s3.list_objects.assert_called_once_with(Bucket=bucket.name, Prefix="")
        assert result == ["path1/file1.json", "path1/file2.json", "path2/file1.json"]

    def test_get_prefixes__filter_pattern(self, bucket: S3Bucket):
        result = bucket.get_prefixes(re.compile("path1/.+"))

        assert result == ["path1/file1.json", "path1/file2.json"]

    def test_get_prefixes__initial_prefix(self, bucket: S3Bucket, mock_s3):
        bucket.get_prefixes(initial_prefix="prefix")

        mock_s3.list_objects.assert_called_once_with(Bucket=bucket.name, Prefix="prefix")

    def test_get_prefixes__match_func(self, bucket: S3Bucket):
        result = bucket.get_prefixes(re.compile("path1/(.+)"), match_func=lambda m: m.group(1))

        assert result == ["file1.json", "file2.json"]

    def test_read_parquet(self, bucket: S3Bucket, mock_read_parquet):
        mock_read_parquet.return_value = "data"
        expected_path = bucket.url("path")

        columns = ["col1", "col2", "col3"]
        filters = [("col1", "=", "val1")]

        result = bucket.read_parquet("path", columns=columns, filters=filters, extra1="extra1", extra2="extra2")

        assert result == "data"
        mock_read_parquet.assert_called_once_with(
            expected_path, columns=columns, filters=filters, extra1="extra1", extra2="extra2"
        )

    def test_url__no_path(self, bucket: S3Bucket):
        assert bucket.url() == f"s3://{bucket.name}"

    def test_url__with_path(self, bucket: S3Bucket):
        assert bucket.url("path1", "path2") == f"s3://{bucket.name}/path1/path2"
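These tests rely on the `mocker` fixture from the pytest-mock plugin, so they run without touching S3 or reading any real parquet files; assuming the file tree above, `pytest tests/pytest/pems_data/test_s3.py` (with pytest-mock installed) should run them.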
