Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

fix: (CDK) (AsyncRetriever) - Use the Nested Decoders to decode the streaming responses, instead of ResponseToFileExtractor #378

Merged
Merged
25 changes: 23 additions & 2 deletions airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,36 @@ def parse(
class GzipParser(Parser):
inner_parser: Parser

def _reset_reader_pointer(self, data: BufferedIOBase) -> None:
"""
Reset the reader pointer to the beginning of the data.

Note:
- This is necessary because the gzip decompression will consume the data stream.
"""
data.seek(0)

def parse(
self,
data: BufferedIOBase,
) -> Generator[MutableMapping[str, Any], None, None]:
"""
Decompress gzipped bytes and pass decompressed data to the inner parser.

IMPORTANT:
- If the data is not gzipped, reset the pointer and pass the data to the inner parser as is.

Note:
- The data is not decoded by default.
"""
with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj:
yield from self.inner_parser.parse(gzipobj)

try:
with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj:
yield from self.inner_parser.parse(gzipobj)
except gzip.BadGzipFile:
logger.warning(f"GzipParser(): Received non-gzipped data, parsing the data as is.")
self._reset_reader_pointer(data)
yield from self.inner_parser.parse(data)


@dataclass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor

EMPTY_STR: str = ""
DEFAULT_ENCODING: str = "utf-8"
DOWNLOAD_CHUNK_SIZE: int = 1024 * 10

Expand Down Expand Up @@ -136,7 +135,6 @@ def _read_with_chunks(
"""

try:
# TODO: Add support for other file types, like `json`, with `pd.read_json()`
with open(path, "r", encoding=file_encoding) as data:
chunks = pd.read_csv(
data, chunksize=chunk_size, iterator=True, dialect="unix", dtype=object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2193,10 +2193,12 @@ def create_csv_decoder(self, model: CsvDecoderModel, config: Config, **kwargs: A
stream_response=False if self._emit_connector_builder_messages else True,
)

@staticmethod
def create_jsonl_decoder(model: JsonlDecoderModel, config: Config, **kwargs: Any) -> Decoder:
def create_jsonl_decoder(
self, model: JsonlDecoderModel, config: Config, **kwargs: Any
) -> Decoder:
return CompositeRawDecoder(
parser=ModelToComponentFactory._get_parser(model, config), stream_response=True
parser=ModelToComponentFactory._get_parser(model, config),
stream_response=False if self._emit_connector_builder_messages else True,
)

def create_gzip_decoder(
Expand Down Expand Up @@ -2753,7 +2755,10 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
)
paginator = (
self._create_component_from_model(
model=model.download_paginator, decoder=decoder, config=config, url_base=""
model=model.download_paginator,
decoder=operational_decoder,
config=config,
url_base="",
)
if model.download_paginator
else NoPagination(parameters={})
Expand Down Expand Up @@ -2782,29 +2787,29 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
parameters={},
)

decoder = (
operational_decoder = (
self._create_component_from_model(model=model.decoder, config=config)
if model.decoder
else JsonDecoder(parameters={})
)
record_selector = self._create_component_from_model(
model=model.record_selector,
config=config,
decoder=decoder,
decoder=operational_decoder,
name=name,
transformations=transformations,
client_side_incremental_sync=client_side_incremental_sync,
)
stream_slicer = stream_slicer or SinglePartitionRouter(parameters={})
creation_requester = self._create_component_from_model(
model=model.creation_requester,
decoder=decoder,
decoder=operational_decoder,
config=config,
name=f"job creation - {name}",
)
polling_requester = self._create_component_from_model(
model=model.polling_requester,
decoder=decoder,
decoder=operational_decoder,
config=config,
name=f"job polling - {name}",
)
Expand Down Expand Up @@ -2839,7 +2844,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
abort_requester = (
self._create_component_from_model(
model=model.abort_requester,
decoder=decoder,
decoder=operational_decoder,
config=config,
name=f"job abort - {name}",
)
Expand All @@ -2849,7 +2854,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
delete_requester = (
self._create_component_from_model(
model=model.delete_requester,
decoder=decoder,
decoder=operational_decoder,
config=config,
name=f"job delete - {name}",
)
Expand All @@ -2859,18 +2864,21 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
download_target_requester = (
self._create_component_from_model(
model=model.download_target_requester,
decoder=decoder,
decoder=operational_decoder,
config=config,
name=f"job extract_url - {name}",
)
if model.download_target_requester
else None
)
status_extractor = self._create_component_from_model(
model=model.status_extractor, decoder=decoder, config=config, name=name
model=model.status_extractor, decoder=operational_decoder, config=config, name=name
)
download_target_extractor = self._create_component_from_model(
model=model.download_target_extractor, decoder=decoder, config=config, name=name
model=model.download_target_extractor,
decoder=operational_decoder,
config=config,
name=name,
)
job_repository: AsyncJobRepository = AsyncHttpJobRepository(
creation_requester=creation_requester,
Expand Down
40 changes: 33 additions & 7 deletions airbyte_cdk/sources/declarative/requesters/http_requester.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
)
from airbyte_cdk.sources.declarative.decoders import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import (
InterpolatedString,
)
from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import (
InterpolatedRequestOptionsProvider,
)
Expand All @@ -26,7 +28,10 @@
from airbyte_cdk.sources.streams.http import HttpClient
from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler
from airbyte_cdk.sources.types import Config, EmptyString, StreamSlice, StreamState
from airbyte_cdk.utils.mapping_helpers import combine_mappings, get_interpolation_context
from airbyte_cdk.utils.mapping_helpers import (
combine_mappings,
get_interpolation_context,
)


@dataclass
Expand Down Expand Up @@ -155,7 +160,9 @@ def get_request_params(
next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
return self._request_options_provider.get_request_params(
stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
stream_state=stream_state,
stream_slice=stream_slice,
next_page_token=next_page_token,
)

def get_request_headers(
Expand All @@ -166,7 +173,9 @@ def get_request_headers(
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return self._request_options_provider.get_request_headers(
stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
stream_state=stream_state,
stream_slice=stream_slice,
next_page_token=next_page_token,
)

# fixing request options provider types has a lot of dependencies
Expand Down Expand Up @@ -195,7 +204,9 @@ def get_request_body_json( # type: ignore
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Optional[Mapping[str, Any]]:
return self._request_options_provider.get_request_body_json(
stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
stream_state=stream_state,
stream_slice=stream_slice,
next_page_token=next_page_token,
)

@property
Expand Down Expand Up @@ -350,9 +361,24 @@ def _join_url(cls, url_base: str, path: str) -> str:
path (str): The path to join with the base URL.

Returns:
str: The concatenated URL with the trailing slash (if any) removed.
str: The resulting joined URL.

Note:
Related issue: https://github.com/airbytehq/airbyte-internal-issues/issues/11869
- If the path is an empty string or None, the method returns the base URL with any trailing slash removed.

Example:
1) _join_url("https://example.com/api/", "endpoint") >> 'https://example.com/api/endpoint'
2) _join_url("https://example.com/api", "/endpoint") >> 'https://example.com/api/endpoint'
3) _join_url("https://example.com/api/", "") >> 'https://example.com/api'
4) _join_url("https://example.com/api", None) >> 'https://example.com/api'
"""
return urljoin(url_base, path).rstrip("/")

# return a full-url if provided directly from interpolation context
if path == EmptyString or path is None:
return url_base.rstrip("/")

return urljoin(url_base, path)

def send_request(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,7 @@ def test_send_request_stream_slice_next_page_token():
"test_trailing_slash_on_path",
"https://airbyte.io",
"/my_endpoint/",
"https://airbyte.io/my_endpoint",
"https://airbyte.io/my_endpoint/",
),
(
"test_nested_path_no_leading_slash",
Expand Down
Loading