Skip to content

Commit 7d4aa9e

Browse files
authored
Merge pull request #85 from Cyb3rWard0g/master
STIX Utils and Attack Client with Local STIX Data
2 parents a62bacf + 9ecb2fa commit 7d4aa9e

File tree

8 files changed

+700
-42
lines changed

8 files changed

+700
-42
lines changed

attackcti/attack_api.py

Lines changed: 72 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
# https://github.com/oasis-open/cti-python-stix2/issues/183
1010
# https://stackoverflow.com/a/4406521
1111

12-
from stix2 import TAXIICollectionSource, Filter, CompositeDataSource, FileSystemSource
12+
from stix2 import TAXIICollectionSource, Filter, CompositeDataSource
1313
from stix2.datastore.filters import apply_common_filters
1414
from stix2.utils import get_type_from_id
1515
from stix2.v20.sdo import (
@@ -25,32 +25,22 @@
2525
import json
2626
import os
2727

28-
from .models import *
29-
from pydantic import TypeAdapter
28+
from pydantic import TypeAdapter, ValidationError
3029
from typing import List, Type, Dict, Any, Union
30+
from attackcti.models import *
31+
from attackcti.utils.storage import STIXStore
3132

3233
# os.environ['http_proxy'] = "http://xxxxxxx"
3334
# os.environ['https_proxy'] = "https://xxxxxxx"
3435

3536
ATTACK_STIX_COLLECTIONS = "https://cti-taxii.mitre.org/stix/collections/"
3637
ENTERPRISE_ATTACK = "95ecc380-afe9-11e4-9b6c-751b66dd541e"
37-
PRE_ATTACK = "062767bd-02d2-4b72-84ba-56caef0f8658"
3838
MOBILE_ATTACK = "2f669986-b40b-4423-b720-4396ca6a462b"
3939
ICS_ATTACK = "02c3ef24-9cd4-48f3-a99f-b74ce24f1d34"
4040

41-
ENTERPRISE_ATTACK_LOCAL_DIR = "enterprise-attack"
42-
PRE_ATTACK_LOCAL_DIR = "pre-attack"
43-
MOBILE_ATTACK_LOCAL_DIR = "mobile-attack"
44-
ICS_ATTACK_LOCAL_DIR = "ics-attack"
45-
46-
class attack_client(object):
47-
"""A Python Module for ATT&CK"""
48-
TC_ENTERPRISE_SOURCE = None
49-
TC_PRE_SOURCE = None
50-
TC_MOBILE_SOURCE = None
51-
TC_ICS_SOURCE = None
52-
COMPOSITE_DS = None
53-
41+
class attack_client:
42+
"""A Python Module for accessing ATT&CK data locally or remotely."""
43+
5444
pydantic_model_mapping = {
5545
"techniques": Technique,
5646
"data-component": DataComponent,
@@ -74,37 +64,79 @@ class attack_client(object):
7464
"x-mitre-data-component": DataComponent
7565
}
7666

77-
def __init__(self, local_path=None, include_pre_attack=False, proxies=None, verify=True):
67+
def __init__(self, local_paths=None, proxies=None, verify=True):
7868
"""
69+
Initializes the ATT&CK client, setting up local or remote data sources.
70+
7971
Args:
80-
proxies - See https://requests.readthedocs.io/en/latest/user/advanced/#proxies
81-
verify - See https://requests.readthedocs.io/en/latest/user/advanced/#ssl-cert-verification
72+
local_paths (dict, optional): Dictionary with paths to local directories or JSON files for each domain.
73+
Keys should be 'enterprise', 'mobile', and 'ics'.
74+
proxies (dict, optional): Dictionary mapping protocol or protocol and hostname to the URL of the proxy.
75+
verify (bool, optional): Whether to verify SSL certificates. Defaults to True.
8276
"""
77+
self.COMPOSITE_DS = CompositeDataSource()
8378

84-
if local_path is not None and os.path.isdir(os.path.join(local_path, ENTERPRISE_ATTACK_LOCAL_DIR)) \
85-
and os.path.isdir(os.path.join(local_path, PRE_ATTACK_LOCAL_DIR)) \
86-
and os.path.isdir(os.path.join(local_path, MOBILE_ATTACK_LOCAL_DIR)) \
87-
and os.path.isdir(os.path.join(local_path, ICS_ATTACK_LOCAL_DIR)):
88-
self.TC_ENTERPRISE_SOURCE = FileSystemSource(os.path.join(local_path, ENTERPRISE_ATTACK_LOCAL_DIR))
89-
self.TC_PRE_SOURCE = FileSystemSource(os.path.join(local_path, PRE_ATTACK_LOCAL_DIR))
90-
self.TC_MOBILE_SOURCE = FileSystemSource(os.path.join(local_path, MOBILE_ATTACK_LOCAL_DIR))
91-
self.TC_ICS_SOURCE = FileSystemSource(os.path.join(local_path, ICS_ATTACK_LOCAL_DIR))
92-
else:
93-
ENTERPRISE_COLLECTION = Collection(ATTACK_STIX_COLLECTIONS + ENTERPRISE_ATTACK + "/", verify=verify, proxies=proxies)
94-
PRE_COLLECTION = Collection(ATTACK_STIX_COLLECTIONS + PRE_ATTACK + "/", verify=verify, proxies=proxies)
95-
MOBILE_COLLECTION = Collection(ATTACK_STIX_COLLECTIONS + MOBILE_ATTACK + "/", verify=verify, proxies=proxies)
96-
ICS_COLLECTION = Collection(ATTACK_STIX_COLLECTIONS + ICS_ATTACK + "/", verify=verify, proxies=proxies)
79+
# Validate local_paths with Pydantic
80+
if local_paths:
81+
try:
82+
self.local_paths = STIXLocalPaths(**local_paths)
83+
except ValidationError as e:
84+
raise ValueError(f"Invalid local_paths: {e}")
9785

98-
self.TC_ENTERPRISE_SOURCE = TAXIICollectionSource(ENTERPRISE_COLLECTION)
99-
self.TC_PRE_SOURCE = TAXIICollectionSource(PRE_COLLECTION)
100-
self.TC_MOBILE_SOURCE = TAXIICollectionSource(MOBILE_COLLECTION)
101-
self.TC_ICS_SOURCE = TAXIICollectionSource(ICS_COLLECTION)
86+
# Initialize data sources
87+
self.init_data_sources(self.local_paths if local_paths else None, proxies, verify)
88+
89+
def init_data_sources(self, local_paths, proxies, verify):
90+
"""
91+
Initializes data sources, either local or remote.
92+
93+
Args:
94+
local_paths (LocalPathsModel, optional): Validated dictionary with paths to local directories or JSON files for each domain.
95+
proxies (dict, optional): Dictionary mapping protocol or protocol and hostname to the URL of the proxy.
96+
verify (bool, optional): Whether to verify SSL certificates. Defaults to True.
97+
"""
98+
if local_paths:
99+
self.TC_ENTERPRISE_SOURCE = self.load_stix_store(local_paths.enterprise)
100+
self.TC_MOBILE_SOURCE = self.load_stix_store(local_paths.mobile)
101+
self.TC_ICS_SOURCE = self.load_stix_store(local_paths.ics)
102+
103+
if not (self.TC_ENTERPRISE_SOURCE and self.TC_MOBILE_SOURCE and self.TC_ICS_SOURCE):
104+
self.initialize_taxii_sources(proxies, verify)
105+
else:
106+
self.initialize_taxii_sources(proxies, verify)
102107

103-
self.COMPOSITE_DS = CompositeDataSource()
104108
self.COMPOSITE_DS.add_data_sources([self.TC_ENTERPRISE_SOURCE, self.TC_MOBILE_SOURCE, self.TC_ICS_SOURCE])
105109

106-
if include_pre_attack:
107-
self.COMPOSITE_DS.add_data_sources([self.TC_PRE_SOURCE])
110+
def load_stix_store(self, path):
111+
"""
112+
Loads a STIXStore from the given path.
113+
114+
Args:
115+
path (str): Path to the source directory or JSON file.
116+
117+
Returns:
118+
The loaded STIXStore or None if the path is invalid.
119+
"""
120+
if path and os.path.exists(path):
121+
store = STIXStore(path)
122+
return store.get_store()
123+
return None
124+
125+
def initialize_taxii_sources(self, proxies, verify):
126+
"""
127+
Initializes data sources from the ATT&CK TAXII server.
128+
129+
Args:
130+
proxies (dict, optional): Dictionary mapping protocol or protocol and hostname to the URL of the proxy.
131+
verify (bool, optional): Whether to verify SSL certificates. Defaults to True.
132+
"""
133+
ENTERPRISE_COLLECTION = Collection(ATTACK_STIX_COLLECTIONS + ENTERPRISE_ATTACK + "/", verify=verify, proxies=proxies)
134+
MOBILE_COLLECTION = Collection(ATTACK_STIX_COLLECTIONS + MOBILE_ATTACK + "/", verify=verify, proxies=proxies)
135+
ICS_COLLECTION = Collection(ATTACK_STIX_COLLECTIONS + ICS_ATTACK + "/", verify=verify, proxies=proxies)
136+
137+
self.TC_ENTERPRISE_SOURCE = TAXIICollectionSource(ENTERPRISE_COLLECTION)
138+
self.TC_MOBILE_SOURCE = TAXIICollectionSource(MOBILE_COLLECTION)
139+
self.TC_ICS_SOURCE = TAXIICollectionSource(ICS_COLLECTION)
108140

109141
def get_stix_objects(
110142
self,

attackcti/models.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,4 +191,9 @@ def extract_phase_name(cls, values: Dict[str, Any]):
191191
kill_chain_phases = values['tactic']
192192
phase_names = [phase['phase_name'] for phase in kill_chain_phases if 'phase_name' in phase]
193193
values['tactic'] = phase_names
194-
return values
194+
return values
195+
196+
class STIXLocalPaths(BaseModel):
197+
enterprise: Optional[str] = Field(None, description="Path to the local enterprise-attack directory or JSON file.")
198+
mobile: Optional[str] = Field(None, description="Path to the local mobile-attack directory or JSON file.")
199+
ics: Optional[str] = Field(None, description="Path to the local ics-attack directory or JSON file.")

attackcti/utils/__init__.py

Whitespace-only changes.

attackcti/utils/downloader.py

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
import requests
2+
from pathlib import Path
3+
from typing import Optional, List, Dict
4+
import re
5+
import json
6+
7+
class STIXDownloader:
8+
def __init__(self, download_dir: str, domain: Optional[str] = None, stix_version: Optional[str] = None, use_session: bool = False):
9+
"""
10+
Initializes the STIXDownloader with optional default settings.
11+
12+
Args:
13+
download_dir (str): Directory to download the STIX files to.
14+
domain (Optional[str]): Default ATT&CK domain from the following list ["enterprise", "mobile", "ics"].
15+
stix_version (Optional[str]): Default version of STIX to download. Options are "2.0" or "2.1".
16+
use_session (bool): Whether to use a persistent session for HTTP requests. Defaults to False.
17+
"""
18+
self.download_dir = download_dir
19+
self.domain = domain
20+
self.stix_version = stix_version
21+
self.use_session = use_session
22+
self.cti_base_url = "https://raw.githubusercontent.com/mitre/cti/"
23+
self.stix_data_base_url = "https://raw.githubusercontent.com/mitre-attack/attack-stix-data/master/"
24+
self.session = requests.Session() if use_session else None # Use a session if specified
25+
self.downloaded_file_paths: Dict[str, str] = {} # Attribute to store the full paths of the downloaded files
26+
27+
@staticmethod
28+
def fetch_attack_stix2_0_versions() -> List[str]:
29+
"""
30+
Fetches available ATT&CK versions in STIX 2.0 format from the cti GitHub repository.
31+
32+
Returns:
33+
List[str]: A list of available ATT&CK versions in STIX 2.0 format.
34+
"""
35+
ref_to_tag = re.compile(r"ATT&CK-v(.*)")
36+
tags = requests.get("https://api.github.com/repos/mitre/cti/git/refs/tags").json()
37+
versions = [ref_to_tag.search(tag["ref"]).groups()[0] for tag in tags if "ATT&CK-v" in tag["ref"]]
38+
return versions
39+
40+
@staticmethod
41+
def fetch_attack_stix2_1_versions() -> List[str]:
42+
"""
43+
Fetches available ATT&CK versions in STIX 2.1 format from the attack-stix-data repository.
44+
45+
Returns:
46+
List[str]: A list of available ATT&CK versions in STIX 2.1 format.
47+
"""
48+
index_url = "https://raw.githubusercontent.com/mitre-attack/attack-stix-data/master/index.json"
49+
index_data = requests.get(index_url).json()
50+
versions = [v["version"] for v in index_data["collections"][0]["versions"]]
51+
return versions
52+
53+
def download_file(self, url: str, dest_path: str) -> None:
54+
"""
55+
Downloads a file from the given URL to the specified destination path.
56+
57+
Args:
58+
url (str): URL of the file to download.
59+
dest_path (str): Destination file path to save the downloaded file.
60+
61+
Raises:
62+
requests.HTTPError: If the download request fails.
63+
"""
64+
if self.session:
65+
response = self.session.get(url, stream=True) # Use session if available
66+
else:
67+
response = requests.get(url, stream=True) # Otherwise, use a regular request
68+
69+
response.raise_for_status()
70+
with open(dest_path, 'wb') as f:
71+
for chunk in response.iter_content(chunk_size=8192):
72+
f.write(chunk)
73+
74+
def is_pretty_printed(self, file_path: str) -> bool:
75+
"""
76+
Checks if the JSON file is already pretty-printed.
77+
78+
Args:
79+
file_path (str): Path to the JSON file to check.
80+
81+
Returns:
82+
bool: True if the file is pretty-printed, False otherwise.
83+
"""
84+
with open(file_path, 'r', encoding='utf-8') as f:
85+
for i, line in enumerate(f):
86+
if i > 10: # Check only the first few lines for efficiency
87+
break
88+
if len(line.strip()) == 0:
89+
continue
90+
if line.strip().startswith('{') or line.strip().startswith('['):
91+
continue
92+
return True
93+
return False
94+
95+
def pretty_print_json(self, file_path: str) -> None:
96+
"""
97+
Converts a compact JSON file to a pretty-printed format.
98+
99+
Args:
100+
file_path (str): Path to the JSON file to be pretty-printed.
101+
"""
102+
with open(file_path, 'r', encoding='utf-8') as f:
103+
data = json.load(f)
104+
105+
with open(file_path, 'w', encoding='utf-8') as f:
106+
json.dump(data, f, indent=4, ensure_ascii=False)
107+
108+
def download_attack_data(self, stix_version: Optional[str] = None, domain: Optional[str] = None, release: Optional[str] = None, pretty_print: Optional[bool] = None):
109+
"""
110+
Downloads the ATT&CK STIX release file. If release is not specified, downloads the latest release.
111+
112+
Args:
113+
stix_version (Optional[str]): Version of STIX to download. Options are "2.0" or "2.1". If not specified, uses the default.
114+
domain (Optional[str]): An ATT&CK domain from the following list ["enterprise", "mobile", "ics"]. If not specified, uses the default.
115+
release (Optional[str]): ATT&CK release to download. If not specified, downloads the latest release.
116+
pretty_print (Optional[bool]): Whether to pretty-print the JSON file after downloading. If None, do not pretty-print.
117+
118+
Raises:
119+
ValueError: If the STIX version is invalid or the release version does not exist.
120+
"""
121+
stix_version = stix_version or self.stix_version
122+
domain = domain or self.domain
123+
124+
if stix_version not in ["2.0", "2.1"]:
125+
raise ValueError("Invalid STIX version. Choose '2.0' or '2.1'.")
126+
127+
if stix_version == "2.0":
128+
versions = self.fetch_attack_stix2_0_versions()
129+
base_url = self.cti_base_url
130+
if release is None:
131+
release_dir = "master"
132+
elif release not in versions:
133+
raise ValueError(f"Release {release} not found in cti repository.")
134+
else:
135+
release_dir = f"ATT%26CK-v{release}"
136+
url_path = f"{release_dir}/{domain}-attack/{domain}-attack.json"
137+
else:
138+
versions = self.fetch_attack_stix2_1_versions()
139+
base_url = self.stix_data_base_url
140+
if release is None:
141+
release_dir = "master"
142+
elif release not in versions:
143+
raise ValueError(f"Release {release} not found in attack-stix-data repository.")
144+
else:
145+
url_path = f"{domain}-attack/{domain}-attack-{release}.json"
146+
147+
download_url = f"{base_url}{url_path}"
148+
149+
release_folder = "latest" if release is None else f"v{release}"
150+
release_download_dir = Path(self.download_dir) / release_folder
151+
release_download_dir.mkdir(parents=True, exist_ok=True)
152+
153+
dest_path = release_download_dir / f"{domain}-attack.json"
154+
self.download_file(download_url, dest_path)
155+
156+
self.downloaded_file_path = str(dest_path) # Store the full path of the downloaded file
157+
self.downloaded_file_paths[domain] = str(dest_path) # Store the path for the specific domain
158+
159+
if pretty_print:
160+
if self.is_pretty_printed(self.downloaded_file_path):
161+
print("Warning: The file appears to be already pretty-printed.")
162+
self.pretty_print_json(self.downloaded_file_path)
163+
164+
print(f"Downloaded {domain}-attack.json to {release_download_dir}")
165+
166+
def download_all_domains(self, stix_version: Optional[str] = None, release: Optional[str] = None, pretty_print: Optional[bool] = None):
167+
"""
168+
Downloads the ATT&CK STIX release files for all domains (enterprise, mobile, ics).
169+
170+
Args:
171+
stix_version (Optional[str]): Version of STIX to download. Options are "2.0" or "2.1". If not specified, uses the default.
172+
release (Optional[str]): ATT&CK release to download. If not specified, downloads the latest release.
173+
pretty_print (Optional[bool]): Whether to pretty-print the JSON file after downloading. If None, do not pretty-print.
174+
"""
175+
domains = ["enterprise", "mobile", "ics"]
176+
for domain in domains:
177+
self.download_attack_data(stix_version=stix_version, domain=domain, release=release, pretty_print=pretty_print)
178+
179+
return self.downloaded_file_paths

attackcti/utils/storage.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
from stix2 import FileSystemSource, MemorySource
2+
from pathlib import Path
3+
4+
class STIXStore:
5+
def __init__(self, path: str, auto_load: bool = True):
6+
"""
7+
Initializes the STIXStore.
8+
9+
Args:
10+
path (str): Path to the source directory or JSON file.
11+
auto_load (bool): Flag indicating whether to automatically load data during initialization. Defaults to True.
12+
"""
13+
self.path = Path(path)
14+
self.source = None
15+
16+
if auto_load:
17+
self.load_data()
18+
19+
def load_data(self):
20+
"""
21+
Loads data from the specified path, determining if it's a directory or a file.
22+
23+
Raises:
24+
ValueError: If the path is invalid or not specified correctly.
25+
"""
26+
if self.path.is_dir():
27+
self.source = FileSystemSource(str(self.path))
28+
elif self.path.is_file() and self.path.suffix == '.json':
29+
self.source = MemorySource()
30+
self.source.load_from_file(str(self.path))
31+
else:
32+
raise ValueError(f"The specified path {self.path} is not a valid directory or JSON file.")
33+
34+
def get_store(self):
35+
"""
36+
Returns the loaded data store.
37+
38+
Returns:
39+
The loaded data store (FileSystemSource or MemoryStore).
40+
"""
41+
if self.source is None:
42+
raise ValueError("Data has not been loaded yet. Call load_data() first.")
43+
return self.source

0 commit comments

Comments
 (0)