Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(low-code cdk): add overwrite flag to dpath flatten #410

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -2259,6 +2259,10 @@ definitions:
title: Delete Origin Value
description: Whether to delete the origin value or keep it. Default is False.
type: boolean
replace_record:
title: Replace Origin Record
description: Whether to replace the origin record or not. Default is False.
type: boolean
$parameters:
type: object
additionalProperties: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,11 @@ class DpathFlattenFields(BaseModel):
description="Whether to delete the origin value or keep it. Default is False.",
title="Delete Origin Value",
)
replace_record: Optional[bool] = Field(
None,
description="Whether to replace the origin record or not. Default is False.",
title="Replace Origin Record",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,10 @@ def create_add_fields(self, model: AddFieldsModel, config: Config, **kwargs: Any
)
for added_field_definition_model in model.fields
]
return AddFields(fields=added_field_definitions, parameters=model.parameters or {})
return AddFields(
fields=added_field_definitions,
parameters=model.parameters or {},
)

def create_keys_to_lower_transformation(
self, model: KeysToLowerModel, config: Config, **kwargs: Any
Expand Down Expand Up @@ -742,6 +745,7 @@ def create_dpath_flatten_fields(
delete_origin_value=model.delete_origin_value
if model.delete_origin_value is not None
else False,
replace_record=model.replace_record if model.replace_record is not None else False,
parameters=model.parameters or {},
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass, field
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ class DpathFlattenFields(RecordTransformation):

field_path: List[Union[InterpolatedString, str]] path to the field to flatten.
delete_origin_value: bool = False whether to delete origin field or keep it. Default is False.
replace_record: bool = False whether to replace origin record or not. Default is False.

"""

config: Config
field_path: List[Union[InterpolatedString, str]]
parameters: InitVar[Mapping[str, Any]]
delete_origin_value: bool = False
replace_record: bool = False

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._field_path = [
Expand All @@ -48,8 +50,12 @@ def transform(
extracted = dpath.get(record, path, default=[])

if isinstance(extracted, dict):
conflicts = set(extracted.keys()) & set(record.keys())
if not conflicts:
if self.delete_origin_value:
dpath.delete(record, path)
if self.replace_record and extracted:
dpath.delete(record, "**")
record.update(extracted)
else:
conflicts = set(extracted.keys()) & set(record.keys())
if not conflicts:
if self.delete_origin_value:
dpath.delete(record, path)
record.update(extracted)
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

_ANY_VALUE = -1
_DELETE_ORIGIN_VALUE = True
_REPLACE_WITH_VALUE = True
_DO_NOT_DELETE_ORIGIN_VALUE = False
_DO_NOT_REPLACE_WITH_VALUE = False


@pytest.mark.parametrize(
Expand All @@ -13,6 +15,7 @@
"config",
"field_path",
"delete_origin_value",
"replace_record",
"expected_record",
],
[
Expand All @@ -21,6 +24,7 @@
{},
["field2"],
_DO_NOT_DELETE_ORIGIN_VALUE,
_DO_NOT_REPLACE_WITH_VALUE,
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
id="flatten by dpath, don't delete origin value",
),
Expand All @@ -29,6 +33,7 @@
{},
["field2"],
_DELETE_ORIGIN_VALUE,
_DO_NOT_REPLACE_WITH_VALUE,
{"field1": _ANY_VALUE, "field3": _ANY_VALUE},
id="flatten by dpath, delete origin value",
),
Expand All @@ -40,6 +45,7 @@
{},
["field2", "*", "field4"],
_DO_NOT_DELETE_ORIGIN_VALUE,
_DO_NOT_REPLACE_WITH_VALUE,
{
"field1": _ANY_VALUE,
"field2": {"field3": {"field4": {"field5": _ANY_VALUE}}},
Expand All @@ -55,6 +61,7 @@
{},
["field2", "*", "field4"],
_DELETE_ORIGIN_VALUE,
_DO_NOT_REPLACE_WITH_VALUE,
{"field1": _ANY_VALUE, "field2": {"field3": {}}, "field5": _ANY_VALUE},
id="flatten by dpath with *, delete origin value",
),
Expand All @@ -63,6 +70,7 @@
{"field_path": "field2"},
["{{ config['field_path'] }}"],
_DO_NOT_DELETE_ORIGIN_VALUE,
_DO_NOT_REPLACE_WITH_VALUE,
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
id="flatten by dpath from config, don't delete origin value",
),
Expand All @@ -71,6 +79,7 @@
{},
["non-existing-field"],
_DO_NOT_DELETE_ORIGIN_VALUE,
_DO_NOT_REPLACE_WITH_VALUE,
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
id="flatten by non-existing dpath, don't delete origin value",
),
Expand All @@ -79,6 +88,7 @@
{},
["*", "non-existing-field"],
_DO_NOT_DELETE_ORIGIN_VALUE,
_DO_NOT_REPLACE_WITH_VALUE,
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
id="flatten by non-existing dpath with *, don't delete origin value",
),
Expand All @@ -87,6 +97,7 @@
{},
["field2"],
_DO_NOT_DELETE_ORIGIN_VALUE,
_DO_NOT_REPLACE_WITH_VALUE,
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
id="flatten by dpath, not to update when record has field conflicts, don't delete origin value",
),
Expand All @@ -95,16 +106,39 @@
{},
["field2"],
_DO_NOT_DELETE_ORIGIN_VALUE,
_DO_NOT_REPLACE_WITH_VALUE,
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
id="flatten by dpath, not to update when record has field conflicts, delete origin value",
),
pytest.param(
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
{},
["field2"],
_DO_NOT_DELETE_ORIGIN_VALUE,
_REPLACE_WITH_VALUE,
{"field3": _ANY_VALUE},
id="flatten by dpath, replace with value",
),
pytest.param(
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
{},
["field2"],
_DELETE_ORIGIN_VALUE,
_REPLACE_WITH_VALUE,
{"field3": _ANY_VALUE},
id="flatten by dpath, delete_origin_value do not affect to replace_record",
),
],
)
def test_dpath_flatten_lists(
input_record, config, field_path, delete_origin_value, expected_record
input_record, config, field_path, delete_origin_value, replace_record, expected_record
):
flattener = DpathFlattenFields(
field_path=field_path, parameters={}, config=config, delete_origin_value=delete_origin_value
field_path=field_path,
parameters={},
config=config,
delete_origin_value=delete_origin_value,
replace_record=replace_record,
)
flattener.transform(input_record)
assert input_record == expected_record
Loading