Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(low-code cdk): add overwrite flag to dpath flatten #410

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
17 changes: 17 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,19 @@ definitions:
type: array
items:
"$ref": "#/definitions/AddedFieldDefinition"
condition:
description: Fields will be added if expression is evaluated to True.,
type: string
default: ""
interpolation_context:
- config
- property
- parameters
examples:
- "{{ property|string == '' }}"
- "{{ property is integer }}"
- "{{ property|length > 5 }}"
- "{{ property == 'some_string_to_match' }}"
$parameters:
type: object
additionalProperties: true
Expand Down Expand Up @@ -2259,6 +2272,10 @@ definitions:
title: Delete Origin Value
description: Whether to delete the origin value or keep it. Default is False.
type: boolean
replace_record:
title: Replace Origin Record
description: Whether to replace the origin record or not. Default is False.
type: boolean
$parameters:
type: object
additionalProperties: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,11 @@ class DpathFlattenFields(BaseModel):
description="Whether to delete the origin value or keep it. Default is False.",
title="Delete Origin Value",
)
replace_record: Optional[bool] = Field(
None,
description="Whether to replace the origin record or not. Default is False.",
title="Replace Origin Record",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


Expand Down Expand Up @@ -1460,6 +1465,16 @@ class AddFields(BaseModel):
description="List of transformations (path and corresponding value) that will be added to the record.",
title="Fields",
)
condition: Optional[str] = Field(
"",
description="Fields will be added if expression is evaluated to True.,",
examples=[
"{{ property|string == '' }}",
"{{ property is integer }}",
"{{ property|length > 5 }}",
"{{ property == 'some_string_to_match' }}",
],
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,11 @@ def create_add_fields(self, model: AddFieldsModel, config: Config, **kwargs: Any
)
for added_field_definition_model in model.fields
]
return AddFields(fields=added_field_definitions, parameters=model.parameters or {})
return AddFields(
fields=added_field_definitions,
condition=model.condition,
parameters=model.parameters or {},
)

