At the record extraction step, add to each record a service field $root holding a reference to:
* the root response object, when parsing JSON format
* the original record, when parsing JSONL format
from which the record being processed was extracted.
More service fields could be added in the future.
The service fields are available in the record's filtering and transform steps.
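As a hypothetical illustration of this convention (the payload shape and field names are invented; the real extractor builds these mappings internally during extraction):

```python
# Hypothetical illustration of the service-field convention described above.
response_body = {"data": [{"id": 1}, {"id": 2}], "total": 2}

# Each extracted record carries $root, a reference back to the whole response.
records = [{**rec, "$root": response_body} for rec in response_body["data"]]

# During the filter/transform steps a record can reach any response attribute:
assert records[0]["$root"]["total"] == 2
```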

Avoid:
* reusing the produced maps/dictionaries, so as not to build cyclic structures
* transforming the service fields in the Flatten transformation.

Explicitly clean up the service field(s) after the transform step, thus making them:
* local for the filter and transform steps
* not visible to the next mapping and store steps (as they should be)
* not visible in the tests beyond the test_record_selector (as they should be)
This allows the record transformation logic to define its "local variables" to reuse
some interim calculations.

The contract of body parsing is irregular in how it represents bad JSON, missing JSON, and empty JSON.
It cannot be unified, as that irregularity is already relied upon.
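For reference, the irregular cases can be sketched as follows (a simplified restatement, not the exact CDK code; unparseable JSON additionally surfaces as an empty `{}` record at the decode step):

```python
def parse_body_json(body_json):
    # Simplified restatement of the parsing contract: a list yields its
    # elements (so an empty list yields nothing), any other value yields itself.
    if isinstance(body_json, list):
        yield from body_json
    else:
        yield body_json

assert list(parse_body_json([1, 2])) == [1, 2]        # list: its elements
assert list(parse_body_json({"a": 1})) == [{"a": 1}]  # object: itself
assert list(parse_body_json([])) == []                # empty list: no records
```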

Update the development environment setup documentation
* to organize and present the setup steps explicitly
* to avoid misunderstandings and wasted efforts.

Update CONTRIBUTING.md to
* collect and organize the knowledge on running the tests locally.
* state the actual testing steps.
* clarify and make explicit the procedures and steps.

The unit, integration, and acceptance tests in exactly this version succeed under Fedora 41, while
one of them fails under Oracle Linux 8.7; that failure is not related to the contents of this PR.
The integration tests of the CDK fail due to missing `secrets/config.json` file for the Shopify source.
See airbytehq#197

Polish

Integrate the DpathEnhancingExtractor in the UI of Airbyte.
Created a DpathEnhancingExtractor.
Refactored the record enhancement logic, moving it to the extracted class.
Split the tests of DpathExtractor and DpathEnhancingExtractor.

Fix the failing tests:

FAILED unit_tests/sources/declarative/parsers/test_model_to_component_factory.py::test_create_custom_components[test_create_custom_component_with_subcomponent_that_uses_parameters]
FAILED unit_tests/sources/declarative/parsers/test_model_to_component_factory.py::test_custom_components_do_not_contain_extra_fields
FAILED unit_tests/sources/declarative/parsers/test_model_to_component_factory.py::test_parse_custom_component_fields_if_subcomponent
FAILED unit_tests/sources/declarative/parsers/test_model_to_component_factory.py::test_create_page_increment
FAILED unit_tests/sources/declarative/parsers/test_model_to_component_factory.py::test_create_offset_increment
FAILED unit_tests/sources/file_based/test_file_based_scenarios.py::test_file_based_read[simple_unstructured_scenario]
FAILED unit_tests/sources/file_based/test_file_based_scenarios.py::test_file_based_read[no_file_extension_unstructured_scenario]

They fail because string and int values of the page_size (public) attribute were compared.
Imposed an invariant:
  on construction, page_size can be set to a string or an int
  keep only values of one type in page_size for uniform comparison (convert the values of the other type)
  _page_size holds the internal / working value
... unless manipulated directly.
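The invariant can be sketched like this (class and attribute names are simplified assumptions, not the exact CDK classes):

```python
class PageIncrement:
    # Simplified sketch: accept str or int at construction, but canonicalize
    # to int internally so later comparisons never mix types.
    def __init__(self, page_size):
        self._page_size = int(page_size) if isinstance(page_size, str) else page_size

# "50" and 50 now compare equal through the internal value:
assert PageIncrement("50")._page_size == PageIncrement(50)._page_size == 50
```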

Merged:
feat(low-code concurrent): Allow async job low-code streams that are incremental to be run by the concurrent framework (airbytehq#228)
fix(low-code): Fix declarative low-code state migration in SubstreamPartitionRouter (airbytehq#267)
feat: combine slash command jobs into single job steps (airbytehq#266)
feat(low-code): add items and property mappings to dynamic schemas (airbytehq#256)
feat: add help response for unrecognized slash commands (airbytehq#264)
ci: post direct links to html connector test reports (airbytehq#252) (airbytehq#263)
fix(low-code): Fix legacy state migration in SubstreamPartitionRouter (airbytehq#261)
fix(airbyte-cdk): Fix RequestOptionsProvider for PerPartitionWithGlobalCursor (airbytehq#254)
feat(low-code): add profile assertion flow to oauth authenticator component (airbytehq#236)
feat(Low-Code Concurrent CDK): Add ConcurrentPerPartitionCursor (airbytehq#111)
fix: don't mypy unit_tests (airbytehq#241)
fix: handle backoff_strategies in CompositeErrorHandler (airbytehq#225)
feat(concurrent cursor): attempt at clamping datetime (airbytehq#234)
ci: use `ubuntu-24.04` explicitly (resolves CI warnings) (airbytehq#244)
Fix(sdm): module ref issue in python components import (airbytehq#243)
feat(source-declarative-manifest): add support for custom Python components from dynamic text input (airbytehq#174)
chore(deps): bump avro from 1.11.3 to 1.12.0 (airbytehq#133)
docs: comments on what the `Dockerfile` is for (airbytehq#240)
chore: move ruff configuration to dedicated ruff.toml file (airbytehq#237)
feat(low-code): add DpathFlattenFields (airbytehq#227)

formatted

Update record_extractor.py

Trigger a new build. Hopefully, the integration test infrastructure is fixed.

Update CONTRIBUTING.md

Trigger a new build
rpopov committed Feb 8, 2025
1 parent 6260248 commit 19b76fe
Showing 33 changed files with 1,665 additions and 133 deletions.
3 changes: 2 additions & 1 deletion airbyte_cdk/__init__.py
Original file line number Diff line number Diff line change
@@ -92,7 +92,7 @@
from .sources.declarative.declarative_stream import DeclarativeStream
from .sources.declarative.decoders import Decoder, JsonDecoder
from .sources.declarative.exceptions import ReadException
from .sources.declarative.extractors import DpathExtractor, RecordSelector
from .sources.declarative.extractors import DpathEnhancingExtractor, DpathExtractor, RecordSelector
from .sources.declarative.extractors.record_extractor import RecordExtractor
from .sources.declarative.extractors.record_filter import RecordFilter
from .sources.declarative.incremental import DatetimeBasedCursor
@@ -234,6 +234,7 @@
"DefaultPaginator",
"DefaultRequestOptionsProvider",
"DpathExtractor",
"DpathEnhancingExtractor",
"FieldPointer",
"HttpMethod",
"HttpRequester",
34 changes: 34 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
@@ -1459,6 +1459,39 @@ definitions:
$parameters:
type: object
additionalProperties: true

DpathEnhancingExtractor:
title: Dpath Enhancing Extractor
description: |
Extract records from a response, navigating a path given as an array of field names. Add the $parent and $root service fields to each record, where:
$root holds the original response;
$parent holds the containing object's attributes, including its own $parent field.
These service fields are local to the filter and transform phases.
type: object
required:
- type
- field_path
properties:
type:
type: string
enum: [DpathEnhancingExtractor]
field_path:
title: Field Path
description: List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).
type: array
items:
- type: string
interpolation_context:
- config
examples:
- ["data"]
- ["data", "records"]
- ["data", "{{ parameters.name }}"]
- ["data", "*", "record"]
$parameters:
type: object
additionalProperties: true

ResponseToFileExtractor:
title: CSV To File Extractor
description: A record extractor designed for handling large responses that may exceed memory limits (to prevent OOM issues). It downloads a CSV file to disk, reads the data from disk, and deletes the file once it has been fully processed.
@@ -2775,6 +2808,7 @@ definitions:
anyOf:
- "$ref": "#/definitions/CustomRecordExtractor"
- "$ref": "#/definitions/DpathExtractor"
- "$ref": "#/definitions/DpathEnhancingExtractor"
record_filter:
title: Record Filter
description: Responsible for filtering records to be emitted by the Source.
17 changes: 7 additions & 10 deletions airbyte_cdk/sources/declarative/decoders/json_decoder.py
@@ -35,22 +35,19 @@ def decode(
try:
body_json = response.json()
yield from self.parse_body_json(body_json)
except requests.exceptions.JSONDecodeError:
logger.warning(
f"Response cannot be parsed into json: {response.status_code=}, {response.text=}"
)
yield {}
except requests.exceptions.JSONDecodeError as ex:
logger.warning("Response cannot be parsed into json: %s", ex)
logger.debug("Response to parse: %s", response.text, exc_info=True, stack_info=True)
yield {}  # Keep the existing contract

@staticmethod
def parse_body_json(
body_json: MutableMapping[str, Any] | List[MutableMapping[str, Any]],
) -> Generator[MutableMapping[str, Any], None, None]:
if not isinstance(body_json, list):
body_json = [body_json]
if len(body_json) == 0:
yield {}
else:
if isinstance(body_json, list):
yield from body_json
else:
yield from [body_json]


@dataclass
4 changes: 4 additions & 0 deletions airbyte_cdk/sources/declarative/extractors/__init__.py
@@ -2,6 +2,9 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.extractors.dpath_enhancing_extractor import (
DpathEnhancingExtractor,
)
from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
@@ -15,6 +18,7 @@
"TypeTransformer",
"HttpSelector",
"DpathExtractor",
"DpathEnhancingExtractor",
"RecordFilter",
"RecordSelector",
"ResponseToFileExtractor",
airbyte_cdk/sources/declarative/extractors/dpath_enhancing_extractor.py (new file)
@@ -0,0 +1,142 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass, field
from typing import Any, Iterable, List, Mapping, MutableMapping, Union

from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor
from airbyte_cdk.sources.declarative.extractors.record_extractor import (
SERVICE_KEY_PREFIX,
add_service_key,
is_service_key,
)

# The name of the service key that holds a reference to the original root response
SERVICE_KEY_ROOT = "root"

# The name of the service key that holds a reference to the owner object
SERVICE_KEY_PARENT = "parent"


@dataclass
class DpathEnhancingExtractor(DpathExtractor):
"""
Navigate a path through the JSON structure to the records to retrieve. Extend the records with service fields
applicable to their filtering and transformation.
Like the DpathExtractor, extract records from a response by following a path of names of nested objects,
while adding specific service fields to the extracted records to facilitate the further processing.
Service fields:
root: Binds the original response body the record was extracted from. This allows the record to access any
attribute in any nested object, navigating from the root field.
parent: Binds a map of the parent object's attributes, including its "parent" service field. This way the extracted
record has access to the attributes of any enclosing object. This is especially useful when the records are
extracted from nested lists.
Example:
body: {"a":1, "b":2, "c":{"d":4}}\n
path: {c}\n
record: {"d":4,"parent": { "a":1, "b":2}, "root": { "a":1, "b":2, "c":{"d":4}}}\n
access: {{ record.d }}, {{ record["parent"].a }}, {{ record["parent"].b }}, {{ record.["root"].a }}...
Example:
body: {"a":1, "b":2, "c":[{"d":4},{"e",5}]}\n
path: {c, *}\n
record 1: {"d":4, "parent":{ "a":1, "b":2}, "root":{ "a":1, "b":2, "c":[{"d":4},{"e",5}]})\n
record 2: {"e",5, "parent":{ "a":1, "b":2}, "root":{ "a":1, "b":2, "c":[{"d":4},{"e",5}]})\n
access: {{ record.d }}, {{ record["parent"].a }}, {{ record["parent"].b }}, {{ record.["root"].a }}...
Example:
body: { "a":1, "b":2, "c":{"d":4, "e":{"f":6}}}\n
path: {c,e}\n
record: {"f":6, "parent": {"d":4, parent: { "a":1, "b":2}},"root":{ "a":1, "b":2, "c":{"d":4, "e":{"f":6}}}}\n
access: {{ record.f }}, {{ record["parent"].d }}, {{ record["parent"]["parent"].a }},\n
{{ record["parent"]["parent"].b }},{{ record.["root"].a }}, {{ record.["root"].a.c.d }}...
Note:
The names of the service fields have a specific prefix like $ set in SERVICE_KEY_PREFIX.\n
When the result record is the body object itself, then the "parent" service field is not set (as it is None).\n
When the parent contains no attributes and no parent service field, the parent field is not bound.\n
The "root" service field is always set in the result record.
"""

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
"""
See DpathExtractor
"""
super().__post_init__(parameters)

def update_body(self, body: Any) -> Any:
"""
In each nested object in the body add a service key "parent" to refer to the owner object.
For the root object/body the owner is None.
Example:
body = { "a":1, "b":2, "c":{"d":4}}
result = { "a":1,
"b":2,
"c":{"d":4,
"parent": { "a":1, "b":2}}}
Example:
body = { "a":1, "b":2, "c":[{"d":4},{"e":5}]}
result = { "a":1,
"b":2,
"c":[{"d":4, "parent":{ "a":1, "b":2}},
{"e":5, "parent":{ "a":1, "b":2}}],
}
Example:
body = { "a":1, "b":2, "c":{"d":4, "e":{"f":6}}}
result = { "a":1,
"b":2,
"c":{"d":4,
"parent": { "a":1, "b":2},
"e":{"f":6,
"parent": {"d":4,
"parent": { "a":1, "b":2}} }}}
:param body: the original response body. Not to be changed
:return: a copy of the body, where the nested objects have the "parent" service field bound to the map of the
parent object's attributes (including its "parent" service fields). This way any record that will be
extracted from the nested objects will have access to any parent's attributes still avoiding loops
in the JSON structure.
"""
return self._set_parent(body, None)

def _set_parent(self, body: Any, parent: Any) -> Any:
"""
:param body: the original response body. Not to be changed
:param parent: none or the parent object that owns/has as nested the body object
:return: a copy of the body enhanced in a subclass-specific way. None only when body is None.
"""
if isinstance(body, dict):
result: dict[str, Any] = dict()
if parent:
result = add_service_key(result, SERVICE_KEY_PARENT, parent)
attributes_only = dict(result)
attributes_only.update(
{
k: v
for k, v in body.items()
if v and not isinstance(v, dict) and not isinstance(v, list)
}
)
for k, v in body.items():
result[k] = self._set_parent(v, attributes_only)
return result
elif isinstance(body, list):
return [self._set_parent(v, parent) for v in body]
else:
return body

def update_record(self, record: Any, root: Any) -> Any:
"""
Change the extracted record in a subclass-specific way. Override in subclasses.
:param record: the original extracted record. Not to be changed. Not None.
:param root: the original body the record is extracted from.
:return: a copy of the record changed or enhanced in a subclass-specific way.
"""
return add_service_key(record, SERVICE_KEY_ROOT, root)
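The parent-linking walk implemented above can be restated as a standalone sketch (using a literal "$parent" key; simplified from the class methods, so this is illustrative rather than the exact CDK code):

```python
def set_parent(body, parent=None):
    # Walk the structure; in every nested dict, bind "$parent" to a map of the
    # enclosing object's scalar attributes (plus its own "$parent"), so records
    # extracted from nested objects can see their ancestors without cycles.
    if isinstance(body, dict):
        result = {"$parent": parent} if parent is not None else {}
        scalars = dict(result)
        scalars.update(
            {k: v for k, v in body.items() if v and not isinstance(v, (dict, list))}
        )
        for k, v in body.items():
            result[k] = set_parent(v, scalars)
        return result
    if isinstance(body, list):
        return [set_parent(v, parent) for v in body]
    return body

enhanced = set_parent({"a": 1, "c": {"d": 4}})
assert enhanced["a"] == 1
assert enhanced["c"]["$parent"] == {"a": 1}
```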
51 changes: 39 additions & 12 deletions airbyte_cdk/sources/declarative/extractors/dpath_extractor.py
@@ -70,17 +70,44 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:

def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[Any, Any]]:
for body in self.decoder.decode(response):
if len(self._field_path) == 0:
extracted = body
if body == {}:
# An empty/invalid JSON parsed, keep the contract
yield {}
else:
path = [path.eval(self.config) for path in self._field_path]
if "*" in path:
extracted = dpath.values(body, path)
root_response = body
body = self.update_body(root_response)

if len(self._field_path) == 0:
extracted = body
else:
extracted = dpath.get(body, path, default=[]) # type: ignore # extracted will be a MutableMapping, given input data structure
if isinstance(extracted, list):
yield from extracted
elif extracted:
yield extracted
else:
yield from []
path = [path.eval(self.config) for path in self._field_path]
if "*" in path:
extracted = dpath.values(body, path)
else:
extracted = dpath.get(body, path, default=[]) # type: ignore # extracted will be a MutableMapping, given input data structure
if isinstance(extracted, list):
for record in extracted:
yield self.update_record(record, root_response)
elif isinstance(extracted, dict):
yield self.update_record(extracted, root_response)
elif extracted:
yield extracted
else:
yield from []

def update_body(self, body: Any) -> Any:
"""
Change the original response in a subclass-specific way. Override in subclasses.
:param body: the original response body. Not to be changed
:return: a copy of the body enhanced in a subclass-specific way. None only when body is None.
"""
return body

def update_record(self, record: Any, root: Any) -> Any:
"""
Change the extracted record in a subclass-specific way. Override in subclasses.
:param record: the original extracted record. Not to be changed. Not None.
:param root: the original body the record is extracted from.
:return: a copy of the record changed or enhanced in a subclass-specific way.
"""
return record
40 changes: 40 additions & 0 deletions airbyte_cdk/sources/declarative/extractors/record_extractor.py
@@ -7,6 +7,46 @@

import requests

# Convention:
# - The record extractors may leave service fields bound in the extracted records (mappings).
# - The names (keys) of the service fields have the value of SERVICE_KEY_PREFIX as their prefix.
# - The service fields are kept only during the record's filtering and transformation.
SERVICE_KEY_PREFIX = "$"


def add_service_key(mapping: Mapping[str, Any], key: str, value: Any) -> dict[str, Any]:
"""
:param mapping: non-null mapping
:param key: the name of the key, not including any specific prefixes
:param value: the value to bind
:return: a non-null copy of the mapping, including a new key-value pair where the key is prefixed as a service field.
"""
result = dict(mapping)
result[SERVICE_KEY_PREFIX + key] = value
return result


def exclude_service_keys(struct: Any) -> Any:
"""
:param struct: any object/JSON structure
:return: a copy of struct without any service fields at any level of nesting
"""
if isinstance(struct, dict):
return {k: exclude_service_keys(v) for k, v in struct.items() if not is_service_key(k)}
elif isinstance(struct, list):
return [exclude_service_keys(v) for v in struct]
else:
return struct


def is_service_key(key: str) -> bool:
return key.startswith(SERVICE_KEY_PREFIX)


def remove_service_keys(records: Iterable[Mapping[str, Any]]) -> Iterable[Mapping[str, Any]]:
for record in records:
yield exclude_service_keys(record)


@dataclass
class RecordExtractor:
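A self-contained demo of these helpers (restated here so the example runs on its own; behavior mirrors `add_service_key` / `exclude_service_keys` above):

```python
SERVICE_KEY_PREFIX = "$"

def add_service_key(mapping, key, value):
    # Return a copy of the mapping with the key bound under the service prefix.
    result = dict(mapping)
    result[SERVICE_KEY_PREFIX + key] = value
    return result

def exclude_service_keys(struct):
    # Strip service-prefixed keys at every nesting level.
    if isinstance(struct, dict):
        return {
            k: exclude_service_keys(v)
            for k, v in struct.items()
            if not k.startswith(SERVICE_KEY_PREFIX)
        }
    if isinstance(struct, list):
        return [exclude_service_keys(v) for v in struct]
    return struct

record = add_service_key({"id": 7}, "root", {"meta": 1})
assert record == {"id": 7, "$root": {"meta": 1}}
assert exclude_service_keys(record) == {"id": 7}
```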
9 changes: 6 additions & 3 deletions airbyte_cdk/sources/declarative/extractors/record_selector.py
@@ -8,13 +8,15 @@
import requests

from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.extractors.record_extractor import (
RecordExtractor,
remove_service_keys,
)
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
from airbyte_cdk.sources.declarative.extractors.type_transformer import (
TypeTransformer as DeclarativeTypeTransformer,
)
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.models import SchemaNormalization
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
from airbyte_cdk.sources.utils.transform import TypeTransformer
@@ -106,7 +108,8 @@ def filter_and_transform(
"""
filtered_data = self._filter(all_data, stream_state, stream_slice, next_page_token)
transformed_data = self._transform(filtered_data, stream_state, stream_slice)
normalized_data = self._normalize_by_schema(transformed_data, schema=records_schema)
no_service_fields_data = remove_service_keys(transformed_data)
normalized_data = self._normalize_by_schema(no_service_fields_data, schema=records_schema)
for data in normalized_data:
yield Record(data=data, stream_name=self.name, associated_slice=stream_slice)

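The resulting pipeline order (filter, then transform, then strip service keys before normalization) can be sketched with simplified, assumed signatures:

```python
def select(records, keep, transform):
    # filter -> transform -> strip service keys, mirroring filter_and_transform.
    for record in records:
        if keep(record):
            record = transform(record)
            # Service fields ($-prefixed) stay local to filter/transform:
            yield {k: v for k, v in record.items() if not k.startswith("$")}

out = list(select(
    [{"id": 1, "$root": {"id": 1}}, {"id": 2, "$root": {"id": 2}}],
    keep=lambda r: r["id"] > 1,
    transform=lambda r: dict(r, doubled=r["id"] * 2),
))
assert out == [{"id": 2, "doubled": 4}]
```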
