Skip to content

Allow specification of custom default file for finding-cfgs #373

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions local-setup/kind/cluster/values-bootstrapping.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,14 @@ extensions_cfg:

findings:
- type: finding/vulnerability
categorisations: gardener
rescoring_ruleset: gardener
categorisations:
cfg_name: gardener
ref:
path: odg/defaults.yaml
rescoring_ruleset:
cfg_name: gardener
ref:
path: odg/defaults.yaml
issues:
enable_assignees: False
- type: finding/license
Expand Down
44 changes: 27 additions & 17 deletions odg/findings.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import dataclasses
import enum
import os
import re
import typing

Expand All @@ -10,14 +9,11 @@
import dso.model

import consts
import odg.shared_cfg
import rescore.model as rm
import util


own_dir = os.path.abspath(os.path.dirname(__file__))
defaults_file_path = os.path.join(own_dir, 'defaults.yaml')


class ModelValidationError(ValueError):
pass

Expand Down Expand Up @@ -285,6 +281,16 @@ def match_regexes(patterns: list[str], string: str) -> bool:
return True


@dataclasses.dataclass
class SharedCfgReference:
cfg_name: str
ref: (
odg.shared_cfg.SharedCfgGitHubReference
| odg.shared_cfg.SharedCfgLocalReference
| odg.shared_cfg.SharedCfgOCMReference
)


