Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 7 additions & 69 deletions dojo/tools/wazuh/parser.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import hashlib
import json

from dojo.models import Endpoint, Finding
from dojo.tools.wazuh.v4_7 import WazuhV4_7
from dojo.tools.wazuh.v4_8 import WazuhV4_8


class WazuhParser:
Expand All @@ -22,74 +22,12 @@ def get_description_for_scan_types(self, scan_type):

def get_findings(self, file, test):
data = json.load(file)

if not data:
return []

# Detect duplications
dupes = {}

# Loop through each element in the list
vulnerabilities = data.get("data", {}).get("affected_items", [])
for item in vulnerabilities:
if (
item["condition"] != "Package unfixed"
and item["severity"] != "Untriaged"
):
cve = item.get("cve")
package_name = item.get("name")
package_version = item.get("version")
description = item.get("condition")
severity = item.get("severity").capitalize()
agent_ip = item.get("agent_ip")
links = item.get("external_references")
cvssv3_score = item.get("cvss3_score")
publish_date = item.get("published")
agent_name = item.get("agent_name")
agent_ip = item.get("agent_ip")
detection_time = item.get("detection_time").split("T")[0]

references = "\n".join(links) if links else None

title = (
item.get("title") + " (version: " + package_version + ")"
)

if agent_name:
dupe_key = title + cve + agent_name + package_name + package_version
else:
dupe_key = title + cve + package_name + package_version
dupe_key = hashlib.sha256(dupe_key.encode("utf-8")).hexdigest()

if dupe_key in dupes:
find = dupes[dupe_key]
else:
dupes[dupe_key] = True

find = Finding(
title=title,
test=test,
description=description,
severity=severity,
references=references,
static_finding=True,
component_name=package_name,
component_version=package_version,
cvssv3_score=cvssv3_score,
publish_date=publish_date,
unique_id_from_tool=dupe_key,
date=detection_time,
)

# in some cases the agent_ip is not the perfect way on how to identify a host. Thus prefer the agent_name, if existant.
if agent_name:
find.unsaved_endpoints = [Endpoint(host=agent_name)]
elif agent_ip:
find.unsaved_endpoints = [Endpoint(host=agent_ip)]

if id:
find.unsaved_vulnerability_ids = cve

dupes[dupe_key] = find

return list(dupes.values())
if data.get("data"):
return WazuhV4_7().parse_findings(test, data)
if data.get("hits"):
return WazuhV4_8().parse_findings(test, data)
return []
70 changes: 70 additions & 0 deletions dojo/tools/wazuh/v4_7.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import hashlib

from dojo.models import Endpoint, Finding


class WazuhV4_7:
def parse_findings(self, test, data):
dupes = {}
vulnerabilities = data.get("data", {}).get("affected_items", [])
for item in vulnerabilities:
if (
item["condition"] != "Package unfixed"
and item["severity"] != "Untriaged"
):
cve = item.get("cve")
package_name = item.get("name")
package_version = item.get("version")
description = item.get("condition")
severity = item.get("severity").capitalize()
agent_ip = item.get("agent_ip")
links = item.get("external_references")
cvssv3_score = item.get("cvss3_score")
publish_date = item.get("published")
agent_name = item.get("agent_name")
agent_ip = item.get("agent_ip")
detection_time = item.get("detection_time").split("T")[0]

references = "\n".join(links) if links else None

title = (
item.get("title") + " (version: " + package_version + ")"
)

if agent_name:
dupe_key = title + cve + agent_name + package_name + package_version
else:
dupe_key = title + cve + package_name + package_version
dupe_key = hashlib.sha256(dupe_key.encode("utf-8")).hexdigest()

if dupe_key in dupes:
find = dupes[dupe_key]
else:
dupes[dupe_key] = True

find = Finding(
title=title,
test=test,
description=description,
severity=severity,
references=references,
static_finding=True,
component_name=package_name,
component_version=package_version,
cvssv3_score=cvssv3_score,
publish_date=publish_date,
unique_id_from_tool=dupe_key,
date=detection_time,
)

# in some cases the agent_ip is not the perfect way on how to identify a host. Thus prefer the agent_name, if existant.
if agent_name:
find.unsaved_endpoints = [Endpoint(host=agent_name)]
elif agent_ip:
find.unsaved_endpoints = [Endpoint(host=agent_ip)]

if id:
find.unsaved_vulnerability_ids = cve

dupes[dupe_key] = find
return list(dupes.values())
53 changes: 53 additions & 0 deletions dojo/tools/wazuh/v4_8.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import hashlib

from dojo.models import Finding


class WazuhV4_8:
def parse_findings(self, test, data):
dupes = {}
vulnerabilities = data.get("hits", {}).get("hits", [])
for item_source in vulnerabilities:
item = item_source.get("_source")
vuln = item.get("vulnerability")
cve = vuln.get("id")
description = vuln.get("description")
description += "\nAgent id:" + item.get("agent").get("id")
description += "\nAgent name:" + item.get("agent").get("name")
severity = vuln.get("severity")
cvssv3_score = vuln.get("score").get("base")
publish_date = vuln.get("published_at").split("T")[0]
agent_id = item.get("agent").get("id")
detection_time = vuln.get("detected_at").split("T")[0]

references = vuln.get("reference")

title = (
cve + " affects (version: " + item.get("package").get("version") + ")"
)

dupe_key = title + agent_id + description
dupe_key = hashlib.sha256(dupe_key.encode("utf-8")).hexdigest()

if dupe_key in dupes:
find = dupes[dupe_key]
else:
dupes[dupe_key] = True

find = Finding(
title=title,
test=test,
description=description,
severity=severity,
references=references,
static_finding=True,
component_name=item.get("package").get("name"),
component_version=item.get("package").get("version"),
cvssv3_score=cvssv3_score,
publish_date=publish_date,
unique_id_from_tool=dupe_key,
date=detection_time,
)
find.unsaved_vulnerability_ids = cve
dupes[dupe_key] = find
return list(dupes.values())
Loading