Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#2229659] Improve scan type logic by based on specific conditions. #113

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions components/resc-vcs-scanner/src/vcs_scanner/output_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from resc_backend.resc_web_service.schema.branch import Branch
from resc_backend.resc_web_service.schema.finding import FindingCreate
from resc_backend.resc_web_service.schema.repository import Repository
from resc_backend.resc_web_service.schema.scan import Scan
from resc_backend.resc_web_service.schema.scan import Scan, ScanRead
from resc_backend.resc_web_service.schema.scan_type import ScanType

# First Party
Expand Down Expand Up @@ -39,5 +39,5 @@ def write_scan(
rule_pack: str) -> Scan:
pass

def get_last_scanned_commit(self, branch: Branch):
def get_last_scan_for_branch(self, branch: Branch) -> ScanRead:
pass
Original file line number Diff line number Diff line change
Expand Up @@ -117,16 +117,13 @@ def write_scan(

return created_scan

def get_last_scanned_commit(self, branch: BranchRead):
last_scanned_commit = None
def get_last_scan_for_branch(self, branch: BranchRead) -> ScanRead:
response = get_last_scan_for_branch(self.rws_url,
branch.id_)
if response.status_code == 200:
json_body = json.loads(response.text)
last_scanned_commit = json_body['last_scanned_commit'] if json_body else None
else:
logger.warning(f"Retrieving last scan details failed with error: {response.status_code}->{response.text}")
return last_scanned_commit
return ScanRead(**json.loads(response.text))
logger.warning(f"Retrieving last scan details failed with error: {response.status_code}->{response.text}")
return None

@retry(wait=wait_exponential(multiplier=1, min=2, max=10), stop=stop_after_attempt(100))
def write_vcs_instances(self, vcs_instances_dict: Dict[str, VCSInstanceRuntime]) \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,11 @@ def run_repository_scan(self) -> None:
return

# Get last scanned commit for the branch
last_scanned_commit = self._output_module.get_last_scanned_commit(branch=created_branch)
last_scan_for_branch = self._output_module.get_last_scan_for_branch(branch=created_branch)
last_scanned_commit = last_scan_for_branch.last_scanned_commit if last_scan_for_branch else None
scan_type_to_run = self.determine_scan_type(branch, last_scan_for_branch)

# Decide which type of scan to run
if self.force_base_scan:
scan_type_to_run = ScanType.BASE
else:
scan_type_to_run = ScanType.INCREMENTAL if last_scanned_commit else ScanType.BASE

# Only insert in to scan and finding table if its BASE Scan or there is new commit, else skip
if scan_type_to_run == ScanType.BASE or last_scanned_commit != branch.latest_commit:
if scan_type_to_run:
# Insert in to scan table
scan_timestamp_start = datetime.utcnow()
created_scan = self._output_module.write_scan(
Expand Down Expand Up @@ -251,3 +246,21 @@ def scan_directory(self, directory_path: str) -> Optional[List[FindingBase]]:
if os.path.exists(report_filepath):
os.remove(report_filepath)
return None

# Decide which type of scan to run
def determine_scan_type(self, branch, last_scan_for_branch):
# Force base scan, or has no previous scan
if self.force_base_scan or last_scan_for_branch is None:
return ScanType.BASE
# Has previous scan
if last_scan_for_branch:
last_used_rule_pack = last_scan_for_branch.rule_pack
# Rule-pack is different from previous scan
if last_used_rule_pack != self.rule_pack_version:
return ScanType.BASE
last_scanned_commit = last_scan_for_branch.last_scanned_commit
# Last commit is different from previous scan
if branch and branch.latest_commit != last_scanned_commit:
return ScanType.INCREMENTAL
# Skip scanning, no conditions match
return None
Original file line number Diff line number Diff line change
Expand Up @@ -179,5 +179,5 @@ def write_scan(self, scan_type_to_run: ScanType, last_scanned_commit: str, scan_
id_=1,
rule_pack=rule_pack)

def get_last_scanned_commit(self, branch: Branch):
def get_last_scan_for_branch(self, branch: Branch) -> ScanRead:
return None
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ def test_write_scan_unsuccessful(warning, post):


@patch("requests.get")
def test_get_last_scanned_commit(get):
def test_get_last_scan_for_branch(get):
url = "https://nonexistingwebsite.com"
branch = BranchRead(id_=1,
branch_id="branch.branch_id",
Expand All @@ -255,8 +255,8 @@ def test_get_last_scanned_commit(get):
get.return_value.status_code = 200
get.return_value.text = expected_json

result = RESTAPIWriter(rws_url=url).get_last_scanned_commit(branch)
assert result == expected_result.last_scanned_commit
result = RESTAPIWriter(rws_url=url).get_last_scan_for_branch(branch)
assert result == expected_result


@patch("requests.get")
Expand All @@ -272,7 +272,7 @@ def test_get_last_scanned_commit_invalid_id(warning, get):
get.return_value.status_code = 404
get.return_value.text = error_text

result = RESTAPIWriter(rws_url=url).get_last_scanned_commit(branch)
result = RESTAPIWriter(rws_url=url).get_last_scan_for_branch(branch)
assert result is None
warning.assert_called_once()
warning.assert_called_with(f"Retrieving last scan details failed with error: 404->{error_text}")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
# Standard Library
import sys
from datetime import datetime
from unittest.mock import patch

# Third Party
from _pytest.monkeypatch import MonkeyPatch
from resc_backend.resc_web_service.schema.branch import Branch
from resc_backend.resc_web_service.schema.repository import Repository
from resc_backend.resc_web_service.schema.scan import ScanRead
from resc_backend.resc_web_service.schema.scan_type import ScanType

# First Party
Expand Down Expand Up @@ -127,3 +129,80 @@ def test_scan_directory(start_scan):
result = secret_scanner.scan_directory(directory_path=repo_clone_path)
assert result is None
start_scan.assert_called_once()


# not a test class
def initialize_and_get_repo_scanner_and_branch():
repository = Repository(project_key="local",
repository_id=1,
repository_name="local",
repository_url="https://repository.url",
vcs_instance=1,
branches=[])

secret_scanner = SecretScanner(
gitleaks_binary_path="/tmp/gitleaks",
gitleaks_rules_path="/rules.toml",
rule_pack_version="2.0.1",
output_plugin=RESTAPIWriter(rws_url="https://fakeurl.com:8000"),
repository=repository,
username="",
personal_access_token="")

branch = Branch(branch_id=1,
branch_name="branch_name1",
latest_commit="latest_commit_2")

return repository, branch, secret_scanner


def test_scan_type_is_base_when_a_latest_scan_is_not_present():
repository, branch, secret_scanner = initialize_and_get_repo_scanner_and_branch()

scan_type = secret_scanner.determine_scan_type(branch, None)
assert scan_type == ScanType.BASE


def test_scan_type_is_base_when_a_latest_scan_is_present_and_rule_pack_is_latest():
repository, branch, secret_scanner = initialize_and_get_repo_scanner_and_branch()

scan_read = ScanRead(id_=1,
branch_id=1,
scan_type=ScanType.BASE,
last_scanned_commit="latest_commit_1",
timestamp=datetime.utcnow(),
increment_number=0,
rule_pack="2.0.2")

scan_type = secret_scanner.determine_scan_type(branch, scan_read)
assert scan_type == ScanType.BASE


def test_scan_type_is_incremental_when_a_latest_scan_is_present_and_rule_pack_is_same():
repository, branch, secret_scanner = initialize_and_get_repo_scanner_and_branch()

scan_read = ScanRead(id_=1,
branch_id=1,
scan_type=ScanType.BASE,
last_scanned_commit="latest_commit_1",
timestamp=datetime.utcnow(),
increment_number=0,
rule_pack="2.0.1")

scan_type = secret_scanner.determine_scan_type(branch, scan_read)
assert scan_type == ScanType.INCREMENTAL


def test_scan_type_is_incremental_when_a_latest_scan_is_present_and_rule_pack_is_same_and_last_commit_is_newer():
repository, branch, secret_scanner = initialize_and_get_repo_scanner_and_branch()

scan_read = ScanRead(id_=1,
branch_id=1,
scan_type=ScanType.BASE,
last_scanned_commit="latest_commit_1",
timestamp=datetime.utcnow(),
increment_number=0,
rule_pack="2.0.1")

scan_type = secret_scanner.determine_scan_type(branch, scan_read)
assert scan_type == ScanType.INCREMENTAL
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def test_get_last_scanned_commit():
repository_id=1)

result = STDOUTWriter(toml_rule_file_path="toml_path", exit_code_warn=2, exit_code_block=1) \
.get_last_scanned_commit(branch)
.get_last_scan_for_branch(branch)
assert result is None


Expand Down
Binary file modified images/RESC_Scan_Flow_Diagram.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.