|
| 1 | +# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved. |
| 2 | +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. |
| 3 | + |
| 4 | +"""This module exists to verify whether a claimed repo links back to the artifact.""" |
| 5 | +import logging |
| 6 | +import os |
| 7 | +import re |
| 8 | +from collections import deque |
| 9 | +from dataclasses import dataclass |
| 10 | +from enum import Enum |
| 11 | +from pathlib import Path |
| 12 | +from urllib.parse import urlparse |
| 13 | + |
| 14 | +from macaron.parsers.pomparser import parse_pom_string |
| 15 | +from macaron.slsa_analyzer.build_tool import NPM, BaseBuildTool, Docker, Go, Gradle, Maven, Pip, Poetry, Yarn |
| 16 | + |
| 17 | +logger = logging.getLogger(__name__) |
| 18 | + |
| 19 | + |
class RepositoryVerificationStatus(str, Enum):
    """Possible outcomes of verifying a claimed repository.

    Members are also plain strings, so they compare equal to their lowercase textual value.
    """

    # Evidence was found that links the repository back to the publisher of the artifact.
    PASSED = "passed"

    # Evidence was found that the repository does not belong to the publisher of the artifact.
    FAILED = "failed"

    # No evidence was found either for or against the link between the repository and the artifact.
    UNKNOWN = "unknown"
| 31 | + |
| 32 | + |
@dataclass(frozen=True)
class RepositoryVerificationResult:
    """Immutable record describing the outcome of one repository verification."""

    # Whether the verification passed, failed, or was inconclusive.
    status: RepositoryVerificationStatus

    # Short machine-readable tag explaining how the status was reached (e.g. "group_id_match").
    reason: str

    # The build tool instance the verification was performed for.
    build_tool: BaseBuildTool
