diff --git a/airbyte-integrations/bases/source-acceptance-test/CHANGELOG.md b/airbyte-integrations/bases/source-acceptance-test/CHANGELOG.md index 78123bb4d33e4..74f0458112387 100644 --- a/airbyte-integrations/bases/source-acceptance-test/CHANGELOG.md +++ b/airbyte-integrations/bases/source-acceptance-test/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.2.17 +Make `incremental.future_state` mandatory in `high` `test_strictness_level`. [#19085](https://github.com/airbytehq/airbyte/pull/19085/). + ## 0.2.16 Run `basic_read` on the discovered catalog in `high` `test_strictness_level`. [#18937](https://github.com/airbytehq/airbyte/pull/18937). diff --git a/airbyte-integrations/bases/source-acceptance-test/Dockerfile b/airbyte-integrations/bases/source-acceptance-test/Dockerfile index d1cb06c59e73c..7b0957160cd8d 100644 --- a/airbyte-integrations/bases/source-acceptance-test/Dockerfile +++ b/airbyte-integrations/bases/source-acceptance-test/Dockerfile @@ -33,7 +33,7 @@ COPY pytest.ini setup.py ./ COPY source_acceptance_test ./source_acceptance_test RUN pip install . -LABEL io.airbyte.version=0.2.16 +LABEL io.airbyte.version=0.2.17 LABEL io.airbyte.name=airbyte/source-acceptance-test ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin", "-r", "fEsx"] diff --git a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/config.py b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/config.py index 5bd5307e4fe98..4fbeae339d3b0 100644 --- a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/config.py +++ b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/config.py @@ -141,13 +141,18 @@ class FullRefreshConfig(BaseConfig): ) +class FutureStateConfig(BaseConfig): + future_state_path: Optional[str] = Field(description="Path to a state file with values in far future") + missing_streams: List[EmptyStreamConfiguration] = Field(default=[], description="List of missings streams with valid bypass reasons.") + + class IncrementalConfig(BaseConfig): config_path: str = config_path configured_catalog_path: Optional[str] = configured_catalog_path cursor_paths: Optional[Mapping[str, List[str]]] = Field( description="For each stream, the path of its cursor field in the output state messages." ) - future_state_path: Optional[str] = Field(description="Path to a state file with values in far future") + future_state: Optional[FutureStateConfig] = Field(description="Configuration for the future state.") timeout_seconds: int = timeout_seconds threshold_days: int = Field( description="Allow records to be emitted with a cursor value this number of days before the state cursor", @@ -230,6 +235,9 @@ def migrate_legacy_to_current_config(legacy_config: dict) -> dict: basic_read_tests["empty_streams"] = [ {"name": empty_stream_name} for empty_stream_name in basic_read_tests.get("empty_streams", []) ] + for incremental_test in migrated_config["acceptance_tests"].get("incremental", {}).get("tests", []): + if "future_state_path" in incremental_test: + incremental_test["future_state"] = {"future_state_path": incremental_test.pop("future_state_path")} return migrated_config @root_validator(pre=True) diff --git a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_incremental.py b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_incremental.py index 15e2f7c38ae4f..219bfc70e55f5 100644 --- a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_incremental.py +++ b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_incremental.py @@ -11,24 +11,40 @@ import pytest from airbyte_cdk.models import AirbyteMessage, AirbyteStateMessage, AirbyteStateType, ConfiguredAirbyteCatalog, Type from source_acceptance_test import BaseTest -from source_acceptance_test.config import IncrementalConfig +from source_acceptance_test.config import Config, EmptyStreamConfiguration, IncrementalConfig from source_acceptance_test.utils import ConnectorRunner, JsonSchemaHelper, SecretDict, filter_output, incremental_only_catalog -@pytest.fixture(name="future_state_path") -def future_state_path_fixture(inputs, base_path) -> Path: +@pytest.fixture(name="future_state_configuration") +def future_state_configuration_fixture(inputs, base_path, test_strictness_level) -> Tuple[Path, List[EmptyStreamConfiguration]]: """Fixture with connector's future state path (relative to base_path)""" - if getattr(inputs, "future_state_path"): - return Path(base_path) / getattr(inputs, "future_state_path") - pytest.skip("`future_state_path` not specified, skipping") + if inputs.future_state and inputs.future_state.future_state_path: + return Path(base_path) / inputs.future_state.future_state_path, inputs.future_state.missing_streams + elif test_strictness_level is Config.TestStrictnessLevel.high: + pytest.fail("High test strictness level error: a future state configuration must be provided in high test strictness level.") + else: + pytest.skip("`future_state` not specified, skipping.") @pytest.fixture(name="future_state") -def future_state_fixture(future_state_path) -> Path: +def future_state_fixture(future_state_configuration, test_strictness_level, configured_catalog) -> List[MutableMapping]: """""" + future_state_path, missing_streams = future_state_configuration with open(str(future_state_path), "r") as file: contents = file.read() - return json.loads(contents) + states = json.loads(contents) + if test_strictness_level is Config.TestStrictnessLevel.high: + if not all([missing_stream.bypass_reason is not None for missing_stream in missing_streams]): + pytest.fail("High test strictness level error: all missing_streams must have a bypass reason specified.") + all_stream_names = set([stream.stream.name for stream in configured_catalog.streams]) + streams_in_states = set([state["stream"]["stream_descriptor"]["name"] for state in states]) + declared_missing_streams_names = set([missing_stream.name for missing_stream in missing_streams]) + undeclared_missing_streams_names = all_stream_names - declared_missing_streams_names - streams_in_states + if undeclared_missing_streams_names: + pytest.fail( + f"High test strictness level error: {', '.join(undeclared_missing_streams_names)} streams are missing in your future_state file, please declare a state for those streams or fill-in a valid bypass_reason." + ) + return states @pytest.fixture(name="cursor_paths") diff --git a/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_incremental.py b/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_incremental.py index fe061812b4090..367a74c5ecb3a 100644 --- a/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_incremental.py +++ b/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_incremental.py @@ -2,8 +2,10 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # +import json from contextlib import nullcontext as does_not_raise from datetime import datetime +from pathlib import Path from typing import Any, Optional from unittest.mock import MagicMock, patch @@ -24,9 +26,14 @@ SyncMode, Type, ) -from source_acceptance_test.config import IncrementalConfig +from source_acceptance_test.config import Config, EmptyStreamConfiguration, IncrementalConfig +from source_acceptance_test.tests import test_incremental from source_acceptance_test.tests.test_incremental import TestIncremental as _TestIncremental -from source_acceptance_test.tests.test_incremental import compare_cursor_with_threshold +from source_acceptance_test.tests.test_incremental import ( + compare_cursor_with_threshold, + future_state_configuration_fixture, + future_state_fixture, +) def build_messages_from_record_data(stream: str, records: list[dict]) -> list[AirbyteMessage]: @@ -681,3 +688,166 @@ def test_state_with_abnormally_large_values(mocker, read_output, expectation): future_state=mocker.MagicMock(), docker_runner=docker_runner_mock, ) + + +@pytest.mark.parametrize( + "test_strictness_level, inputs, expect_fail, expect_skip", + [ + pytest.param( + Config.TestStrictnessLevel.high, + MagicMock(future_state=MagicMock(future_state_path="my_future_state_path", missing_streams=["foo", "bar"])), + False, + False, + id="high test strictness level, future_state_path and missing streams are defined: run the test.", + ), + pytest.param( + Config.TestStrictnessLevel.low, + MagicMock(future_state=MagicMock(future_state_path="my_future_state_path", missing_streams=["foo", "bar"])), + False, + False, + id="low test strictness level, future_state_path and missing_streams are defined: run the test.", + ), + pytest.param( + Config.TestStrictnessLevel.high, + MagicMock(future_state=MagicMock(future_state_path=None)), + True, + False, + id="high test strictness level, future_state_path and missing streams are defined: fail the test.", + ), + pytest.param( + Config.TestStrictnessLevel.low, + MagicMock(future_state=MagicMock(future_state_path=None)), + False, + True, + id="low test strictness level, future_state_path not defined: skip the test.", + ), + ], +) +def test_future_state_configuration_fixture(mocker, test_strictness_level, inputs, expect_fail, expect_skip): + mocker.patch.object(test_incremental.pytest, "fail") + mocker.patch.object(test_incremental.pytest, "skip") + output = future_state_configuration_fixture.__wrapped__(inputs, "base_path", test_strictness_level) + if not expect_fail and not expect_skip: + assert output == (Path("base_path/my_future_state_path"), ["foo", "bar"]) + if expect_fail: + test_incremental.pytest.fail.assert_called_once() + test_incremental.pytest.skip.assert_not_called() + if expect_skip: + test_incremental.pytest.skip.assert_called_once() + test_incremental.pytest.fail.assert_not_called() + + +TEST_AIRBYTE_STREAM_A = AirbyteStream(name="test_stream_a", json_schema={"k": "v"}, supported_sync_modes=[SyncMode.full_refresh]) +TEST_AIRBYTE_STREAM_B = AirbyteStream(name="test_stream_b", json_schema={"k": "v"}, supported_sync_modes=[SyncMode.full_refresh]) + +TEST_CONFIGURED_AIRBYTE_STREAM_A = ConfiguredAirbyteStream( + stream=TEST_AIRBYTE_STREAM_A, + sync_mode=SyncMode.full_refresh, + destination_sync_mode=DestinationSyncMode.overwrite, +) + +TEST_CONFIGURED_AIRBYTE_STREAM_B = ConfiguredAirbyteStream( + stream=TEST_AIRBYTE_STREAM_B, + sync_mode=SyncMode.full_refresh, + destination_sync_mode=DestinationSyncMode.overwrite, +) + + +TEST_CONFIGURED_CATALOG = ConfiguredAirbyteCatalog(streams=[TEST_CONFIGURED_AIRBYTE_STREAM_A, TEST_CONFIGURED_AIRBYTE_STREAM_B]) + + +@pytest.mark.parametrize( + "test_strictness_level, configured_catalog, states, missing_streams, expect_fail", + [ + pytest.param( + Config.TestStrictnessLevel.high, + TEST_CONFIGURED_CATALOG, + [ + { + "type": "STREAM", + "stream": { + "stream_state": {"airbytehq/integration-test": {"updated_at": "2121-06-30T10:22:10Z"}}, + "stream_descriptor": {"name": "test_stream_a"}, + }, + } + ], + [EmptyStreamConfiguration(name="test_stream_b", bypass_reason="no good reason")], + False, + id="High test strictness level, all missing streams are declared with bypass reason: does not fail.", + ), + pytest.param( + Config.TestStrictnessLevel.high, + TEST_CONFIGURED_CATALOG, + [ + { + "type": "STREAM", + "stream": { + "stream_state": {"airbytehq/integration-test": {"updated_at": "2121-06-30T10:22:10Z"}}, + "stream_descriptor": {"name": "test_stream_a"}, + }, + } + ], + [EmptyStreamConfiguration(name="test_stream_b")], + True, + id="High test strictness level, missing streams are declared without bypass reason: fail.", + ), + pytest.param( + Config.TestStrictnessLevel.high, + TEST_CONFIGURED_CATALOG, + [ + { + "type": "STREAM", + "stream": { + "stream_state": {"airbytehq/integration-test": {"updated_at": "2121-06-30T10:22:10Z"}}, + "stream_descriptor": {"name": "test_stream_a"}, + }, + } + ], + [EmptyStreamConfiguration(name="test_stream_b")], + False, + id="Low test strictness level, missing streams are declared without bypass reason: does fail.", + ), + pytest.param( + Config.TestStrictnessLevel.high, + TEST_CONFIGURED_CATALOG, + [ + { + "type": "STREAM", + "stream": { + "stream_state": {"airbytehq/integration-test": {"updated_at": "2121-06-30T10:22:10Z"}}, + "stream_descriptor": {"name": "test_stream_a"}, + }, + } + ], + [], + True, + id="High test strictness level, missing streams are not declared: fail.", + ), + pytest.param( + Config.TestStrictnessLevel.low, + TEST_CONFIGURED_CATALOG, + [ + { + "type": "STREAM", + "stream": { + "stream_state": {"airbytehq/integration-test": {"updated_at": "2121-06-30T10:22:10Z"}}, + "stream_descriptor": {"name": "test_stream_a"}, + }, + } + ], + [], + False, + id="Low test strictness level, missing streams are not declared: does not fail.", + ), + ], +) +def test_future_state_fixture(tmp_path, mocker, test_strictness_level, configured_catalog, states, missing_streams, expect_fail): + mocker.patch.object(test_incremental.pytest, "fail") + future_state_path = tmp_path / "abnormal_states.json" + with open(future_state_path, "w") as f: + json.dump(states, f) + future_state_configuration = (future_state_path, missing_streams) + output = future_state_fixture.__wrapped__(future_state_configuration, test_strictness_level, configured_catalog) + assert output == states + if expect_fail: + test_incremental.pytest.fail.assert_called_once() diff --git a/airbyte-integrations/connectors/source-github/acceptance-test-config.yml b/airbyte-integrations/connectors/source-github/acceptance-test-config.yml index b030d57fd8a2a..f629b322298be 100644 --- a/airbyte-integrations/connectors/source-github/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-github/acceptance-test-config.yml @@ -21,12 +21,14 @@ tests: future_state_path: "integration_tests/abnormal_state.json" cursor_paths: comments: ["airbytehq/integration-test", "updated_at"] - commit_comment_reactions: ["airbytehq/integration-test", "55538825", "created_at"] + commit_comment_reactions: + ["airbytehq/integration-test", "55538825", "created_at"] commit_comments: ["airbytehq/integration-test", "updated_at"] commits: ["airbytehq/integration-test", "master", "created_at"] deployments: ["airbytehq/integration-test", "updated_at"] events: ["airbytehq/integration-test", "created_at"] - issue_comment_reactions: ["airbytehq/integration-test", "907296275", "created_at"] + issue_comment_reactions: + ["airbytehq/integration-test", "907296275", "created_at"] issue_events: ["airbytehq/integration-test", "created_at"] issue_milestones: ["airbytehq/integration-test", "updated_at"] issue_reactions: ["airbytehq/integration-test", "11", "created_at"] @@ -36,7 +38,8 @@ tests: project_columns: ["airbytehq/integration-test", "13167124", "updated_at"] projects: ["airbytehq/integration-test", "updated_at"] - pull_request_comment_reactions: ["airbytehq/integration-test", "699253726", "created_at"] + pull_request_comment_reactions: + ["airbytehq/integration-test", "699253726", "created_at"] pull_request_stats: ["airbytehq/integration-test", "updated_at"] pull_requests: ["airbytehq/integration-test", "updated_at"] releases: ["airbytehq/integration-test", "created_at"] diff --git a/docs/connector-development/testing-connectors/source-acceptance-tests-reference.md b/docs/connector-development/testing-connectors/source-acceptance-tests-reference.md index a716ec1bca90b..653964db51064 100644 --- a/docs/connector-development/testing-connectors/source-acceptance-tests-reference.md +++ b/docs/connector-development/testing-connectors/source-acceptance-tests-reference.md @@ -326,4 +326,27 @@ acceptance_tests: bypass_reason: "This stream can't be seeded in our sandbox account" timeout_seconds: 1200 ... -``` \ No newline at end of file +``` + +#### Incremental: `future_state` must be set +In `high` test strictness level we expect the `future_state` configuration to be set. +The future state JSON file (usually `abnormal_states.json`) must contain one state for each stream declared in the configured catalog. +`missing_streams` can be set to ignore a subset of the streams with a valid bypass reason. E.G: + +```yaml +test_strictness_level: high +connector_image: airbyte/source-my-connector:dev +acceptance_tests: + ... + incremental: + tests: + - config_path: secrets/config.json + configured_catalog_path: integration_tests/configured_catalog.json + cursor_paths: + ... + future_state: + future_state_path: integration_tests/abnormal_state.json + missing_streams: + - name: my_missing_stream + bypass_reason: "Please fill a good reason" +```