diff --git a/dojo/api_v2/views.py b/dojo/api_v2/views.py index da20e0e1931..42feb3a50e4 100644 --- a/dojo/api_v2/views.py +++ b/dojo/api_v2/views.py @@ -164,6 +164,7 @@ from dojo.user.utils import get_configuration_permissions_codenames from dojo.utils import ( async_delete, + generate_file_response, get_setting, get_system_setting, ) @@ -646,21 +647,8 @@ def download_file(self, request, file_id, pk=None): {"error": "File ID not associated with Engagement"}, status=status.HTTP_404_NOT_FOUND, ) - # Get the path of the file in media root - file_path = f"{settings.MEDIA_ROOT}/{file_object.file.url.lstrip(settings.MEDIA_URL)}" - file_handle = open(file_path, "rb") # send file - response = FileResponse( - file_handle, - content_type=f"{mimetypes.guess_type(file_path)}", - status=status.HTTP_200_OK, - ) - response["Content-Length"] = file_object.file.size - response[ - "Content-Disposition" - ] = f'attachment; filename="{file_object.file.name}"' - - return response + return generate_file_response(file_object) class RiskAcceptanceViewSet( @@ -1156,21 +1144,8 @@ def download_file(self, request, file_id, pk=None): {"error": "File ID not associated with Finding"}, status=status.HTTP_404_NOT_FOUND, ) - # Get the path of the file in media root - file_path = f"{settings.MEDIA_ROOT}/{file_object.file.url.lstrip(settings.MEDIA_URL)}" - file_handle = open(file_path, "rb") # send file - response = FileResponse( - file_handle, - content_type=f"{mimetypes.guess_type(file_path)}", - status=status.HTTP_200_OK, - ) - response["Content-Length"] = file_object.file.size - response[ - "Content-Disposition" - ] = f'attachment; filename="{file_object.file.name}"' - - return response + return generate_file_response(file_object) @extend_schema( request=serializers.FindingNoteSerializer, @@ -2320,21 +2295,8 @@ def download_file(self, request, file_id, pk=None): {"error": "File ID not associated with Test"}, status=status.HTTP_404_NOT_FOUND, ) - # Get the path of the file in media root - file_path = f"{settings.MEDIA_ROOT}/{file_object.file.url.lstrip(settings.MEDIA_URL)}" - file_handle = open(file_path, "rb") # send file - response = FileResponse( - file_handle, - content_type=f"{mimetypes.guess_type(file_path)}", - status=status.HTTP_200_OK, - ) - response["Content-Length"] = file_object.file.size - response[ - "Content-Disposition" - ] = f'attachment; filename="{file_object.file.name}"' - - return response + return generate_file_response(file_object) # Authorization: authenticated, configuration diff --git a/dojo/engagement/views.py b/dojo/engagement/views.py index f03d28dde34..6eb4d7cf876 100644 --- a/dojo/engagement/views.py +++ b/dojo/engagement/views.py @@ -1,5 +1,6 @@ import csv import logging +import mimetypes import operator import re from datetime import datetime @@ -1481,12 +1482,11 @@ def delete_risk_acceptance(request, eid, raid): @user_is_authorized(Engagement, Permissions.Engagement_View, 'eid') def download_risk_acceptance(request, eid, raid): - import mimetypes - mimetypes.init() - risk_acceptance = get_object_or_404(Risk_Acceptance, pk=raid) - + # Ensure the risk acceptance is under the supplied engagement + if not Engagement.objects.filter(risk_acceptance=risk_acceptance, id=eid).exists(): + raise PermissionDenied response = StreamingHttpResponse( FileIterWrapper( open(settings.MEDIA_ROOT + "/" + risk_acceptance.path.name, mode='rb'))) diff --git a/dojo/utils.py b/dojo/utils.py index 6c0b16bbdfa..24866a88fc8 100644 --- a/dojo/utils.py +++ b/dojo/utils.py @@ -26,7 +26,7 @@ from django.db.models.query import QuerySet from django.db.models.signals import post_save from django.dispatch import receiver -from django.http import HttpResponseRedirect +from django.http import FileResponse, HttpResponseRedirect from django.urls import get_resolver, get_script_prefix, reverse from django.utils import timezone from django.utils.translation import gettext as _ @@ -48,6 +48,7 @@ Dojo_User, Endpoint, Engagement, + FileUpload, Finding, Finding_Group, Finding_Template, @@ -2588,3 +2589,28 @@ def get_open_findings_burndown(product): past_90_days['y_min'] = running_min return past_90_days + + +def generate_file_response(file_object: FileUpload) -> FileResponse: + """Serve an uploaded file in a uniformed way. + + This function assumes all permissions have previously validated/verified + by the caller of this function. + """ + # Quick check to ensure we have the right type of object + if not isinstance(file_object, FileUpload): + msg = f"FileUpload object expected but type <{type(file_object)}> received." + raise TypeError(msg) + # Determine the path of the file on disk within the MEDIA_ROOT + file_path = f'{settings.MEDIA_ROOT}/{file_object.file.url.lstrip(settings.MEDIA_URL)}' + _, file_extension = os.path.splitext(file_path) + # Generate the FileResponse + response = FileResponse( + open(file_path, "rb"), + filename=f"{file_object.title}{file_extension}", + content_type=f"{mimetypes.guess_type(file_path)}", + ) + # Add some important headers + response["Content-Disposition"] = f'attachment; filename="{file_object.title}{file_extension}"' + response["Content-Length"] = file_object.file.size + return response diff --git a/dojo/views.py b/dojo/views.py index 09a0dcad73e..c59619621b9 100644 --- a/dojo/views.py +++ b/dojo/views.py @@ -7,10 +7,9 @@ from django.contrib.auth.decorators import login_required from django.contrib.contenttypes.models import ContentType from django.core.exceptions import ObjectDoesNotExist, PermissionDenied -from django.http import FileResponse, Http404, HttpResponseRedirect +from django.http import Http404, HttpResponseRedirect from django.shortcuts import get_object_or_404, render from django.urls import reverse -from django.views.static import serve from dojo.authorization.authorization import ( user_has_configuration_permission_or_403, @@ -21,7 +20,7 @@ from dojo.filters import LogEntryFilter from dojo.forms import ManageFileFormSet from dojo.models import Endpoint, Engagement, FileUpload, Finding, Product, Test -from dojo.utils import Product_Tab, get_page_items +from dojo.utils import Product_Tab, generate_file_response, get_page_items logger = logging.getLogger(__name__) @@ -189,13 +188,16 @@ def manage_files(request, oid, obj_type): }) -# Serve the file only after verifying the user is supposed to see the file @login_required def protected_serve(request, path, document_root=None, show_indexes=False): + """Serve the file only after verifying the user is supposed to see the file.""" file = FileUpload.objects.get(file=path) if not file: raise Http404 object_set = list(file.engagement_set.all()) + list(file.test_set.all()) + list(file.finding_set.all()) + # Determine if there is an object to query permission checks from + if len(object_set) == 0: + raise Http404 # Should only one item (but not sure what type) in the list, so O(n=1) for obj in object_set: if isinstance(obj, Engagement): @@ -204,23 +206,30 @@ def protected_serve(request, path, document_root=None, show_indexes=False): user_has_permission_or_403(request.user, obj, Permissions.Test_View) elif isinstance(obj, Finding): user_has_permission_or_403(request.user, obj, Permissions.Finding_View) - return serve(request, path, document_root, show_indexes) + + return generate_file_response(file) def access_file(request, fid, oid, obj_type, url=False): + def check_file_belongs_to_object(file, object_manager, object_id): + if not object_manager.filter(id=object_id).exists(): + raise PermissionDenied + + file = get_object_or_404(FileUpload, pk=fid) if obj_type == 'Engagement': obj = get_object_or_404(Engagement, pk=oid) user_has_permission_or_403(request.user, obj, Permissions.Engagement_View) + obj_manager = file.engagement_set elif obj_type == 'Test': obj = get_object_or_404(Test, pk=oid) user_has_permission_or_403(request.user, obj, Permissions.Test_View) + obj_manager = file.test_set elif obj_type == 'Finding': obj = get_object_or_404(Finding, pk=oid) user_has_permission_or_403(request.user, obj, Permissions.Finding_View) + obj_manager = file.finding_set else: raise Http404 - # If reaching this far, user must have permission to get file - file = get_object_or_404(FileUpload, pk=fid) - redirect_url = f'{settings.MEDIA_ROOT}/{file.file.url.lstrip(settings.MEDIA_URL)}' - print(redirect_url) - return FileResponse(open(redirect_url, "rb")) + check_file_belongs_to_object(file, obj_manager, obj.id) + + return generate_file_response(file)