Skip to content

Commit

Permalink
Duplicate Delete errors: catch IntegrityErrors (A) (#11739)
Browse files Browse the repository at this point in the history
* option a: catch IntegrityErrors

* linting

* fix object creation

* fix object creation

* bugfixing

* bugfixing

* Update dojo/api_v2/serializers.py
  • Loading branch information
valentijnscholten authored Feb 13, 2025
1 parent 7a7e4d8 commit b483752
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 15 deletions.
2 changes: 1 addition & 1 deletion dojo/api_v2/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ class ImportStatisticsSerializer(serializers.Serializer):
)
delta = DeltaStatisticsSerializer(
required=False,
help_text="Finding statistics of modifications made by the reimport. Only available when TRACK_IMPORT_HISTORY hass not disabled.",
help_text="Finding statistics of modifications made by the reimport. Only available when TRACK_IMPORT_HISTORY has not been disabled.",
)
after = SeverityStatusStatisticsSerializer(
help_text="Finding statistics as stored in Defect Dojo after the import",
Expand Down
56 changes: 42 additions & 14 deletions dojo/importers/base_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from django.core.exceptions import ValidationError
from django.core.files.base import ContentFile
from django.core.files.uploadedfile import TemporaryUploadedFile
from django.db import IntegrityError
from django.urls import reverse
from django.utils.timezone import make_aware

Expand Down Expand Up @@ -357,53 +358,80 @@ def update_import_history(
commit_hash=self.commit_hash,
type=self.import_type,
)
# Define all of the respective import finding actions for the test import object
test_import_finding_action_list = []

# Create a history record for each finding
for finding in closed_findings:
logger.debug(f"preparing Test_Import_Finding_Action for closed finding: {finding.id}")
test_import_finding_action_list.append(Test_Import_Finding_Action(
self.create_import_history_record_safe(Test_Import_Finding_Action(
test_import=test_import,
finding=finding,
action=IMPORT_CLOSED_FINDING,
))
for finding in new_findings:
logger.debug(f"preparing Test_Import_Finding_Action for created finding: {finding.id}")
test_import_finding_action_list.append(Test_Import_Finding_Action(
self.create_import_history_record_safe(Test_Import_Finding_Action(
test_import=test_import,
finding=finding,
action=IMPORT_CREATED_FINDING,
))
for finding in reactivated_findings:
logger.debug(f"preparing Test_Import_Finding_Action for reactivated finding: {finding.id}")
test_import_finding_action_list.append(Test_Import_Finding_Action(
self.create_import_history_record_safe(Test_Import_Finding_Action(
test_import=test_import,
finding=finding,
action=IMPORT_REACTIVATED_FINDING,
))
for finding in untouched_findings:
logger.debug(f"preparing Test_Import_Finding_Action for untouched finding: {finding.id}")
test_import_finding_action_list.append(Test_Import_Finding_Action(
self.create_import_history_record_safe(Test_Import_Finding_Action(
test_import=test_import,
finding=finding,
action=IMPORT_UNTOUCHED_FINDING,
))
# Bulk create all the defined objects
Test_Import_Finding_Action.objects.bulk_create(test_import_finding_action_list)

# Add any tags to the findings imported if necessary
if self.apply_tags_to_findings and self.tags:
for finding in test_import.findings_affected.all():
for tag in self.tags:
finding.tags.add(tag)
self.add_tags_safe(finding, tag)
# Add any tags to any endpoints of the findings imported if necessary
if self.apply_tags_to_endpoints and self.tags:
for finding in test_import.findings_affected.all():
for endpoint in finding.endpoints.all():
for tag in self.tags:
endpoint.tags.add(tag)
self.add_tags_safe(endpoint, tag)

return test_import

def create_import_history_record_safe(
self,
test_import_finding_action,
):
"""Creates an import history record, while catching any IntegrityErrors that might happen because of the background job having deleted a finding"""
logger.debug(f"creating Test_Import_Finding_Action for finding: {test_import_finding_action.finding.id} action: {test_import_finding_action.action}")
try:
test_import_finding_action.save()
except IntegrityError as e:
# This try catch makes us look we don't know what we're doing, but in https://github.com/DefectDojo/django-DefectDojo/issues/6217 we decided that for now this is the best solution
logger.warning("Error creating Test_Import_Finding_Action: %s", e)
logger.debug("Error creating Test_Import_Finding_Action, finding marked as duplicate and deleted ?")

def add_tags_safe(
self,
finding_or_endpoint,
tag,
):
"""Adds tags to a finding or endpoint, while catching any IntegrityErrors that might happen because of the background job having deleted a finding"""
if not isinstance(finding_or_endpoint, Finding) and not isinstance(finding_or_endpoint, Endpoint):
msg = "finding_or_endpoint must be a Finding or Endpoint object"
raise TypeError(msg)

msg = "finding" if isinstance(finding_or_endpoint, Finding) else "endpoint" if isinstance(finding_or_endpoint, Endpoint) else "unknown"
logger.debug(f" adding tag: {tag} to " + msg + f"{finding_or_endpoint.id}")

try:
finding_or_endpoint.tags.add(tag)
except IntegrityError as e:
# This try catch makes us look we don't know what we're doing, but in https://github.com/DefectDojo/django-DefectDojo/issues/6217 we decided that for now this is the best solution
logger.warning("Error adding tag: %s", e)
logger.debug("Error adding tag, finding marked as duplicate and deleted ?")

def construct_imported_message(
self,
finding_count: int = 0,
Expand Down

0 comments on commit b483752

Please sign in to comment.