Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Importer Close Old Findings: Accommodate different dedupe algorithms #11729

Merged
merged 2 commits into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions dojo/importers/base_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,13 @@ def determine_process_method(
**kwargs,
)

def determine_deduplication_algorithm(self) -> str:
"""
Determines what dedupe algorithm to use for the Test being processed.
:return: A string representing the dedupe algorithm to use.
"""
return self.test.deduplication_algorithm

def update_test_meta(self):
"""
Update the test with some values stored in the kwargs dict. The common
Expand Down
51 changes: 33 additions & 18 deletions dojo/importers/default_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,29 +254,44 @@ def close_old_findings(
# First check if close old findings is desired
if not self.close_old_findings_toggle:
return []
logger.debug("REIMPORT_SCAN: Closing findings no longer present in scan report")
# Close old active findings that are not reported by this scan.
# Refactoring this to only call test.finding_set.values() once.
findings = findings.values()
mitigated_hash_codes = []

logger.debug("IMPORT_SCAN: Closing findings no longer present in scan report")
# Remove all the findings that are coming from the report already mitigated
new_hash_codes = []
for finding in findings:
new_hash_codes.append(finding["hash_code"])
if finding.get("is_mitigated", None):
mitigated_hash_codes.append(finding["hash_code"])
for hash_code in new_hash_codes:
if hash_code == finding["hash_code"]:
new_hash_codes.remove(hash_code)
new_unique_ids_from_tool = []
for finding in findings.values():
# Do not process closed findings in the report
if finding.get("is_mitigated", False):
continue
# Grab the hash code
if (hash_code := finding.get("hash_code")) is not None:
new_hash_codes.append(hash_code)
if (unique_id_from_tool := finding.get("unique_id_from_tool")) is not None:
new_unique_ids_from_tool.append(unique_id_from_tool)
# Get the initial filtered list of old findings to be closed without
# considering the scope of the product or engagement
old_findings = Finding.objects.exclude(
test=self.test,
).exclude(
hash_code__in=new_hash_codes,
).filter(
old_findings = Finding.objects.filter(
test__test_type=self.test.test_type,
active=True,
)
).exclude(test=self.test)
# Filter further based on the deduplication algorithm set on the test
self.deduplication_algorithm = self.determine_deduplication_algorithm()
if self.deduplication_algorithm in ["hash_code", "legacy"]:
old_findings = old_findings.exclude(
hash_code__in=new_hash_codes,
)
if self.deduplication_algorithm == "unique_id_from_tool":
old_findings = old_findings.exclude(
unique_id_from_tool__in=new_unique_ids_from_tool,
)
if self.deduplication_algorithm == "unique_id_from_tool_or_hash_code":
old_findings = old_findings.exclude(
(Q(hash_code__isnull=False) & Q(hash_code__in=new_hash_codes))
| (
Q(unique_id_from_tool__isnull=False)
& Q(unique_id_from_tool__in=new_unique_ids_from_tool)
),
)
# Accommodate for product scope or engagement scope
if self.close_old_findings_product_scope:
old_findings = old_findings.filter(test__engagement__product=self.test.engagement.product)
Expand Down
7 changes: 0 additions & 7 deletions dojo/importers/default_reimporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,6 @@ def process_scan(
test_import_history,
)

def determine_deduplication_algorithm(self) -> str:
"""
Determines what dedupe algorithm to use for the Test being processed.
:return: A string representing the dedupe algorithm to use.
"""
return self.test.deduplication_algorithm

def process_findings(
self,
parsed_findings: list[Finding],
Expand Down
193 changes: 193 additions & 0 deletions unittests/scans/semgrep/close_old_findings_report_line31.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
{
"version": "1.97.0",
"results": [
{
"check_id": "python.lang.security.audit.dangerous-subprocess-use-tainted-env-args.dangerous-subprocess-use-tainted-env-args",
"path": "sample/brute.py",
"start": {
"line": 31,
"col": 29,
"offset": 285
},
"end": {
"line": 31,
"col": 58,
"offset": 314
},
"extra": {
"metavars": {
"$FUNC": {
"start": {
"line": 31,
"col": 25,
"offset": 281
},
"end": {
"line": 31,
"col": 28,
"offset": 284
},
"abstract_content": "run"
},
"$CMD": {
"start": {
"line": 31,
"col": 29,
"offset": 285
},
"end": {
"line": 31,
"col": 58,
"offset": 314
},
"abstract_content": "[program username password]"
}
},
"message": "Detected subprocess function 'run' with user controlled data. A malicious actor could leverage this to perform command injection. You may consider using 'shlex.quote()'.",
"metadata": {
"owasp": [
"A01:2017 - Injection",
"A03:2021 - Injection"
],
"cwe": [
"CWE-78: Improper Neutralization of Special Elements used in an OS Command ('OS Command Injection')"
],
"asvs": {
"control_id": "5.3.8 OS Command Injection",
"control_url": "https://github.com/OWASP/ASVS/blob/master/4.0/en/0x13-V5-Validation-Sanitization-Encoding.md#v53-output-encoding-and-injection-prevention-requirements",
"section": "V5: Validation, Sanitization and Encoding Verification Requirements",
"version": "4"
},
"references": [
"https://stackoverflow.com/questions/3172470/actual-meaning-of-shell-true-in-subprocess",
"https://docs.python.org/3/library/subprocess.html",
"https://docs.python.org/3/library/shlex.html",
"https://semgrep.dev/docs/cheat-sheets/python-command-injection/"
],
"category": "security",
"technology": [
"python"
],
"confidence": "MEDIUM",
"cwe2022-top25": true,
"cwe2021-top25": true,
"subcategory": [
"vuln"
],
"likelihood": "MEDIUM",
"impact": "MEDIUM",
"license": "Commons Clause License Condition v1.0[LGPL-2.1-only]",
"vulnerability_class": [
"Command Injection"
],
"source": "https://semgrep.dev/r/python.lang.security.audit.dangerous-subprocess-use-tainted-env-args.dangerous-subprocess-use-tainted-env-args",
"shortlink": "https://sg.run/pLGg",
"semgrep.dev": {
"rule": {
"origin": "community",
"r_id": 27262,
"rule_id": "AbUgrZ",
"rv_id": 928299,
"url": "https://semgrep.dev/playground/r/0bTpAQn/python.lang.security.audit.dangerous-subprocess-use-tainted-env-args.dangerous-subprocess-use-tainted-env-args",
"version_id": "0bTpAQn"
}
}
},
"severity": "ERROR",
"fingerprint": "1d0cfd65fc17a97773e848ec31ddd8a580c5fb54914f03bebe4f934a16d5d45f22cfd3ab1b9f3ab9871da06569701e38ea2b518d2a1aa397cd5d7ecbd699d4a4_0",
"lines": " result = subprocess.run([program, username, password], stdout=subprocess.DEVNULL)",
"is_ignored": false,
"dataflow_trace": {
"taint_source": [
"CliLoc",
[
{
"path": "sample/brute.py",
"start": {
"line": 6,
"col": 11,
"offset": 64
},
"end": {
"line": 6,
"col": 19,
"offset": 72
}
},
"sys.argv"
]
],
"intermediate_vars": [
{
"location": {
"path": "sample/brute.py",
"start": {
"line": 6,
"col": 1,
"offset": 54
},
"end": {
"line": 6,
"col": 8,
"offset": 61
}
},
"content": "program"
},
{
"location": {
"path": "sample/brute.py",
"start": {
"line": 6,
"col": 1,
"offset": 54
},
"end": {
"line": 6,
"col": 8,
"offset": 61
}
},
"content": "program"
}
],
"taint_sink": [
"CliLoc",
[
{
"path": "sample/brute.py",
"start": {
"line": 31,
"col": 29,
"offset": 285
},
"end": {
"line": 31,
"col": 58,
"offset": 314
}
},
"[program, username, password]"
]
]
},
"engine_kind": "OSS",
"validation_state": "NO_VALIDATOR"
}
}
],
"errors": [],
"paths": {
"scanned": [
"README.md",
"sample/brute.py",
"sample/findmysecrets.go",
"sample/function.go",
"sample/go.mod",
"sample/session.go",
"sample/sqli.go"
]
},
"interfile_languages_used": [],
"skipped_rules": []
}
Loading