Skip to content

Commit

Permalink
Merged and updated imports
Browse files Browse the repository at this point in the history
  • Loading branch information
rpopov committed Mar 5, 2025
1 parent a0cadf5 commit 96e2a59
Show file tree
Hide file tree
Showing 5 changed files with 877 additions and 822 deletions.
8 changes: 3 additions & 5 deletions airbyte_cdk/sources/declarative/decoders/json_decoder.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import codecs
# import codecs
import logging
from dataclasses import InitVar, dataclass
from gzip import decompress
from typing import Any, Generator, List, Mapping, MutableMapping, Optional

import orjson
import requests

from airbyte_cdk.sources.declarative.decoders import CompositeRawDecoder, JsonParser
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder

logger = logging.getLogger("airbyte")
Expand All @@ -35,15 +34,14 @@ def decode(
        Given the response is an empty string or an empty list, the function will return a generator with an empty mapping.
"""
try:
yield self._decoder.decode(response)
yield from self._decoder.decode(response)
except requests.exceptions.JSONDecodeError:
logger.warning(
f"Response cannot be parsed into json: {response.status_code=}, {response.text=}"
)
yield {}



@dataclass
class IterableDecoder(Decoder):
"""
Expand Down
7 changes: 3 additions & 4 deletions airbyte_cdk/sources/declarative/extractors/record_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ def filter_and_transform(
Until we decide to move this logic away from the selector, we made this method public so that users like AsyncJobRetriever could
share the logic of doing transformations on a set of records.
"""

if self.transform_before_filtering:
transformed_data = self._transform(all_data, stream_state, stream_slice)
transformed_filtered_data = self._filter(
Expand All @@ -117,10 +118,8 @@ def filter_and_transform(
filtered_data = self._filter(all_data, stream_state, stream_slice, next_page_token)
transformed_filtered_data = self._transform(filtered_data, stream_state, stream_slice)

no_service_fields_data = remove_service_keys(transformed_data)
normalized_data = self._normalize_by_schema(
no_service_fields_data, schema=records_schema
)
no_service_fields_data = remove_service_keys(transformed_filtered_data)
normalized_data = self._normalize_by_schema(no_service_fields_data, schema=records_schema)
for data in normalized_data:
yield Record(data=data, stream_name=self.name, associated_slice=stream_slice)

Expand Down
Loading

0 comments on commit 96e2a59

Please sign in to comment.