Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SAT: backward compatibility - check that cursor fields were not changed #15520

Merged
merged 7 commits into from
Aug 11, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.2.0
Finish backward compatibility syntactic test implementation: check that streams cursor field was not changed. [#15520](https://github.com/airbytehq/airbyte/pull/15520/)

## 0.1.62
Backward compatibility tests: add syntactic validation of catalogs [#15486](https://github.com/airbytehq/airbyte/pull/15486/)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ COPY pytest.ini setup.py ./
COPY source_acceptance_test ./source_acceptance_test
RUN pip install .

LABEL io.airbyte.version=0.1.62
LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.name=airbyte/source-acceptance-test

ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin", "-r", "fEsx"]
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
TraceType,
Type,
)
from deepdiff import DeepDiff
from docker.errors import ContainerError
from jsonschema._utils import flatten
from source_acceptance_test.base import BaseTest
Expand All @@ -46,15 +45,6 @@ class TestSpec(BaseTest):
spec_cache: ConnectorSpecification = None
previous_spec_cache: ConnectorSpecification = None

@staticmethod
def compute_spec_diff(actual_connector_spec: ConnectorSpecification, previous_connector_spec: ConnectorSpecification):
return DeepDiff(
previous_connector_spec.dict()["connectionSpecification"],
actual_connector_spec.dict()["connectionSpecification"],
view="tree",
ignore_order=True,
)

@pytest.fixture(name="skip_backward_compatibility_tests")
def skip_backward_compatibility_tests_fixture(self, inputs: SpecTestConfig, previous_connector_docker_runner: ConnectorRunner) -> bool:
if previous_connector_docker_runner is None:
Expand Down Expand Up @@ -185,13 +175,9 @@ def test_backward_compatibility(
previous_connector_spec: ConnectorSpecification,
number_of_configs_to_generate: int = 100,
):
"""Check if the current spec is backward_compatible:
1. Perform multiple hardcoded syntactic checks with SpecDiffChecker.
2. Validate fake generated previous configs against the actual connector specification with validate_previous_configs.
"""
"""Check if the current spec is backward_compatible with the previous one"""
assert isinstance(actual_connector_spec, ConnectorSpecification) and isinstance(previous_connector_spec, ConnectorSpecification)
spec_diff = self.compute_spec_diff(actual_connector_spec, previous_connector_spec)
checker = SpecDiffChecker(spec_diff)
checker = SpecDiffChecker(previous=previous_connector_spec.dict(), current=actual_connector_spec.dict())
checker.assert_is_backward_compatible()
validate_previous_configs(previous_connector_spec, actual_connector_spec, number_of_configs_to_generate)

Expand Down Expand Up @@ -235,17 +221,6 @@ def test_check(self, connector_config, inputs: ConnectionTestConfig, docker_runn

@pytest.mark.default_timeout(30)
class TestDiscovery(BaseTest):
@staticmethod
def compute_discovered_catalog_diff(
discovered_catalog: MutableMapping[str, AirbyteStream], previous_discovered_catalog: MutableMapping[str, AirbyteStream]
):
return DeepDiff(
{stream_name: airbyte_stream.dict().pop("json_schema") for stream_name, airbyte_stream in previous_discovered_catalog.items()},
{stream_name: airbyte_stream.dict().pop("json_schema") for stream_name, airbyte_stream in discovered_catalog.items()},
view="tree",
ignore_order=True,
)

@pytest.fixture(name="skip_backward_compatibility_tests")
def skip_backward_compatibility_tests_fixture(
self, inputs: DiscoveryTestConfig, previous_connector_docker_runner: ConnectorRunner
Expand Down Expand Up @@ -340,13 +315,9 @@ def test_backward_compatibility(
discovered_catalog: MutableMapping[str, AirbyteStream],
previous_discovered_catalog: MutableMapping[str, AirbyteStream],
):
"""Check if the current spec is backward_compatible:
1. Perform multiple hardcoded syntactic checks with SpecDiffChecker.
2. Validate fake generated previous configs against the actual connector specification with validate_previous_configs.
"""
"""Check if the current catalog is backward_compatible with the previous one."""
assert isinstance(discovered_catalog, MutableMapping) and isinstance(previous_discovered_catalog, MutableMapping)
catalog_diff = self.compute_discovered_catalog_diff(discovered_catalog, previous_discovered_catalog)
checker = CatalogDiffChecker(catalog_diff)
checker = CatalogDiffChecker(previous_discovered_catalog, discovered_catalog)
checker.assert_is_backward_compatible()


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

from abc import ABC, abstractmethod
from multiprocessing import context
from enum import Enum

import jsonschema
from airbyte_cdk.models import ConnectorSpecification
Expand All @@ -13,50 +13,67 @@
from source_acceptance_test.utils import SecretDict


class BackwardIncompatibilityContext(Enum):
SPEC = 1
DISCOVER = 2


class NonBackwardCompatibleError(Exception):
pass
def __init__(self, error_message: str, context: BackwardIncompatibilityContext) -> None:
self.error_message = error_message
self.context = context
super().__init__(error_message)

def __str__(self):
return f"{self.context} - {self.error_message}"


class BaseDiffChecker(ABC):
def __init__(self, diff: DeepDiff) -> None:
self._diff = diff
def __init__(self, previous: dict, current: dict) -> None:
self._previous = previous
self._current = current
self.compute_diffs()

def _raise_error(self, message: str):
raise NonBackwardCompatibleError(f"{context} - {message}. Diff: {self._diff.pretty()}")
def _raise_error(self, message: str, diff: DeepDiff):
raise NonBackwardCompatibleError(f"{message}. Diff: {diff.pretty()}", self.context)

@property
@abstractmethod
def context(self): # pragma: no cover
pass

@abstractmethod
def compute_diffs(self): # pragma: no cover
pass

@abstractmethod
def assert_is_backward_compatible(self): # pragma: no cover
pass

def check_if_value_of_type_field_changed(self, diff: DeepDiff):
    """Check if the value of a 'type' field was changed.

    Two shapes of change are reported:
    - 'type' declared as a string (e.g. "str" -> "int"): the changed node is the 'type' key itself,
      i.e. the last element of its path.
    - 'type' declared as a list (e.g. ["str"] -> ["int"]): the changed node is an item of the list,
      so 'type' is the second-to-last element of its path.

    Args:
        diff: The diff to inspect. Its "values_changed" entries must expose ``path(output_format="list")``.

    Raises:
        Whatever ``self._raise_error`` raises when such a change is found.
    """
    # Slice instead of indexing with [-2]: a changed top-level key has a single-element path,
    # and indexing it with [-2] would raise IndexError instead of simply not matching.
    type_value_changed = any(
        "type" in change.path(output_format="list")[-2:] for change in diff.get("values_changed", [])
    )
    if type_value_changed:
        self._raise_error("The'type' field value was changed.", diff)

def check_if_new_type_was_added(self, diff: DeepDiff):  # pragma: no cover
    """Detect a value added to a 'type' list when the new value is not "null" (e.g. ["str"] -> ["str", "int"]).

    Args:
        diff: The diff to inspect. Its "iterable_item_added" entries must expose
            ``path(output_format="list")`` and the added value as ``t2``.

    Raises:
        Whatever ``self._raise_error`` raises when a non-"null" value was added to a 'type' list.
    """
    new_values_in_type_list = [
        change
        for change in diff.get("iterable_item_added", [])
        # The added node is a list item, so the list's key is the second-to-last path element.
        if change.path(output_format="list")[-2:-1] == ["type"]
        if change.t2 != "null"
    ]
    if new_values_in_type_list:
        # _raise_error takes the diff as its second argument; omitting it raised TypeError.
        self._raise_error("A new value was added to a 'type' field", diff)

def check_if_type_of_type_field_changed(self):
def check_if_type_of_type_field_changed(self, diff: DeepDiff):
"""
Detect the change of type of a type field
e.g:
Expand All @@ -68,83 +85,89 @@ def check_if_type_of_type_field_changed(self):
- ["str"] -> "int" INVALID
- ["str"] -> 1 INVALID
"""
type_changes = [change for change in self._diff.get("type_changes", []) if change.path(output_format="list")[-1] == "type"]
type_changes = [change for change in diff.get("type_changes", []) if change.path(output_format="list")[-1] == "type"]
for change in type_changes:
# We only accept change on the type field if the new type for this field is list or string
# This might be something already guaranteed by JSON schema validation.
if isinstance(change.t1, str):
if not isinstance(change.t2, list):
self._raise_error("A 'type' field was changed from string to an invalid value.")
self._raise_error("A 'type' field was changed from string to an invalid value.", diff)
# If the new type field is a list we want to make sure it only has the original type (t1) and null: e.g. "str" -> ["str", "null"]
# We want to raise an error otherwise.
t2_not_null_types = [_type for _type in change.t2 if _type != "null"]
if not (len(t2_not_null_types) == 1 and t2_not_null_types[0] == change.t1):
self._raise_error("The 'type' field was changed to a list with multiple invalid values")
self._raise_error("The 'type' field was changed to a list with multiple invalid values", diff)
if isinstance(change.t1, list):
if not isinstance(change.t2, str):
self._raise_error("The 'type' field was changed from a list to an invalid value")
self._raise_error("The 'type' field was changed from a list to an invalid value", diff)
if not (len(change.t1) == 1 and change.t2 == change.t1[0]):
self._raise_error("An element was removed from the list of 'type'")
self._raise_error("An element was removed from the list of 'type'", diff)


class SpecDiffChecker(BaseDiffChecker):
"""A class to perform backward compatibility checks on a connector specification diff"""

context = "Specification"
context = BackwardIncompatibilityContext.SPEC

def compute_diffs(self):
self.connection_specification_diff = DeepDiff(
self._previous["connectionSpecification"],
self._current["connectionSpecification"],
view="tree",
ignore_order=True,
)

def assert_is_backward_compatible(self):
self.check_if_declared_new_required_field()
self.check_if_added_a_new_required_property()
self.check_if_value_of_type_field_changed()
# self.check_if_new_type_was_added() We want to allow type expansion atm
self.check_if_type_of_type_field_changed()
self.check_if_field_was_made_not_nullable()
self.check_if_enum_was_narrowed()
self.check_if_declared_new_enum_field()

def check_if_declared_new_required_field(self):
self.check_if_declared_new_required_field(self.connection_specification_diff)
self.check_if_added_a_new_required_property(self.connection_specification_diff)
self.check_if_value_of_type_field_changed(self.connection_specification_diff)
# self.check_if_new_type_was_added(self.connection_specification_diff) We want to allow type expansion atm
self.check_if_type_of_type_field_changed(self.connection_specification_diff)
self.check_if_field_was_made_not_nullable(self.connection_specification_diff)
self.check_if_enum_was_narrowed(self.connection_specification_diff)
self.check_if_declared_new_enum_field(self.connection_specification_diff)

def check_if_declared_new_required_field(self, diff: DeepDiff):
"""Check if the new spec declared a 'required' field."""
added_required_fields = [
addition for addition in self._diff.get("dictionary_item_added", []) if addition.path(output_format="list")[-1] == "required"
addition for addition in diff.get("dictionary_item_added", []) if addition.path(output_format="list")[-1] == "required"
]
if added_required_fields:
self._raise_error("A new 'required' field was declared.")
self._raise_error("A new 'required' field was declared.", diff)

def check_if_added_a_new_required_property(self):
def check_if_added_a_new_required_property(self, diff: DeepDiff):
"""Check if the new spec added a property to the 'required' list"""
added_required_properties = [
addition for addition in self._diff.get("iterable_item_added", []) if addition.up.path(output_format="list")[-1] == "required"
addition for addition in diff.get("iterable_item_added", []) if addition.up.path(output_format="list")[-1] == "required"
]
if added_required_properties:
self._raise_error("A new property was added to 'required'")
self._raise_error("A new property was added to 'required'", diff)

def check_if_field_was_made_not_nullable(self):
def check_if_field_was_made_not_nullable(self, diff: DeepDiff):
"""Detect when field was made not nullable but is still a list: e.g ["string", "null"] -> ["string"]"""
removed_nullable = [
change for change in self._diff.get("iterable_item_removed", []) if change.path(output_format="list")[-2] == "type"
]
removed_nullable = [change for change in diff.get("iterable_item_removed", []) if change.path(output_format="list")[-2] == "type"]
if removed_nullable:
self._raise_error("A field type was narrowed or made a field not nullable")
self._raise_error("A field type was narrowed or made a field not nullable", diff)

def check_if_enum_was_narrowed(self):
def check_if_enum_was_narrowed(self, diff: DeepDiff):
"""Check if the list of values in a enum was shortened in a spec."""
enum_removals = [
enum_removal
for enum_removal in self._diff.get("iterable_item_removed", [])
for enum_removal in diff.get("iterable_item_removed", [])
if enum_removal.up.path(output_format="list")[-1] == "enum"
]
if enum_removals:
self._raise_error("An enum field was narrowed.")
self._raise_error("An enum field was narrowed.", diff)

def check_if_declared_new_enum_field(self):
def check_if_declared_new_enum_field(self, diff: DeepDiff):
"""Check if an 'enum' field was added to the spec."""
enum_additions = [
enum_addition
for enum_addition in self._diff.get("dictionary_item_added", [])
for enum_addition in diff.get("dictionary_item_added", [])
if enum_addition.path(output_format="list")[-1] == "enum"
]
if enum_additions:
self._raise_error("An 'enum' field was declared on an existing property")
self._raise_error("An 'enum' field was declared on an existing property", diff)


def validate_previous_configs(
Expand All @@ -163,26 +186,45 @@ def check_fake_previous_config_against_actual_spec(fake_previous_config):
try:
jsonschema.validate(instance=filtered_fake_previous_config, schema=actual_connector_spec.connectionSpecification)
except jsonschema.exceptions.ValidationError as err:
raise NonBackwardCompatibleError(err)
raise NonBackwardCompatibleError(err, BackwardIncompatibilityContext.SPEC)

check_fake_previous_config_against_actual_spec()


class CatalogDiffChecker(BaseDiffChecker):
"""A class to perform backward compatibility checks on a discoverd catalog diff"""

context = "Catalog"
context = BackwardIncompatibilityContext.DISCOVER

def compute_diffs(self):
self.streams_json_schemas_diff = DeepDiff(
{stream_name: airbyte_stream.dict().pop("json_schema") for stream_name, airbyte_stream in self._previous.items()},
{stream_name: airbyte_stream.dict().pop("json_schema") for stream_name, airbyte_stream in self._current.items()},
view="tree",
ignore_order=True,
)
self.streams_cursor_fields_diff = DeepDiff(
{stream_name: airbyte_stream.dict().pop("default_cursor_field") for stream_name, airbyte_stream in self._previous.items()},
{stream_name: airbyte_stream.dict().pop("default_cursor_field") for stream_name, airbyte_stream in self._current.items()},
view="tree",
)

def assert_is_backward_compatible(self):
self.check_if_stream_was_removed()
self.check_if_value_of_type_field_changed()
self.check_if_type_of_type_field_changed()
self.check_if_stream_was_removed(self.streams_json_schemas_diff)
self.check_if_value_of_type_field_changed(self.streams_json_schemas_diff)
self.check_if_type_of_type_field_changed(self.streams_json_schemas_diff)
self.check_if_cursor_field_was_changed(self.streams_cursor_fields_diff)

def check_if_stream_was_removed(self):
def check_if_stream_was_removed(self, diff: DeepDiff):
"""Check if a stream was removed from the catalog."""
removed_streams = []
for removal in self._diff.get("dictionary_item_removed", []):
for removal in diff.get("dictionary_item_removed", []):
if removal.path() != "root" and removal.up.path() == "root":
removed_streams.append(removal.path(output_format="list")[0])
if removed_streams:
self._raise_error(f"The following streams were removed: {','.join(removed_streams)}")
self._raise_error(f"The following streams were removed: {','.join(removed_streams)}", diff)

def check_if_cursor_field_was_changed(self, diff: DeepDiff):
    """Check if the default cursor field of an existing stream was changed.

    Args:
        diff: Diff of the per-stream 'default_cursor_field' values, keyed by report type
            (e.g. "values_changed", "iterable_item_added").

    Raises:
        Whatever ``self._raise_error`` raises when a cursor field of an existing stream changed.
    """
    # A stream that was added or removed shows up as a dictionary item change at the stream level.
    # That is not a cursor field change: additions are backward compatible, and removals are
    # reported separately by check_if_stream_was_removed. A plain truthiness test on the diff
    # would flag both.
    stream_level_changes = {"dictionary_item_added", "dictionary_item_removed"}
    if any(report_type not in stream_level_changes for report_type in diff):
        self._raise_error("The value of 'default_cursor_field' was changed", diff)
Loading