From 1067a8544a1c241df1c90f6d8acc0259bc01d1b1 Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Fri, 17 Jan 2025 18:23:55 +0200 Subject: [PATCH 1/5] added DpathFlattenFields --- .../declarative_component_schema.yaml | 29 ++++++ .../models/declarative_component_schema.py | 21 +++++ .../parsers/model_to_component_factory.py | 19 ++++ .../transformations/dpath_flatten_fields.py | 54 +++++++++++ .../test_dpath_flatten_fields.py | 94 +++++++++++++++++++ 5 files changed, 217 insertions(+) create mode 100644 airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py create mode 100644 unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 980b601a2..90b6875cc 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1316,6 +1316,7 @@ definitions: - "$ref": "#/definitions/KeysToLower" - "$ref": "#/definitions/KeysToSnakeCase" - "$ref": "#/definitions/FlattenFields" + - "$ref": "#/definitions/DpathFlattenFields" - "$ref": "#/definitions/KeysReplace" state_migrations: title: State Migrations @@ -1865,6 +1866,7 @@ definitions: - "$ref": "#/definitions/KeysToLower" - "$ref": "#/definitions/KeysToSnakeCase" - "$ref": "#/definitions/FlattenFields" + - "$ref": "#/definitions/DpathFlattenFields" - "$ref": "#/definitions/KeysReplace" schema_type_identifier: "$ref": "#/definitions/SchemaTypeIdentifier" @@ -1969,6 +1971,33 @@ definitions: $parameters: type: object additionalProperties: true + DpathFlattenFields: + title: Dpath Flatten Fields + description: A transformation that flatten field values to the to top of the record. + type: object + required: + - type + - field_path + properties: + type: + type: string + enum: [DpathFlattenFields] + field_path: + title: Field Path + description: A path to field that needs to be flattened. + type: array + items: + - type: string + examples: + - ["data"] + - ["data", "*", "field"] + delete_origin_value: + title: Delete Origin Value + description: Whether to delete the origin value or keep it. Default is False. + type: boolean + $parameters: + type: object + additionalProperties: true KeysReplace: title: Keys Replace description: A transformation that replaces symbols in keys. diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 670340472..9c2b242f7 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -792,6 +792,25 @@ class FlattenFields(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class DpathFlattenFields(BaseModel): + type: Literal["DpathFlattenFields"] + field_path: List[str] = Field( + ..., + description="A path to field that needs to be flattened.", + examples=[ + ["data"], + ["data", "*", "field"], + ], + title="Field Path", + ) + delete_origin_value: Optional[bool] = Field( + False, + description="Whether to delete the origin value or keep it. Default is False.", + title="Delete Origin Value", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class KeysReplace(BaseModel): type: Literal["KeysReplace"] old: str = Field( @@ -1810,6 +1829,7 @@ class Config: KeysToLower, KeysToSnakeCase, FlattenFields, + DpathFlattenFields, KeysReplace, ] ] @@ -1985,6 +2005,7 @@ class DynamicSchemaLoader(BaseModel): KeysToLower, KeysToSnakeCase, FlattenFields, + DpathFlattenFields, KeysReplace, ] ] diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 597be6386..7ccfe52d0 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -219,6 +219,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( FlattenFields as FlattenFieldsModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + DpathFlattenFields as DpathFlattenFieldsModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( GzipJsonDecoder as GzipJsonDecoderModel, ) @@ -433,6 +436,9 @@ from airbyte_cdk.sources.declarative.transformations.flatten_fields import ( FlattenFields, ) +from airbyte_cdk.sources.declarative.transformations.dpath_flatten_fields import ( + DpathFlattenFields, +) from airbyte_cdk.sources.declarative.transformations.keys_replace_transformation import ( KeysReplaceTransformation, ) @@ -538,6 +544,7 @@ def _init_mappings(self) -> None: KeysToSnakeCaseModel: self.create_keys_to_snake_transformation, KeysReplaceModel: self.create_keys_replace_transformation, FlattenFieldsModel: self.create_flatten_fields, + DpathFlattenFieldsModel: self.create_dpath_flatten_fields, IterableDecoderModel: self.create_iterable_decoder, XmlDecoderModel: self.create_xml_decoder, JsonFileSchemaLoaderModel: self.create_json_file_schema_loader, @@ -672,6 +679,18 @@ def create_flatten_fields( flatten_lists=model.flatten_lists if model.flatten_lists is not None else True ) + def create_dpath_flatten_fields( + self, model: DpathFlattenFieldsModel, config: Config, **kwargs: Any + ) -> DpathFlattenFields: + return DpathFlattenFields( + config=config, + field_path=model.field_path, + delete_origin_value=model.delete_origin_value + if model.delete_origin_value is not None + else False, + parameters=model.parameters or {}, + ) + @staticmethod def _json_schema_type_name_to_type(value_type: Optional[ValueType]) -> Optional[Type[Any]]: if not value_type: diff --git a/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py b/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py new file mode 100644 index 000000000..9a248eb05 --- /dev/null +++ b/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py @@ -0,0 +1,54 @@ +from dataclasses import dataclass, InitVar +from typing import Any, Dict, Optional, List, Union, Mapping + +import dpath + +from airbyte_cdk.sources.declarative.transformations import RecordTransformation +from airbyte_cdk.sources.types import Config, StreamSlice, StreamState +from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString + + +@dataclass +class DpathFlattenFields(RecordTransformation): + """ + Flatten fields only for provided path. + + field_path: List[Union[InterpolatedString, str]] path to the field to flatten. + delete_origin_value: bool = False whether to delete origin field or keep it. Default is False. + + """ + + config: Config + field_path: List[Union[InterpolatedString, str]] + parameters: InitVar[Mapping[str, Any]] + delete_origin_value: bool = False + + def __post_init__(self, parameters: Mapping[str, Any]) -> None: + self._field_path = [ + InterpolatedString.create(path, parameters=parameters) for path in self.field_path + ] + for path_index in range(len(self.field_path)): + if isinstance(self.field_path[path_index], str): + self._field_path[path_index] = InterpolatedString.create( + self.field_path[path_index], parameters=parameters + ) + + def transform( + self, + record: Dict[str, Any], + config: Optional[Config] = None, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + ) -> None: + path = [path.eval(self.config) for path in self._field_path] + if "*" in path: + matched = dpath.values(record, path) + extracted = matched[0] if matched else None + else: + extracted = dpath.get(record, path, default=[]) + + if self.delete_origin_value: + dpath.delete(record, path) + + if isinstance(extracted, dict): + record.update(extracted) diff --git a/unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py b/unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py new file mode 100644 index 000000000..122ef73db --- /dev/null +++ b/unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py @@ -0,0 +1,94 @@ +import pytest + +from airbyte_cdk.sources.declarative.transformations.dpath_flatten_fields import DpathFlattenFields + +_ANY_VALUE = -1 +_DELETE_ORIGIN_VALUE = True +_DO_NOT_DELETE_ORIGIN_VALUE = False + + +@pytest.mark.parametrize( + [ + "input_record", + "config", + "field_path", + "delete_origin_value", + "expected_record", + ], + [ + pytest.param( + {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}}, + {}, + ["field2"], + _DO_NOT_DELETE_ORIGIN_VALUE, + {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE}, + id="flatten by dpath, don't delete origin value", + ), + pytest.param( + {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}}, + {}, + ["field2"], + _DELETE_ORIGIN_VALUE, + {"field1": _ANY_VALUE, "field3": _ANY_VALUE}, + id="flatten by dpath, delete origin value", + ), + pytest.param( + { + "field1": _ANY_VALUE, + "field2": {"field3": {"field4": {"field5": _ANY_VALUE}}}, + }, + {}, + ["field2", "*", "field4"], + _DO_NOT_DELETE_ORIGIN_VALUE, + { + "field1": _ANY_VALUE, + "field2": {"field3": {"field4": {"field5": _ANY_VALUE}}}, + "field5": _ANY_VALUE, + }, + id="flatten by dpath with *, don't delete origin value", + ), + pytest.param( + { + "field1": _ANY_VALUE, + "field2": {"field3": {"field4": {"field5": _ANY_VALUE}}}, + }, + {}, + ["field2", "*", "field4"], + _DELETE_ORIGIN_VALUE, + {"field1": _ANY_VALUE, "field2": {"field3": {}}, "field5": _ANY_VALUE}, + id="flatten by dpath with *, delete origin value", + ), + pytest.param( + {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}}, + {"field_path": "field2"}, + ["{{ config['field_path'] }}"], + _DO_NOT_DELETE_ORIGIN_VALUE, + {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE}, + id="flatten by dpath from config, don't delete origin value", + ), + pytest.param( + {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}}, + {}, + ["non-existing-field"], + _DO_NOT_DELETE_ORIGIN_VALUE, + {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}}, + id="flatten by non-existing dpath, don't delete origin value", + ), + pytest.param( + {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}}, + {}, + ["*", "non-existing-field"], + _DO_NOT_DELETE_ORIGIN_VALUE, + {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}}, + id="flatten by non-existing dpath with *, don't delete origin value", + ), + ], +) +def test_dpath_flatten_lists( + input_record, config, field_path, delete_origin_value, expected_record +): + flattener = DpathFlattenFields( + field_path=field_path, parameters={}, config=config, delete_origin_value=delete_origin_value + ) + flattener.transform(input_record) + assert input_record == expected_record From ee2abc3df2c9addcf04c3cf6e67f79577a10e45a Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Fri, 17 Jan 2025 16:40:03 +0000 Subject: [PATCH 2/5] Auto-fix lint and format issues --- .../parsers/model_to_component_factory.py | 12 ++++++------ .../transformations/dpath_flatten_fields.py | 6 +++--- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 7ccfe52d0..2850a0d13 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -210,6 +210,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( DpathExtractor as DpathExtractorModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + DpathFlattenFields as DpathFlattenFieldsModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( DynamicSchemaLoader as DynamicSchemaLoaderModel, ) @@ -219,9 +222,6 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( FlattenFields as FlattenFieldsModel, ) -from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - DpathFlattenFields as DpathFlattenFieldsModel, -) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( GzipJsonDecoder as GzipJsonDecoderModel, ) @@ -433,12 +433,12 @@ RemoveFields, ) from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition -from airbyte_cdk.sources.declarative.transformations.flatten_fields import ( - FlattenFields, -) from airbyte_cdk.sources.declarative.transformations.dpath_flatten_fields import ( DpathFlattenFields, ) +from airbyte_cdk.sources.declarative.transformations.flatten_fields import ( + FlattenFields, +) from airbyte_cdk.sources.declarative.transformations.keys_replace_transformation import ( KeysReplaceTransformation, ) diff --git a/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py b/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py index 9a248eb05..a2b4af1f6 100644 --- a/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py +++ b/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py @@ -1,11 +1,11 @@ -from dataclasses import dataclass, InitVar -from typing import Any, Dict, Optional, List, Union, Mapping +from dataclasses import InitVar, dataclass +from typing import Any, Dict, List, Mapping, Optional, Union import dpath +from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString from airbyte_cdk.sources.declarative.transformations import RecordTransformation from airbyte_cdk.sources.types import Config, StreamSlice, StreamState -from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString @dataclass From 979e67a0a0469fa3c9219a770c5acd05dc120d55 Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Fri, 17 Jan 2025 19:12:49 +0200 Subject: [PATCH 3/5] refactor code --- .../sources/declarative/parsers/model_to_component_factory.py | 3 ++- .../declarative/transformations/dpath_flatten_fields.py | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 2850a0d13..51c3172ce 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -682,9 +682,10 @@ def create_flatten_fields( def create_dpath_flatten_fields( self, model: DpathFlattenFieldsModel, config: Config, **kwargs: Any ) -> DpathFlattenFields: + model_field_path: List[Union[InterpolatedString, str]] = [x for x in model.field_path] return DpathFlattenFields( config=config, - field_path=model.field_path, + field_path=model_field_path, delete_origin_value=model.delete_origin_value if model.delete_origin_value is not None else False, diff --git a/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py b/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py index a2b4af1f6..b2c72949b 100644 --- a/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py +++ b/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py @@ -1,5 +1,5 @@ -from dataclasses import InitVar, dataclass -from typing import Any, Dict, List, Mapping, Optional, Union +from dataclasses import dataclass, InitVar +from typing import Any, Dict, Optional, List, Union, Mapping import dpath From 2494e51c4d205c440fdaf4860bc2685fa4a3dffa Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Fri, 17 Jan 2025 17:15:54 +0000 Subject: [PATCH 4/5] Auto-fix lint and format issues --- .../declarative/transformations/dpath_flatten_fields.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py b/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py index b2c72949b..a2b4af1f6 100644 --- a/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py +++ b/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py @@ -1,5 +1,5 @@ -from dataclasses import dataclass, InitVar -from typing import Any, Dict, Optional, List, Union, Mapping +from dataclasses import InitVar, dataclass +from typing import Any, Dict, List, Mapping, Optional, Union import dpath From f73cbcba6d0b1c6d7707664b06645484c86f548b Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Mon, 20 Jan 2025 11:13:34 +0200 Subject: [PATCH 5/5] handle conflicts in record keys --- .../transformations/dpath_flatten_fields.py | 9 +++++---- .../transformations/test_dpath_flatten_fields.py | 16 ++++++++++++++++ 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py b/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py index a2b4af1f6..73162d848 100644 --- a/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py +++ b/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py @@ -47,8 +47,9 @@ def transform( else: extracted = dpath.get(record, path, default=[]) - if self.delete_origin_value: - dpath.delete(record, path) - if isinstance(extracted, dict): - record.update(extracted) + conflicts = set(extracted.keys()) & set(record.keys()) + if not conflicts: + if self.delete_origin_value: + dpath.delete(record, path) + record.update(extracted) diff --git a/unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py b/unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py index 122ef73db..578999636 100644 --- a/unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py +++ b/unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py @@ -82,6 +82,22 @@ {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}}, id="flatten by non-existing dpath with *, don't delete origin value", ), + pytest.param( + {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE}, + {}, + ["field2"], + _DO_NOT_DELETE_ORIGIN_VALUE, + {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE}, + id="flatten by dpath, not to update when record has field conflicts, don't delete origin value", + ), + pytest.param( + {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE}, + {}, + ["field2"], + _DO_NOT_DELETE_ORIGIN_VALUE, + {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE}, + id="flatten by dpath, not to update when record has field conflicts, delete origin value", + ), ], ) def test_dpath_flatten_lists(