Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SAT: make future_state configuration mandatory in high test strictness level #19085

Merged
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.2.17
Make `incremental.future_state` mandatory in `high` `test_strictness_level`. [#19085](https://github.com/airbytehq/airbyte/pull/19085).

## 0.2.16
Run `basic_read` on the discovered catalog in `high` `test_strictness_level`. [#18937](https://github.com/airbytehq/airbyte/pull/18937).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ COPY pytest.ini setup.py ./
COPY source_acceptance_test ./source_acceptance_test
RUN pip install .

LABEL io.airbyte.version=0.2.16
LABEL io.airbyte.version=0.2.17
LABEL io.airbyte.name=airbyte/source-acceptance-test

ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin", "-r", "fEsx"]
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,18 @@ class FullRefreshConfig(BaseConfig):
)


class FutureStateConfig(BaseConfig):
    """Configuration of the future state used by the incremental tests.

    Groups the path of the future state file with the streams that are
    declared as missing from that file.
    """

    # Path of the future state file; the field is optional at the model level.
    future_state_path: Optional[str] = Field(description="Path to a state file with values in far future")
    # Streams declared as missing from the future state file; empty by default.
    missing_streams: List[EmptyStreamConfiguration] = Field(default=[], description="List of missing streams with valid bypass reasons.")


class IncrementalConfig(BaseConfig):
config_path: str = config_path
configured_catalog_path: Optional[str] = configured_catalog_path
cursor_paths: Optional[Mapping[str, List[str]]] = Field(
description="For each stream, the path of its cursor field in the output state messages."
)
future_state_path: Optional[str] = Field(description="Path to a state file with values in far future")
future_state: Optional[FutureStateConfig] = Field(description="Configuration for the future state.")
timeout_seconds: int = timeout_seconds
threshold_days: int = Field(
description="Allow records to be emitted with a cursor value this number of days before the state cursor",
Expand Down Expand Up @@ -230,6 +235,9 @@ def migrate_legacy_to_current_config(legacy_config: dict) -> dict:
basic_read_tests["empty_streams"] = [
{"name": empty_stream_name} for empty_stream_name in basic_read_tests.get("empty_streams", [])
]
for incremental_test in migrated_config["acceptance_tests"].get("incremental", {}).get("tests", []):
if "future_state_path" in incremental_test:
incremental_test["future_state"] = {"future_state_path": incremental_test.pop("future_state_path")}
return migrated_config

@root_validator(pre=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,40 @@
import pytest
from airbyte_cdk.models import AirbyteMessage, AirbyteStateMessage, AirbyteStateType, ConfiguredAirbyteCatalog, Type
from source_acceptance_test import BaseTest
from source_acceptance_test.config import IncrementalConfig
from source_acceptance_test.config import Config, EmptyStreamConfiguration, IncrementalConfig
from source_acceptance_test.utils import ConnectorRunner, JsonSchemaHelper, SecretDict, filter_output, incremental_only_catalog


@pytest.fixture(name="future_state_configuration")
def future_state_configuration_fixture(inputs, base_path, test_strictness_level) -> Tuple[Path, List[EmptyStreamConfiguration]]:
    """Resolve the future state configuration of the incremental test inputs.

    Args:
        inputs: Incremental test inputs exposing a ``future_state`` attribute.
        base_path: Directory the configured future state path is relative to.
        test_strictness_level: Strictness level the test suite runs with.

    Returns:
        A ``(future_state_path, missing_streams)`` tuple, where the path is
        ``inputs.future_state.future_state_path`` joined onto ``base_path``.

    When no future state path is configured, the test fails in ``high`` test
    strictness level and is skipped otherwise.
    """
    future_state = inputs.future_state
    if future_state and future_state.future_state_path:
        return Path(base_path) / future_state.future_state_path, future_state.missing_streams
    elif test_strictness_level is Config.TestStrictnessLevel.high:
        pytest.fail("High test strictness level error: a future state configuration must be provided in high test strictness level.")
    else:
        pytest.skip("`future_state` not specified, skipping.")


@pytest.fixture(name="future_state")
def future_state_fixture(future_state_configuration, test_strictness_level, configured_catalog) -> List[MutableMapping]:
    """Load the states stored in the future state file.

    Args:
        future_state_configuration: ``(future_state_path, missing_streams)`` tuple.
        test_strictness_level: Strictness level the test suite runs with.
        configured_catalog: Catalog whose stream names must be covered.

    Returns:
        The parsed content of the future state file.

    In ``high`` test strictness level the test fails when a declared missing
    stream has no bypass reason, or when a stream of the configured catalog is
    neither present in the state file nor declared as missing.
    """
    future_state_path, missing_streams = future_state_configuration
    # JSON documents are UTF-8 encoded; do not depend on the locale default.
    with open(str(future_state_path), "r", encoding="utf-8") as file:
        states = json.load(file)
    if test_strictness_level is Config.TestStrictnessLevel.high:
        if any(missing_stream.bypass_reason is None for missing_stream in missing_streams):
            pytest.fail("High test strictness level error: all missing_streams must have a bypass reason specified.")
        all_stream_names = {stream.stream.name for stream in configured_catalog.streams}
        streams_in_states = {state["stream"]["stream_descriptor"]["name"] for state in states}
        declared_missing_streams_names = {missing_stream.name for missing_stream in missing_streams}
        undeclared_missing_streams_names = all_stream_names - declared_missing_streams_names - streams_in_states
        if undeclared_missing_streams_names:
            # Sort the names so the failure message is stable across runs.
            pytest.fail(
                f"High test strictness level error: {', '.join(sorted(undeclared_missing_streams_names))} streams are missing in your future_state file, please declare a state for those streams or fill-in a valid bypass_reason."
            )
    return states


@pytest.fixture(name="cursor_paths")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import json
from contextlib import nullcontext as does_not_raise
from datetime import datetime
from pathlib import Path
from typing import Any, Optional
from unittest.mock import MagicMock, patch

Expand All @@ -24,9 +26,14 @@
SyncMode,
Type,
)
from source_acceptance_test.config import IncrementalConfig
from source_acceptance_test.config import Config, EmptyStreamConfiguration, IncrementalConfig
from source_acceptance_test.tests import test_incremental
from source_acceptance_test.tests.test_incremental import TestIncremental as _TestIncremental
from source_acceptance_test.tests.test_incremental import compare_cursor_with_threshold
from source_acceptance_test.tests.test_incremental import (
compare_cursor_with_threshold,
future_state_configuration_fixture,
future_state_fixture,
)


def build_messages_from_record_data(stream: str, records: list[dict]) -> list[AirbyteMessage]:
Expand Down Expand Up @@ -681,3 +688,166 @@ def test_state_with_abnormally_large_values(mocker, read_output, expectation):
future_state=mocker.MagicMock(),
docker_runner=docker_runner_mock,
)


@pytest.mark.parametrize(
    "test_strictness_level, inputs, expect_fail, expect_skip",
    [
        pytest.param(
            Config.TestStrictnessLevel.high,
            MagicMock(future_state=MagicMock(future_state_path="my_future_state_path", missing_streams=["foo", "bar"])),
            False,
            False,
            id="high test strictness level, future_state_path and missing streams are defined: run the test.",
        ),
        pytest.param(
            Config.TestStrictnessLevel.low,
            MagicMock(future_state=MagicMock(future_state_path="my_future_state_path", missing_streams=["foo", "bar"])),
            False,
            False,
            id="low test strictness level, future_state_path and missing_streams are defined: run the test.",
        ),
        pytest.param(
            Config.TestStrictnessLevel.high,
            MagicMock(future_state=MagicMock(future_state_path=None)),
            True,
            False,
            id="high test strictness level, future_state_path not defined: fail the test.",
        ),
        pytest.param(
            Config.TestStrictnessLevel.low,
            MagicMock(future_state=MagicMock(future_state_path=None)),
            False,
            True,
            id="low test strictness level, future_state_path not defined: skip the test.",
        ),
    ],
)
def test_future_state_configuration_fixture(mocker, test_strictness_level, inputs, expect_fail, expect_skip):
    """Check that the fixture returns its tuple, fails or skips as expected.

    ``pytest.fail`` and ``pytest.skip`` are patched on the module under test,
    so the fixture body runs to completion and the calls can be inspected.
    """
    mocker.patch.object(test_incremental.pytest, "fail")
    mocker.patch.object(test_incremental.pytest, "skip")
    output = future_state_configuration_fixture.__wrapped__(inputs, "base_path", test_strictness_level)
    if expect_fail:
        test_incremental.pytest.fail.assert_called_once()
        test_incremental.pytest.skip.assert_not_called()
    elif expect_skip:
        test_incremental.pytest.skip.assert_called_once()
        test_incremental.pytest.fail.assert_not_called()
    else:
        assert output == (Path("base_path/my_future_state_path"), ["foo", "bar"])
        # A successful resolution must neither fail nor skip the test.
        test_incremental.pytest.fail.assert_not_called()
        test_incremental.pytest.skip.assert_not_called()


# Two minimal streams that differ only by name; both declare full_refresh as
# their only supported sync mode.
TEST_AIRBYTE_STREAM_A = AirbyteStream(name="test_stream_a", json_schema={"k": "v"}, supported_sync_modes=[SyncMode.full_refresh])
TEST_AIRBYTE_STREAM_B = AirbyteStream(name="test_stream_b", json_schema={"k": "v"}, supported_sync_modes=[SyncMode.full_refresh])

# Configured counterpart of stream A (full_refresh / overwrite).
TEST_CONFIGURED_AIRBYTE_STREAM_A = ConfiguredAirbyteStream(
stream=TEST_AIRBYTE_STREAM_A,
sync_mode=SyncMode.full_refresh,
destination_sync_mode=DestinationSyncMode.overwrite,
)

# Configured counterpart of stream B (full_refresh / overwrite).
TEST_CONFIGURED_AIRBYTE_STREAM_B = ConfiguredAirbyteStream(
stream=TEST_AIRBYTE_STREAM_B,
sync_mode=SyncMode.full_refresh,
destination_sync_mode=DestinationSyncMode.overwrite,
)


# Catalog holding both configured streams; passed as the configured catalog in
# the test_future_state_fixture parametrization.
TEST_CONFIGURED_CATALOG = ConfiguredAirbyteCatalog(streams=[TEST_CONFIGURED_AIRBYTE_STREAM_A, TEST_CONFIGURED_AIRBYTE_STREAM_B])


# Future state file content shared by every case below: it holds a state for
# "test_stream_a" only, so "test_stream_b" is always absent from the file.
_FUTURE_STATES_WITH_STREAM_A_ONLY = [
    {
        "type": "STREAM",
        "stream": {
            "stream_state": {"airbytehq/integration-test": {"updated_at": "2121-06-30T10:22:10Z"}},
            "stream_descriptor": {"name": "test_stream_a"},
        },
    }
]


@pytest.mark.parametrize(
    "test_strictness_level, configured_catalog, states, missing_streams, expect_fail",
    [
        pytest.param(
            Config.TestStrictnessLevel.high,
            TEST_CONFIGURED_CATALOG,
            _FUTURE_STATES_WITH_STREAM_A_ONLY,
            [EmptyStreamConfiguration(name="test_stream_b", bypass_reason="no good reason")],
            False,
            id="High test strictness level, all missing streams are declared with bypass reason: does not fail.",
        ),
        pytest.param(
            Config.TestStrictnessLevel.high,
            TEST_CONFIGURED_CATALOG,
            _FUTURE_STATES_WITH_STREAM_A_ONLY,
            [EmptyStreamConfiguration(name="test_stream_b")],
            True,
            id="High test strictness level, missing streams are declared without bypass reason: fail.",
        ),
        pytest.param(
            # This case previously ran with the high level while being labelled
            # "Low"; it only passed because nothing was asserted when
            # expect_fail was False.
            Config.TestStrictnessLevel.low,
            TEST_CONFIGURED_CATALOG,
            _FUTURE_STATES_WITH_STREAM_A_ONLY,
            [EmptyStreamConfiguration(name="test_stream_b")],
            False,
            id="Low test strictness level, missing streams are declared without bypass reason: does not fail.",
        ),
        pytest.param(
            Config.TestStrictnessLevel.high,
            TEST_CONFIGURED_CATALOG,
            _FUTURE_STATES_WITH_STREAM_A_ONLY,
            [],
            True,
            id="High test strictness level, missing streams are not declared: fail.",
        ),
        pytest.param(
            Config.TestStrictnessLevel.low,
            TEST_CONFIGURED_CATALOG,
            _FUTURE_STATES_WITH_STREAM_A_ONLY,
            [],
            False,
            id="Low test strictness level, missing streams are not declared: does not fail.",
        ),
    ],
)
def test_future_state_fixture(tmp_path, mocker, test_strictness_level, configured_catalog, states, missing_streams, expect_fail):
    """Check the validation performed while loading the future state file.

    The states are written to a temporary file that the fixture reads back.
    ``pytest.fail`` is patched on the module under test so the fixture always
    returns and the number of failure calls can be inspected.
    """
    mocker.patch.object(test_incremental.pytest, "fail")
    future_state_path = tmp_path / "abnormal_states.json"
    with open(future_state_path, "w") as f:
        json.dump(states, f)
    future_state_configuration = (future_state_path, missing_streams)
    output = future_state_fixture.__wrapped__(future_state_configuration, test_strictness_level, configured_catalog)
    assert output == states
    if expect_fail:
        test_incremental.pytest.fail.assert_called_once()
    else:
        # Without this check a non-failing expectation would pass vacuously.
        test_incremental.pytest.fail.assert_not_called()
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ tests:
future_state_path: "integration_tests/abnormal_state.json"
cursor_paths:
comments: ["airbytehq/integration-test", "updated_at"]
commit_comment_reactions: ["airbytehq/integration-test", "55538825", "created_at"]
commit_comment_reactions:
["airbytehq/integration-test", "55538825", "created_at"]
commit_comments: ["airbytehq/integration-test", "updated_at"]
commits: ["airbytehq/integration-test", "master", "created_at"]
deployments: ["airbytehq/integration-test", "updated_at"]
events: ["airbytehq/integration-test", "created_at"]
issue_comment_reactions: ["airbytehq/integration-test", "907296275", "created_at"]
issue_comment_reactions:
["airbytehq/integration-test", "907296275", "created_at"]
issue_events: ["airbytehq/integration-test", "created_at"]
issue_milestones: ["airbytehq/integration-test", "updated_at"]
issue_reactions: ["airbytehq/integration-test", "11", "created_at"]
Expand All @@ -36,7 +38,8 @@ tests:
project_columns:
["airbytehq/integration-test", "13167124", "updated_at"]
projects: ["airbytehq/integration-test", "updated_at"]
pull_request_comment_reactions: ["airbytehq/integration-test", "699253726", "created_at"]
pull_request_comment_reactions:
["airbytehq/integration-test", "699253726", "created_at"]
pull_request_stats: ["airbytehq/integration-test", "updated_at"]
pull_requests: ["airbytehq/integration-test", "updated_at"]
releases: ["airbytehq/integration-test", "created_at"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,4 +326,27 @@ acceptance_tests:
bypass_reason: "This stream can't be seeded in our sandbox account"
timeout_seconds: 1200
...
```
```

#### Incremental: `future_state` must be set
In the `high` test strictness level, the `future_state` configuration is mandatory.
The future state JSON file (usually `abnormal_state.json`) must contain one state for each stream declared in the configured catalog.
`missing_streams` can be set to ignore a subset of the streams, each with a valid bypass reason. For example:

```yaml
test_strictness_level: high
connector_image: airbyte/source-my-connector:dev
acceptance_tests:
...
incremental:
tests:
- config_path: secrets/config.json
configured_catalog_path: integration_tests/configured_catalog.json
cursor_paths:
...
future_state:
future_state_path: integration_tests/abnormal_state.json
missing_streams:
- name: my_missing_stream
bypass_reason: "Please fill a good reason"
```