def create_keys_to_lower_transformation(
self, model: KeysToLowerModel, config: Config, **kwargs: Any
Expand Down Expand Up @@ -742,6 +746,7 @@ def create_dpath_flatten_fields(
delete_origin_value=model.delete_origin_value
if model.delete_origin_value is not None
else False,
replace_record=model.replace_record if model.replace_record is not None else False,
parameters=model.parameters or {},
)

Expand Down
12 changes: 10 additions & 2 deletions airbyte_cdk/sources/declarative/transformations/add_fields.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass, field
from typing import Any, Dict, List, Mapping, Optional, Type, Union

import dpath

from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, FieldPointer, StreamSlice, StreamState
Expand Down Expand Up @@ -86,11 +87,16 @@ class AddFields(RecordTransformation):

fields: List[AddedFieldDefinition]
parameters: InitVar[Mapping[str, Any]]
condition: str = ""
_parsed_fields: List[ParsedAddFieldDefinition] = field(
init=False, repr=False, default_factory=list
)

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._filter_interpolator = InterpolatedBoolean(
condition=self.condition, parameters=parameters
)

for add_field in self.fields:
if len(add_field.path) < 1:
raise ValueError(
Expand Down Expand Up @@ -132,7 +138,9 @@ def transform(
for parsed_field in self._parsed_fields:
valid_types = (parsed_field.value_type,) if parsed_field.value_type else None
value = parsed_field.value.eval(config, valid_types=valid_types, **kwargs)
dpath.new(record, parsed_field.path, value)
is_empty_condition = not self.condition
if is_empty_condition or self._filter_interpolator.eval(config, value=value, **kwargs):
dpath.new(record, parsed_field.path, value)

def __eq__(self, other: Any) -> bool:
return bool(self.__dict__ == other.__dict__)
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ class DpathFlattenFields(RecordTransformation):

field_path: List[Union[InterpolatedString, str]] path to the field to flatten.
delete_origin_value: bool = False whether to delete origin field or keep it. Default is False.
replace_record: bool = False whether to replace origin record or not. Default is False.

"""

config: Config
field_path: List[Union[InterpolatedString, str]]
parameters: InitVar[Mapping[str, Any]]
delete_origin_value: bool = False
replace_record: bool = False

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._field_path = [
Expand All @@ -48,8 +50,12 @@ def transform(
extracted = dpath.get(record, path, default=[])

if isinstance(extracted, dict):
conflicts = set(extracted.keys()) & set(record.keys())
if not conflicts:
if self.delete_origin_value:
dpath.delete(record, path)
if self.replace_record and extracted:
dpath.delete(record, "**")
record.update(extracted)
else:
conflicts = set(extracted.keys()) & set(record.keys())
if not conflicts:
if self.delete_origin_value:
dpath.delete(record, path)
record.update(extracted)
41 changes: 39 additions & 2 deletions unit_tests/sources/declarative/transformations/test_add_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@


@pytest.mark.parametrize(
["input_record", "field", "field_type", "kwargs", "expected"],
["input_record", "field", "field_type", "condition", "kwargs", "expected"],
[
pytest.param(
{"k": "v"},
[(["path"], "static_value")],
None,
"",
{},
{"k": "v", "path": "static_value"},
id="add new static value",
Expand All @@ -26,6 +27,7 @@
{"k": "v"},
[(["path"], "{{ 1 }}")],
None,
"",
{},
{"k": "v", "path": 1},
id="add an expression evaluated as a number",
Expand All @@ -34,6 +36,7 @@
{"k": "v"},
[(["path"], "{{ 1 }}")],
str,
"",
{},
{"k": "v", "path": "1"},
id="add an expression evaluated as a string using the value_type field",
Expand All @@ -42,6 +45,7 @@
{"k": "v"},
[(["path"], "static_value"), (["path2"], "static_value2")],
None,
"",
{},
{"k": "v", "path": "static_value", "path2": "static_value2"},
id="add new multiple static values",
Expand All @@ -50,6 +54,7 @@
{"k": "v"},
[(["nested", "path"], "static_value")],
None,
"",
{},
{"k": "v", "nested": {"path": "static_value"}},
id="set static value at nested path",
Expand All @@ -58,6 +63,7 @@
{"k": "v"},
[(["k"], "new_value")],
None,
"",
{},
{"k": "new_value"},
id="update value which already exists",
Expand All @@ -66,6 +72,7 @@
{"k": [0, 1]},
[(["k", 3], "v")],
None,
"",
{},
{"k": [0, 1, None, "v"]},
id="Set element inside array",
Expand All @@ -74,6 +81,7 @@
{"k": "v"},
[(["k2"], '{{ config["shop"] }}')],
None,
"",
{"config": {"shop": "in-n-out"}},
{"k": "v", "k2": "in-n-out"},
id="set a value from the config using bracket notation",
Expand All @@ -82,6 +90,7 @@
{"k": "v"},
[(["k2"], "{{ config.shop }}")],
None,
"",
{"config": {"shop": "in-n-out"}},
{"k": "v", "k2": "in-n-out"},
id="set a value from the config using dot notation",
Expand All @@ -90,6 +99,7 @@
{"k": "v"},
[(["k2"], '{{ stream_slice["start_date"] }}')],
None,
"",
{"stream_slice": {"start_date": "oct1"}},
{"k": "v", "k2": "oct1"},
id="set a value from the stream slice using bracket notation",
Expand All @@ -98,6 +108,7 @@
{"k": "v"},
[(["k2"], "{{ stream_slice.start_date }}")],
None,
"",
{"stream_slice": {"start_date": "oct1"}},
{"k": "v", "k2": "oct1"},
id="set a value from the stream slice using dot notation",
Expand All @@ -106,6 +117,7 @@
{"k": "v"},
[(["k2"], "{{ record.k }}")],
None,
"",
{},
{"k": "v", "k2": "v"},
id="set a value from a field in the record using dot notation",
Expand All @@ -114,6 +126,7 @@
{"k": "v"},
[(["k2"], '{{ record["k"] }}')],
None,
"",
{},
{"k": "v", "k2": "v"},
id="set a value from a field in the record using bracket notation",
Expand All @@ -122,6 +135,7 @@
{"k": {"nested": "v"}},
[(["k2"], "{{ record.k.nested }}")],
None,
"",
{},
{"k": {"nested": "v"}, "k2": "v"},
id="set a value from a nested field in the record using bracket notation",
Expand All @@ -130,6 +144,7 @@
{"k": {"nested": "v"}},
[(["k2"], '{{ record["k"]["nested"] }}')],
None,
"",
{},
{"k": {"nested": "v"}, "k2": "v"},
id="set a value from a nested field in the record using bracket notation",
Expand All @@ -138,22 +153,44 @@
{"k": "v"},
[(["k2"], "{{ 2 + 2 }}")],
None,
"",
{},
{"k": "v", "k2": 4},
id="set a value from a jinja expression",
),
pytest.param(
{"k": "v"},
[(["path"], "static_value")],
None,
"{{ False }}",
{},
{"k": "v"},
id="do not add any field if condition is boolean False",
),
pytest.param(
{"k": "v"},
[(["path"], "static_value")],
None,
"{{ True }}",
{},
{"k": "v", "path": "static_value"},
id="add all fields if condition is boolean True",
),
],
)
def test_add_fields(
input_record: Mapping[str, Any],
field: List[Tuple[FieldPointer, str]],
field_type: Optional[str],
condition: Optional[str],
kwargs: Mapping[str, Any],
expected: Mapping[str, Any],
):
inputs = [
AddedFieldDefinition(path=v[0], value=v[1], value_type=field_type, parameters={})
for v in field
]
AddFields(fields=inputs, parameters={"alas": "i live"}).transform(input_record, **kwargs)
AddFields(fields=inputs, condition=condition, parameters={"alas": "i live"}).transform(
input_record, **kwargs
)
assert input_record == expected
Loading
Loading