Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(low-code cdk): add condition to add fields #409

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,19 @@ definitions:
type: array
items:
"$ref": "#/definitions/AddedFieldDefinition"
condition:
description: Fields will be added if expression is evaluated to True.,
type: string
default: ""
interpolation_context:
- config
- property
- parameters
examples:
- "{{ property|string == '' }}"
- "{{ property is integer }}"
- "{{ property|length > 5 }}"
- "{{ property == 'some_string_to_match' }}"
$parameters:
type: object
additionalProperties: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1460,6 +1460,16 @@ class AddFields(BaseModel):
description="List of transformations (path and corresponding value) that will be added to the record.",
title="Fields",
)
condition: Optional[str] = Field(
"",
description="Fields will be added if expression is evaluated to True.,",
examples=[
"{{ property|string == '' }}",
"{{ property is integer }}",
"{{ property|length > 5 }}",
"{{ property == 'some_string_to_match' }}",
],
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,11 @@ def create_add_fields(self, model: AddFieldsModel, config: Config, **kwargs: Any
)
for added_field_definition_model in model.fields
]
return AddFields(fields=added_field_definitions, parameters=model.parameters or {})
return AddFields(
fields=added_field_definitions,
condition=model.condition or "",
parameters=model.parameters or {},
)

def create_keys_to_lower_transformation(
self, model: KeysToLowerModel, config: Config, **kwargs: Any
Expand Down
12 changes: 10 additions & 2 deletions airbyte_cdk/sources/declarative/transformations/add_fields.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass, field
from typing import Any, Dict, List, Mapping, Optional, Type, Union

import dpath

from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, FieldPointer, StreamSlice, StreamState
Expand Down Expand Up @@ -86,11 +87,16 @@ class AddFields(RecordTransformation):

fields: List[AddedFieldDefinition]
parameters: InitVar[Mapping[str, Any]]
condition: str = ""
_parsed_fields: List[ParsedAddFieldDefinition] = field(
init=False, repr=False, default_factory=list
)

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._filter_interpolator = InterpolatedBoolean(
condition=self.condition, parameters=parameters
)

for add_field in self.fields:
if len(add_field.path) < 1:
raise ValueError(
Expand Down Expand Up @@ -132,7 +138,9 @@ def transform(
for parsed_field in self._parsed_fields:
valid_types = (parsed_field.value_type,) if parsed_field.value_type else None
value = parsed_field.value.eval(config, valid_types=valid_types, **kwargs)
dpath.new(record, parsed_field.path, value)
is_empty_condition = not self.condition
if is_empty_condition or self._filter_interpolator.eval(config, value=value, **kwargs):
dpath.new(record, parsed_field.path, value)

def __eq__(self, other: Any) -> bool:
return bool(self.__dict__ == other.__dict__)
41 changes: 39 additions & 2 deletions unit_tests/sources/declarative/transformations/test_add_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@


@pytest.mark.parametrize(
["input_record", "field", "field_type", "kwargs", "expected"],
["input_record", "field", "field_type", "condition", "kwargs", "expected"],
[
pytest.param(
{"k": "v"},
[(["path"], "static_value")],
None,
"",
{},
{"k": "v", "path": "static_value"},
id="add new static value",
Expand All @@ -26,6 +27,7 @@
{"k": "v"},
[(["path"], "{{ 1 }}")],
None,
"",
{},
{"k": "v", "path": 1},
id="add an expression evaluated as a number",
Expand All @@ -34,6 +36,7 @@
{"k": "v"},
[(["path"], "{{ 1 }}")],
str,
"",
{},
{"k": "v", "path": "1"},
id="add an expression evaluated as a string using the value_type field",
Expand All @@ -42,6 +45,7 @@
{"k": "v"},
[(["path"], "static_value"), (["path2"], "static_value2")],
None,
"",
{},
{"k": "v", "path": "static_value", "path2": "static_value2"},
id="add new multiple static values",
Expand All @@ -50,6 +54,7 @@
{"k": "v"},
[(["nested", "path"], "static_value")],
None,
"",
{},
{"k": "v", "nested": {"path": "static_value"}},
id="set static value at nested path",
Expand All @@ -58,6 +63,7 @@
{"k": "v"},
[(["k"], "new_value")],
None,
"",
{},
{"k": "new_value"},
id="update value which already exists",
Expand All @@ -66,6 +72,7 @@
{"k": [0, 1]},
[(["k", 3], "v")],
None,
"",
{},
{"k": [0, 1, None, "v"]},
id="Set element inside array",
Expand All @@ -74,6 +81,7 @@
{"k": "v"},
[(["k2"], '{{ config["shop"] }}')],
None,
"",
{"config": {"shop": "in-n-out"}},
{"k": "v", "k2": "in-n-out"},
id="set a value from the config using bracket notation",
Expand All @@ -82,6 +90,7 @@
{"k": "v"},
[(["k2"], "{{ config.shop }}")],
None,
"",
{"config": {"shop": "in-n-out"}},
{"k": "v", "k2": "in-n-out"},
id="set a value from the config using dot notation",
Expand All @@ -90,6 +99,7 @@
{"k": "v"},
[(["k2"], '{{ stream_slice["start_date"] }}')],
None,
"",
{"stream_slice": {"start_date": "oct1"}},
{"k": "v", "k2": "oct1"},
id="set a value from the stream slice using bracket notation",
Expand All @@ -98,6 +108,7 @@
{"k": "v"},
[(["k2"], "{{ stream_slice.start_date }}")],
None,
"",
{"stream_slice": {"start_date": "oct1"}},
{"k": "v", "k2": "oct1"},
id="set a value from the stream slice using dot notation",
Expand All @@ -106,6 +117,7 @@
{"k": "v"},
[(["k2"], "{{ record.k }}")],
None,
"",
{},
{"k": "v", "k2": "v"},
id="set a value from a field in the record using dot notation",
Expand All @@ -114,6 +126,7 @@
{"k": "v"},
[(["k2"], '{{ record["k"] }}')],
None,
"",
{},
{"k": "v", "k2": "v"},
id="set a value from a field in the record using bracket notation",
Expand All @@ -122,6 +135,7 @@
{"k": {"nested": "v"}},
[(["k2"], "{{ record.k.nested }}")],
None,
"",
{},
{"k": {"nested": "v"}, "k2": "v"},
id="set a value from a nested field in the record using bracket notation",
Expand All @@ -130,6 +144,7 @@
{"k": {"nested": "v"}},
[(["k2"], '{{ record["k"]["nested"] }}')],
None,
"",
{},
{"k": {"nested": "v"}, "k2": "v"},
id="set a value from a nested field in the record using bracket notation",
Expand All @@ -138,22 +153,44 @@
{"k": "v"},
[(["k2"], "{{ 2 + 2 }}")],
None,
"",
{},
{"k": "v", "k2": 4},
id="set a value from a jinja expression",
),
pytest.param(
{"k": "v"},
[(["path"], "static_value")],
None,
"{{ False }}",
{},
{"k": "v"},
id="do not add any field if condition is boolean False",
),
pytest.param(
{"k": "v"},
[(["path"], "static_value")],
None,
"{{ True }}",
{},
{"k": "v", "path": "static_value"},
id="add all fields if condition is boolean True",
),
],
)
def test_add_fields(
input_record: Mapping[str, Any],
field: List[Tuple[FieldPointer, str]],
field_type: Optional[str],
condition: Optional[str],
kwargs: Mapping[str, Any],
expected: Mapping[str, Any],
):
inputs = [
AddedFieldDefinition(path=v[0], value=v[1], value_type=field_type, parameters={})
for v in field
]
AddFields(fields=inputs, parameters={"alas": "i live"}).transform(input_record, **kwargs)
AddFields(fields=inputs, condition=condition, parameters={"alas": "i live"}).transform(
input_record, **kwargs
)
assert input_record == expected
Loading