| 40 | + |
| 41 | + |
| 42 | +class RepoVerifier: |
| 43 | + """A class to verify a claimed repo.""" |
| 44 | + |
| 45 | + _known_maven_namespaces = { |
| 46 | + "github", |
| 47 | + "gitlab", |
| 48 | + "bitbucket", |
| 49 | + "gitee", |
| 50 | + } |
| 51 | + |
| 52 | + def __init__( |
| 53 | + self, |
| 54 | + namespace: str, |
| 55 | + name: str, |
| 56 | + version: str, |
| 57 | + claimed_repo_url: str, |
| 58 | + claimed_repo_fs: str, |
| 59 | + build_tool: BaseBuildTool, |
| 60 | + ): |
| 61 | + self.namespace = namespace |
| 62 | + self.name = name |
| 63 | + self.version = version |
| 64 | + self.claimed_repo_url = claimed_repo_url |
| 65 | + self.claimed_repo_fs = claimed_repo_fs |
| 66 | + self.build_tool = build_tool |
| 67 | + |
| 68 | + def verify_claimed_repo(self) -> RepositoryVerificationResult: |
| 69 | + """Verify whether the claimed repository links back to the artifact.""" |
| 70 | + default_res = RepositoryVerificationResult( |
| 71 | + status=RepositoryVerificationStatus.UNKNOWN, reason="unsupported_type", build_tool=self.build_tool |
| 72 | + ) |
| 73 | + |
| 74 | + match self.build_tool: |
| 75 | + case Maven(): |
| 76 | + git_ns_ver = self._verify_maven_git_ns() |
| 77 | + if git_ns_ver.status == RepositoryVerificationStatus.PASSED: |
| 78 | + return git_ns_ver |
| 79 | + return self._verify_maven() |
| 80 | + case Gradle(): |
| 81 | + git_ns_ver = self._verify_maven_git_ns() |
| 82 | + if git_ns_ver.status == RepositoryVerificationStatus.PASSED: |
| 83 | + return git_ns_ver |
| 84 | + return self._verify_gradle() |
| 85 | + # TODO: add verifiers for other types |
| 86 | + case Poetry(): |
| 87 | + return default_res |
| 88 | + case Pip(): |
| 89 | + return default_res |
| 90 | + case Docker(): |
| 91 | + return default_res |
| 92 | + case NPM(): |
| 93 | + return default_res |
| 94 | + case Yarn(): |
| 95 | + return default_res |
| 96 | + case Go(): |
| 97 | + return default_res |
| 98 | + case _: |
| 99 | + raise NotImplementedError(f"Unsupported build tool: {self.build_tool}") |
| 100 | + |
| 101 | + def _same_group(self, g1: str, g2: str) -> bool: |
| 102 | + if g1 == g2: |
| 103 | + return True |
| 104 | + |
| 105 | + g1_parts = g1.split(".") |
| 106 | + g2_parts = g2.split(".") |
| 107 | + if min(len(g1_parts), len(g2_parts)) < 2: |
| 108 | + return False |
| 109 | + |
| 110 | + if (g1_parts[0] in {"io", "com"} and g1_parts[1] in self._known_maven_namespaces) or ( |
| 111 | + g2_parts[0] in {"io", "com"} and g2_parts[1] in self._known_maven_namespaces |
| 112 | + ): |
| 113 | + if len(g1_parts) >= 3 and len(g2_parts) >= 3: |
| 114 | + return all(g1_parts[i] == g2_parts[i] for i in range(3)) |
| 115 | + return False |
| 116 | + |
| 117 | + for i in range(2): |
| 118 | + if g1_parts[i] != g2_parts[i]: |
| 119 | + return False |
| 120 | + |
| 121 | + return True |
| 122 | + |
| 123 | + @staticmethod |
| 124 | + def _bfs_walk(root_dir: Path, filename: str) -> Path | None: |
| 125 | + if not os.path.exists(root_dir) or not os.path.isdir(root_dir): |
| 126 | + return None |
| 127 | + |
| 128 | + queue: deque[Path] = deque() |
| 129 | + queue.append(Path(root_dir)) |
| 130 | + while queue: |
| 131 | + current_dir = queue.popleft() |
| 132 | + |
| 133 | + # don't look through non-main directories |
| 134 | + if any( |
| 135 | + keyword in current_dir.name.lower() |
| 136 | + for keyword in ["test", "example", "sample", "doc", "demo", "spec", "mock"] |
| 137 | + ): |
| 138 | + continue |
| 139 | + |
| 140 | + if (current_dir / filename).exists(): |
| 141 | + return current_dir / filename |
| 142 | + |
| 143 | + # ignore symlinks to prevent potential infinite loop |
| 144 | + sub_dirs = [Path(it) for it in current_dir.iterdir() if it.is_dir() and not it.is_symlink()] |
| 145 | + queue.extend(sub_dirs) |
| 146 | + |
| 147 | + return None |
| 148 | + |
| 149 | + def _verify_maven_git_ns(self) -> RepositoryVerificationResult: |
| 150 | + parsed_url = urlparse(self.claimed_repo_url) |
| 151 | + if parsed_url is None or parsed_url.hostname is None: |
| 152 | + logger.debug("Could not parse the claimed repository URL: %s", self.claimed_repo_url) |
| 153 | + return RepositoryVerificationResult( |
| 154 | + status=RepositoryVerificationStatus.UNKNOWN, reason="url_parse_error", build_tool=self.build_tool |
| 155 | + ) |
| 156 | + |
| 157 | + claimed_hostname = parsed_url.hostname.split(".")[0] |
| 158 | + claimed_account = parsed_url.path.strip("/").split("/")[0] |
| 159 | + |
| 160 | + group_parts = self.namespace.split(".") |
| 161 | + for platform in self._known_maven_namespaces: |
| 162 | + if ( |
| 163 | + group_parts[0].lower() in {"io", "com"} |
| 164 | + and group_parts[1].lower() == platform.lower() |
| 165 | + and group_parts[1].lower() == claimed_hostname.lower() |
| 166 | + and group_parts[2].lower() == claimed_account.lower() |
| 167 | + ): |
| 168 | + return RepositoryVerificationResult( |
| 169 | + status=RepositoryVerificationStatus.PASSED, reason="git_ns", build_tool=self.build_tool |
| 170 | + ) |
| 171 | + |
| 172 | + return RepositoryVerificationResult( |
| 173 | + # not necessarily a fail, because many projects use maven group ids other than their repo domain. |
| 174 | + status=RepositoryVerificationStatus.UNKNOWN, |
| 175 | + reason="git_ns_mismatch", |
| 176 | + build_tool=self.build_tool, |
| 177 | + ) |
| 178 | + |
| 179 | + def _verify_maven(self) -> RepositoryVerificationResult: |
| 180 | + # TODO: check other pom files. think about how to decide in case of contradicting evidence |
| 181 | + # check if repo contains pom.xml |
| 182 | + pom_file = self._bfs_walk(Path(self.claimed_repo_fs), "pom.xml") |
| 183 | + if pom_file is None: |
| 184 | + logger.debug("Could not find any pom.xml in the repository: %s", self.claimed_repo_url) |
| 185 | + return RepositoryVerificationResult( |
| 186 | + status=RepositoryVerificationStatus.UNKNOWN, reason="no_pom", build_tool=self.build_tool |
| 187 | + ) |
| 188 | + |
| 189 | + pom_content = pom_file.read_text() |
| 190 | + pom_root = parse_pom_string(pom_content) |
| 191 | + |
| 192 | + if not pom_root: |
| 193 | + logger.debug("Could not parse pom.xml: %s", pom_file.as_posix()) |
| 194 | + return RepositoryVerificationResult( |
| 195 | + status=RepositoryVerificationStatus.UNKNOWN, reason="not_parsed_pom", build_tool=self.build_tool |
| 196 | + ) |
| 197 | + |
| 198 | + # find the group id in the pom (project/groupId) |
| 199 | + pom_group_id_elem = next((ch for ch in pom_root if ch.tag.endswith("}groupId")), None) |
| 200 | + if pom_group_id_elem is None or pom_group_id_elem.text is None: |
| 201 | + logger.debug("Could not find groupId in pom.xml: %s", pom_file) |
| 202 | + return RepositoryVerificationResult( |
| 203 | + status=RepositoryVerificationStatus.UNKNOWN, reason="no_group_id_in_pom", build_tool=self.build_tool |
| 204 | + ) |
| 205 | + |
| 206 | + pom_group_id = pom_group_id_elem.text.strip() |
| 207 | + if not self._same_group(pom_group_id, self.namespace): |
| 208 | + logger.debug("Group id in pom.xml does not match the provided group id: %s", pom_file) |
| 209 | + return RepositoryVerificationResult( |
| 210 | + status=RepositoryVerificationStatus.FAILED, reason="group_id_mismatch", build_tool=self.build_tool |
| 211 | + ) |
| 212 | + |
| 213 | + return RepositoryVerificationResult( |
| 214 | + status=RepositoryVerificationStatus.PASSED, reason="group_id_match", build_tool=self.build_tool |
| 215 | + ) |
| 216 | + |
| 217 | + @staticmethod |
| 218 | + def _is_valid_maven_group_id(group_id: str) -> bool: |
| 219 | + pattern = r"^[a-zA-Z][a-zA-Z0-9-]*\.([a-zA-Z][a-zA-Z0-9-]*\.)*[a-zA-Z][a-zA-Z0-9-]*[a-zA-Z0-9]$" |
| 220 | + return re.match(pattern, group_id) is not None |
| 221 | + |
| 222 | + def _verify_gradle(self) -> RepositoryVerificationResult: |
| 223 | + # check if repo contains gradle.properties |
| 224 | + def _extract_group_id_from_properties() -> str | None: |
| 225 | + gradle_properties = self._bfs_walk(Path(self.claimed_repo_fs), "gradle.properties") |
| 226 | + |
| 227 | + if gradle_properties is None: |
| 228 | + logger.debug("Could not find gradle.properties in the repository: %s", self.claimed_repo_url) |
| 229 | + return None |
| 230 | + |
| 231 | + properties_lines = gradle_properties.read_text().splitlines() |
| 232 | + for line in properties_lines: |
| 233 | + line_parts = list(filter(None, map(str.strip, line.strip().lower().split("=")))) |
| 234 | + if len(line_parts) != 2: |
| 235 | + continue |
| 236 | + if line_parts[0] == "group": |
| 237 | + return line_parts[1] |
| 238 | + |
| 239 | + return None |
| 240 | + |
| 241 | + def _extract_group_id_from_build_groovy() -> str | None: |
| 242 | + build_gradle = self._bfs_walk(Path(self.claimed_repo_fs), "build.gradle") |
| 243 | + |
| 244 | + if build_gradle is None: |
| 245 | + logger.debug("Could not find build.gradle in the repository: %s", self.claimed_repo_url) |
| 246 | + return None |
| 247 | + |
| 248 | + build_gradle_content = build_gradle.read_text() |
| 249 | + for line in build_gradle_content.splitlines(): |
| 250 | + line_parts = list(filter(None, map(str.strip, line.strip().lower().split()))) |
| 251 | + if len(line_parts) != 2: |
| 252 | + continue |
| 253 | + if line_parts[0] == "group": |
| 254 | + group_id = line_parts[1].strip('"').strip("'") |
| 255 | + if self._is_valid_maven_group_id(group_id): |
| 256 | + return group_id |
| 257 | + |
| 258 | + return None |
| 259 | + |
| 260 | + def _extract_group_id_from_build_kotlin() -> str | None: |
| 261 | + build_gradle = self._bfs_walk(Path(self.claimed_repo_fs), "build.gradle.kts") |
| 262 | + |
| 263 | + if build_gradle is None: |
| 264 | + logger.debug("Could not find build.gradle.kts in the repository: %s", self.claimed_repo_url) |
| 265 | + return None |
| 266 | + |
| 267 | + build_gradle_content = build_gradle.read_text() |
| 268 | + for line in build_gradle_content.splitlines(): |
| 269 | + line_parts = list(filter(None, map(str.strip, line.strip().lower().split("=")))) |
| 270 | + if len(line_parts) != 2: |
| 271 | + continue |
| 272 | + if line_parts[0] == "group": |
| 273 | + group_id = line_parts[1].strip('"').strip("'") |
| 274 | + if self._is_valid_maven_group_id(group_id): |
| 275 | + return group_id |
| 276 | + |
| 277 | + return None |
| 278 | + |
| 279 | + gradle_group_id = _extract_group_id_from_properties() |
| 280 | + if gradle_group_id is None: |
| 281 | + gradle_group_id = _extract_group_id_from_build_groovy() |
| 282 | + if gradle_group_id is None: |
| 283 | + gradle_group_id = _extract_group_id_from_build_kotlin() |
| 284 | + if gradle_group_id is None: |
| 285 | + logger.debug("Could not find group from gradle manifests for %s", self.claimed_repo_url) |
| 286 | + return RepositoryVerificationResult( |
| 287 | + status=RepositoryVerificationStatus.UNKNOWN, |
| 288 | + reason="no_group_in_gradle_manifest", |
| 289 | + build_tool=self.build_tool, |
| 290 | + ) |
| 291 | + |
| 292 | + if not self._same_group(gradle_group_id, self.namespace): |
| 293 | + logger.debug("Group in gradle manifest does not match the provided group id: %s", self.claimed_repo_url) |
| 294 | + return RepositoryVerificationResult( |
| 295 | + status=RepositoryVerificationStatus.FAILED, reason="group_id_mismatch", build_tool=self.build_tool |
| 296 | + ) |
| 297 | + |
| 298 | + return RepositoryVerificationResult( |
| 299 | + status=RepositoryVerificationStatus.PASSED, reason="group_id_match", build_tool=self.build_tool |
| 300 | + ) |
0 commit comments