@dataclasses.dataclass
class Finding:
'''
Expand All @@ -305,9 +311,9 @@ class Finding:
Default scope selection to be used for rescoring via the Delivery-Dashboard.
'''
type: FindingType
categorisations: list[FindingCategorisation] | str
categorisations: SharedCfgReference | list[FindingCategorisation]
filter: list[FindingFilter] | None
rescoring_ruleset: dict | str | None
rescoring_ruleset: SharedCfgReference | dict | None
issues: FindingIssues = dataclasses.field(default_factory=FindingIssues)
default_scope: RescoringSpecificity = dataclasses.field(
default_factory=lambda: RescoringSpecificity.ARTEFACT
Expand Down Expand Up @@ -357,16 +363,24 @@ def from_file(
)

def __post_init__(self):
if isinstance(self.categorisations, str):
shared_cfg_lookup = odg.shared_cfg.shared_cfg_lookup()

if isinstance(self.categorisations, SharedCfgReference):
default_cfg = shared_cfg_lookup(self.categorisations.ref)

self.categorisations = default_finding_categorisations(
categorisations_raw=default_cfg.get('categorisations', []),
finding_type=self.type,
name=self.categorisations,
name=self.categorisations.cfg_name,
)

if isinstance(self.rescoring_ruleset, str):
if isinstance(self.rescoring_ruleset, SharedCfgReference):
default_cfg = shared_cfg_lookup(self.rescoring_ruleset.ref)

self.rescoring_ruleset = default_rescoring_ruleset(
rescoring_rulesets_raw=default_cfg.get('rescoring_rulesets', []),
finding_type=self.type,
name=self.rescoring_ruleset,
name=self.rescoring_ruleset.cfg_name,
)

if isinstance(self.rescoring_ruleset, dict):
Expand Down Expand Up @@ -589,12 +603,10 @@ def matches(self, artefact: dso.model.ComponentArtefactId) -> bool:


def default_finding_categorisations(
categorisations_raw: list[dict],
finding_type: FindingType,
name: str,
) -> list[FindingCategorisation]:
with open(defaults_file_path) as file:
categorisations_raw = yaml.safe_load(file).get('categorisations', [])

for categorisation_raw in categorisations_raw:
if FindingType(categorisation_raw['type']) is finding_type:
break
Expand All @@ -616,12 +628,10 @@ def default_finding_categorisations(


def default_rescoring_ruleset(
rescoring_rulesets_raw: list[dict],
finding_type: FindingType,
name: str,
) -> dict:
with open(defaults_file_path) as file:
rescoring_rulesets_raw = yaml.safe_load(file).get('rescoring_rulesets', [])

for rescoring_ruleset_raw in rescoring_rulesets_raw:
if FindingType(rescoring_ruleset_raw['type']) is finding_type:
break
Expand Down
10 changes: 8 additions & 2 deletions odg/findings_cfg.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
- type: finding/vulnerability
issues:
enable_assignees: False
rescoring_ruleset: gardener
categorisations: gardener
rescoring_ruleset:
cfg_name: gardener
ref:
path: odg/defaults.yaml
categorisations:
cfg_name: gardener
ref:
path: odg/defaults.yaml

- type: finding/license
issues:
Expand Down
178 changes: 178 additions & 0 deletions odg/shared_cfg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
import collections.abc
import dataclasses
import enum
import io
import os

import github3.exceptions
import github3.repos
import yaml

import cnudie.retrieve
import oci.client
import ocm

import ctx_util
import lookups
import secret_mgmt


own_dir = os.path.abspath(os.path.dirname(__file__))
root_dir = os.path.abspath(os.path.join(own_dir, os.pardir))


class ReferenceType(enum.StrEnum):
GITHUB = 'github'
LOCAL = 'local'
OCM = 'ocm'


@dataclasses.dataclass
class SharedCfgReference:
type: ReferenceType


@dataclasses.dataclass(kw_only=True)
class SharedCfgGitHubReference(SharedCfgReference):
type: ReferenceType = ReferenceType.GITHUB
repository: str
path: str


@dataclasses.dataclass(kw_only=True)
class SharedCfgLocalReference(SharedCfgReference):
type: ReferenceType = ReferenceType.LOCAL
path: str


@dataclasses.dataclass(kw_only=True)
class SharedCfgOCMReference(SharedCfgReference):
type: ReferenceType = ReferenceType.OCM
component_name: str
component_version: str
artefact_name: str
artefact_version: str | None
artefact_extra_id: dict | None
ocm_repo_url: str | None

@property
def component_id(self) -> ocm.ComponentIdentity:
return ocm.ComponentIdentity(
name=self.component_name,
version=self.component_version,
)

@property
def ocm_repo(self) -> ocm.OciOcmRepository | None:
if not self.ocm_repo_url:
return None

return ocm.OciOcmRepository(baseUrl=self.ocm_repo_url)


def shared_cfg_lookup(
secret_factory: secret_mgmt.SecretFactory | None=None,
github_repo_lookup: collections.abc.Callable[[str], github3.repos.Repository] | None=None,
oci_client: oci.client.Client | None=None,
component_descriptor_lookup: cnudie.retrieve.ComponentDescriptorLookupById | None=None,
) -> collections.abc.Callable[[SharedCfgReference], dict]:
'''
Creates a shared-cfg-lookup. Ideally, this lookup should be created at application launch, and
passed to consumers.
'''
if not secret_factory:
secret_factory = ctx_util.secret_factory()

if not github_repo_lookup:
github_api_lookup = lookups.github_api_lookup(secret_factory)
github_repo_lookup = lookups.github_repo_lookup(github_api_lookup)

if not oci_client:
oci_client = lookups.semver_sanitising_oci_client(secret_factory)

if not component_descriptor_lookup:
component_descriptor_lookup = lookups.init_component_descriptor_lookup()

def retrieve_github_ref(
github_ref: SharedCfgGitHubReference,
) -> dict:
repo = github_repo_lookup(github_ref.repository)

try:
file_contents = repo.file_contents(github_ref.path).decoded.decode()
except github3.exceptions.NotFoundError as e:
e.add_note(f'did not find default cfg file for {github_ref=}')
raise

return yaml.safe_load(file_contents)

def retrieve_local_ref(
local_ref: SharedCfgLocalReference,
) -> dict:
with open(os.path.join(root_dir, local_ref.path)) as file:
return yaml.safe_load(file)

def retrieve_ocm_ref(
ocm_ref: SharedCfgOCMReference,
) -> dict:
if ocm_ref.ocm_repo:
component = component_descriptor_lookup(
ocm_ref.component_id,
ocm_repository_lookup=cnudie.retrieve.ocm_repository_lookup(ocm_ref.ocm_repo),
).component
else:
component = component_descriptor_lookup(ocm_ref.component_id).component

def matches(artefact: ocm.Artifact) -> bool:
if ocm_ref.artefact_name != artefact.name:
return False
if ocm_ref.artefact_version and ocm_ref.artefact_version != artefact.version:
return False

if ocm_ref.artefact_extra_id:
for key, value in ocm_ref.artefact_extra_id.items():
if artefact.extraIdentity.get(key) != value:
return False

return True

for artefact in component.iter_artefacts():
if matches(artefact=artefact):
break
else:
raise ValueError(f'did not find requested OCM artefact for {ocm_ref=}')

access = artefact.access

if not isinstance(access, ocm.LocalBlobAccess):
raise TypeError(f'{artefact.name=} has {access.type=} only localBlobAccess is supported')

digest = access.globalAccess.digest if access.globalAccess else access.localReference

blob = oci_client.blob(
image_reference=component.current_ocm_repo.component_oci_ref(component),
digest=digest,
stream=False, # cfg-files are typically small, do not bother with streaming
)

return yaml.safe_load(io.BytesIO(blob.content))

def shared_cfg_lookup(
shared_cfg_reference: SharedCfgReference,
/,
) -> dict:
if shared_cfg_reference.type is ReferenceType.GITHUB:
shared_cfg = retrieve_github_ref(shared_cfg_reference)

elif shared_cfg_reference.type is ReferenceType.LOCAL:
shared_cfg = retrieve_local_ref(shared_cfg_reference)

elif shared_cfg_reference.type is ReferenceType.OCM:
shared_cfg = retrieve_ocm_ref(shared_cfg_reference)

else:
raise ValueError(f'unsupported {shared_cfg_reference.type=}')

return shared_cfg

return shared_cfg_lookup