Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ms_defender: Improve reliability and error handling #11898

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 51 additions & 41 deletions dojo/tools/ms_defender/parser.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import json
import logging
import zipfile

from dojo.models import Endpoint, Finding

logger = logging.getLogger(__name__)


class MSDefenderParser:

Expand Down Expand Up @@ -53,22 +56,29 @@ def get_findings(self, file, test):
machines[data.get("id")] = data
for vulnerability in vulnerabilities:
try:
self.process_zip(vulnerability, machines[vulnerability["machineId"]])
machine = machines.get(vulnerability["machineId"], None)
if machine is not None:
self.process_zip(vulnerability, machine)
else:
logger.debug("fallback to process without machine: no machine id")
self.process_json(vulnerability)
except (IndexError, KeyError):
logger.exception("fallback to process without machine: exception")
self.process_json(vulnerability)
else:
return []
return self.findings

def process_json(self, vulnerability):
description = ""
description += "cveId: " + str(vulnerability["cveId"]) + "\n"
description += "machineId: " + str(vulnerability["machineId"]) + "\n"
description += "fixingKbId: " + str(vulnerability["fixingKbId"]) + "\n"
description += "productName: " + str(vulnerability["productName"]) + "\n"
description += "productVendor: " + str(vulnerability["productVendor"]) + "\n"
description += "productVersion: " + str(vulnerability["productVersion"]) + "\n"
title = str(vulnerability["cveId"])
description += "cveId: " + str(vulnerability.get("cveId", "")) + "\n"
description += "machineId: " + str(vulnerability.get("machineId", "")) + "\n"
description += "fixingKbId: " + str(vulnerability.get("fixingKbId", "")) + "\n"
description += "productName: " + str(vulnerability.get("productName", "")) + "\n"
description += "productVendor: " + str(vulnerability.get("productVendor", "")) + "\n"
description += "productVersion: " + str(vulnerability.get("productVersion", "")) + "\n"
description += "machine Info: " + "Unable to find or parse machine data, check logs for more information" + "\n"
title = str(vulnerability.get("cveId", ""))
finding = Finding(
title=title + "_" + vulnerability["machineId"],
severity=self.severity_check(vulnerability["severity"]),
Expand All @@ -86,35 +96,35 @@ def process_json(self, vulnerability):

def process_zip(self, vulnerability, machine):
description = ""
description += "cveId: " + str(vulnerability["cveId"]) + "\n"
description += "machineId: " + str(vulnerability["machineId"]) + "\n"
description += "fixingKbId: " + str(vulnerability["fixingKbId"]) + "\n"
description += "productName: " + str(vulnerability["productName"]) + "\n"
description += "productVendor: " + str(vulnerability["productVendor"]) + "\n"
description += "productVersion: " + str(vulnerability["productVersion"]) + "\n"
description += "machine Info: id: " + str(machine["id"]) + "\n"
description += "machine Info: osPlatform: " + str(machine["osPlatform"]) + "\n"
description += "machine Info: osVersion: " + str(machine["osVersion"]) + "\n"
description += "machine Info: osProcessor: " + str(machine["osProcessor"]) + "\n"
description += "machine Info: version: " + str(machine["version"]) + "\n"
description += "machine Info: agentVersion: " + str(machine["agentVersion"]) + "\n"
description += "machine Info: osBuild: " + str(machine["osBuild"]) + "\n"
description += "machine Info: healthStatus: " + str(machine["healthStatus"]) + "\n"
description += "machine Info: deviceValue: " + str(machine["deviceValue"]) + "\n"
description += "machine Info: rbacGroupId: " + str(machine["rbacGroupId"]) + "\n"
description += "machine Info: rbacGroupName: " + str(machine["rbacGroupName"]) + "\n"
description += "machine Info: riskScore: " + str(machine["riskScore"]) + "\n"
description += "machine Info: exposureLevel: " + str(machine["exposureLevel"]) + "\n"
description += "machine Info: isAadJoined: " + str(machine["isAadJoined"]) + "\n"
description += "machine Info: aadDeviceId: " + str(machine["aadDeviceId"]) + "\n"
description += "machine Info: defenderAvStatus: " + str(machine["defenderAvStatus"]) + "\n"
description += "machine Info: onboardingStatus: " + str(machine["onboardingStatus"]) + "\n"
description += "machine Info: osArchitecture: " + str(machine["osArchitecture"]) + "\n"
description += "machine Info: managedBy: " + str(machine["managedBy"]) + "\n"
title = str(vulnerability["cveId"])
if str(machine["computerDnsName"]) != "null":
description += "cveId: " + str(vulnerability.get("cveId", "")) + "\n"
description += "machineId: " + str(vulnerability.get("machineId", "")) + "\n"
description += "fixingKbId: " + str(vulnerability.get("fixingKbId", "")) + "\n"
description += "productName: " + str(vulnerability.get("productName", "")) + "\n"
description += "productVendor: " + str(vulnerability.get("productVendor", "")) + "\n"
description += "productVersion: " + str(vulnerability.get("productVersion", "")) + "\n"
description += "machine Info: id: " + str(machine.get("id", "")) + "\n"
description += "machine Info: osPlatform: " + str(machine.get("osPlatform", "")) + "\n"
description += "machine Info: osVersion: " + str(machine.get("osVersion", "")) + "\n"
description += "machine Info: osProcessor: " + str(machine.get("osProcessor", "")) + "\n"
description += "machine Info: version: " + str(machine.get("version", "")) + "\n"
description += "machine Info: agentVersion: " + str(machine.get("agentVersion", "")) + "\n"
description += "machine Info: osBuild: " + str(machine.get("osBuild", "")) + "\n"
description += "machine Info: healthStatus: " + str(machine.get("healthStatus", "")) + "\n"
description += "machine Info: deviceValue: " + str(machine.get("deviceValue", "")) + "\n"
description += "machine Info: rbacGroupId: " + str(machine.get("rbacGroupId", "")) + "\n"
description += "machine Info: rbacGroupName: " + str(machine.get("rbacGroupName", "")) + "\n"
description += "machine Info: riskScore: " + str(machine.get("riskScore", "")) + "\n"
description += "machine Info: exposureLevel: " + str(machine.get("exposureLevel", "")) + "\n"
description += "machine Info: isAadJoined: " + str(machine.get("isAadJoined", "")) + "\n"
description += "machine Info: aadDeviceId: " + str(machine.get("aadDeviceId", "")) + "\n"
description += "machine Info: defenderAvStatus: " + str(machine.get("defenderAvStatus", "")) + "\n"
description += "machine Info: onboardingStatus: " + str(machine.get("onboardingStatus", "")) + "\n"
description += "machine Info: osArchitecture: " + str(machine.get("osArchitecture", "")) + "\n"
description += "machine Info: managedBy: " + str(machine.get("managedBy", "")) + "\n"
title = str(vulnerability.get("cveId", ""))
if "computerDnsName" in machine and str(machine["computerDnsName"]) != "null":
title = title + "_" + str(machine["computerDnsName"])
if str(machine["osPlatform"]) != "null":
if "osPlatform" in machine and str(machine["osPlatform"]) != "null":
title = title + "_" + str(machine["osPlatform"])
finding = Finding(
title=title + "_" + vulnerability["machineId"],
Expand All @@ -123,18 +133,18 @@ def process_zip(self, vulnerability, machine):
static_finding=False,
dynamic_finding=True,
)
if vulnerability["fixingKbId"] is not None:
if "fixingKbId" in vulnerability and vulnerability["fixingKbId"] is not None:
finding.mitigation = vulnerability["fixingKbId"]
if vulnerability["cveId"] is not None:
if "cveId" in vulnerability:
finding.unsaved_vulnerability_ids = []
finding.unsaved_vulnerability_ids.append(vulnerability["cveId"])
self.findings.append(finding)
finding.unsaved_endpoints = []
if machine["computerDnsName"] is not None:
if "computerDnsName" in machine and machine["computerDnsName"] is not None:
finding.unsaved_endpoints.append(Endpoint(host=str(machine["computerDnsName"]).replace(" ", "").replace("(", "_").replace(")", "_")))
if machine["lastIpAddress"] is not None:
if "lastIpAddress" in machine and machine["lastIpAddress"] is not None:
finding.unsaved_endpoints.append(Endpoint(host=str(machine["lastIpAddress"])))
if machine["lastExternalIpAddress"] is not None:
if "lastExternalIpAddress" in machine and machine["lastExternalIpAddress"] is not None:
finding.unsaved_endpoints.append(Endpoint(host=str(machine["lastExternalIpAddress"])))

def severity_check(self, input):
Expand Down
Binary file not shown.
10 changes: 10 additions & 0 deletions unittests/tools/test_ms_defender_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,16 @@ def test_parser_defender_issue_11217(self):
endpoint.clean()
self.assertEqual("Max_Mustermann_iPadAir_17zoll__2ndgeneration_", finding.unsaved_endpoints[0].host)

def test_parser_defender_error_handling(self):
"""https://github.com/DefectDojo/django-DefectDojo/issues/11896 handle missing values properly, i.e. defenderAvStatus"""
testfile = open(get_unit_tests_scans_path("ms_defender") / "defender_error_handling.zip", encoding="utf-8")
parser = MSDefenderParser()
findings = parser.get_findings(testfile, Test())
testfile.close()
self.assertEqual(421, len(findings))
finding = findings[0]
self.assertEqual(3, len(finding.unsaved_endpoints))

def test_parser_defender_empty_machines(self):
testfile = open(get_unit_tests_scans_path("ms_defender") / "empty_machines.zip", encoding="utf-8")
parser = MSDefenderParser()
Expand Down