
fix: (CDK) (AsyncRetriever) - Use the Nested Decoders to decode the streaming responses, instead of ResponseToFileExtractor #378

Merged
@@ -1678,6 +1678,10 @@ definitions:
type:
type: string
enum: [ResponseToFileExtractor]
file_type:
title: The file type in which the response data is stored. Supported types are [csv, jsonl].
type: string
default: csv
$parameters:
type: object
additionalProperties: true
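
# A hedged sketch (not part of this diff) of a declarative component using the
# new field; the surrounding manifest structure is assumed. Expressed here as
# the equivalent Python dict:
extractor_component = {
    "type": "ResponseToFileExtractor",
    "file_type": "jsonl",  # optional; defaults to "csv" when omitted
}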
193 changes: 143 additions & 50 deletions airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py
@@ -7,6 +7,7 @@
import zlib
from contextlib import closing
from dataclasses import InitVar, dataclass
from enum import Enum
from typing import Any, Dict, Iterable, Mapping, Optional, Tuple

import pandas as pd
@@ -20,28 +21,65 @@
DOWNLOAD_CHUNK_SIZE: int = 1024 * 10


class FileTypes(Enum):
CSV = "csv"
JSONL = "jsonl"


@dataclass
class ResponseToFileExtractor(RecordExtractor):
"""
This class is used when having very big HTTP responses (usually streamed),
which would require too much memory so we use disk space as a tradeoff.

The extractor does the following:
1) Save the response to a temporary file
2) Read from the temporary file by chunks to avoid OOM
3) Remove the temporary file after reading
4) Return the records

While saving, the response is decompressed if it is compressed, and null bytes are
filtered out of the data in either case.

Eventually, we want to support more file types by re-using the file-based CDK parsers if possible. However, the lift is too high for
a first iteration, so only CSV and JSONL parsing are supported, using pandas as Salesforce and Sendgrid did.
"""

parameters: InitVar[Mapping[str, Any]]
file_type: Optional[str] = "csv"

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self.logger = logging.getLogger("airbyte")

def extract_records(
self, response: Optional[requests.Response] = None
) -> Iterable[Mapping[str, Any]]:
"""
Extracts records from the given response by:
1) Saving the result to a tmp file
2) Reading from saved file by chunks to avoid OOM

Args:
response (Optional[requests.Response]): The response object containing the data. Defaults to None.

Yields:
Iterable[Mapping[str, Any]]: An iterable of mappings representing the extracted records.

Returns:
None
"""
if response:
file_path, encoding = self._save_to_file(response)
yield from self._read_with_chunks(file_path, encoding)
else:
yield from []
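
# Hypothetical usage sketch (URL is an assumption): stream a large export
# through the extractor without holding the full payload in memory.
import requests

extractor = ResponseToFileExtractor(parameters={}, file_type="jsonl")
response = requests.get("https://api.example.com/export", stream=True)
for record in extractor.extract_records(response):
    print(record)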

def _get_response_encoding(self, headers: Dict[str, Any]) -> str:
"""
Get the encoding of the response based on the provided headers. This method is heavily inspired by the requests library
implementation.

Args:
headers (Dict[str, Any]): The headers of the response.

Returns:
str: The encoding of the response.
"""
@@ -73,11 +111,28 @@ def _filter_null_bytes(self, b: bytes) -> bytes:

res = b.replace(b"\x00", b"")
if len(res) < len(b):
message = "ResponseToFileExtractor._filter_null_bytes(): Filter 'null' bytes from string, size reduced %d -> %d chars"
self.logger.warning(message, len(b), len(res))
return res
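
# Standalone illustration of the null-byte filtering above: NUL bytes that some
# APIs leave in streamed exports are stripped before the chunk is written to disk.
raw = b"id,name\n1,Alice\x00\n2,Bob\n"
cleaned = raw.replace(b"\x00", b"")
assert cleaned == b"id,name\n1,Alice\n2,Bob\n"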

def _get_file_path(self) -> str:
"""
Get a temporary file path with a unique name.

Returns:
str: The path to the temporary file.

Raises:
ValueError: If the file type is not supported.
"""

if self.file_type not in [file_type.value for file_type in FileTypes]:
raise ValueError(
f"ResponseToFileExtractor._get_file_path(): File type {self.file_type} is not supported.",
)

return str(uuid.uuid4()) + "." + self.file_type
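
# Sketch of what _get_file_path() produces: a UUID-based name whose extension
# follows the configured file_type (values here are illustrative).
import uuid

file_type = "jsonl"
assert file_type in [ft.value for ft in FileTypes]  # mirrors the guard above
tmp_name = f"{uuid.uuid4()}.{file_type}"  # e.g. "0b6f...-....jsonl"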

def _save_to_file(self, response: requests.Response) -> Tuple[str, str]:
"""
Saves the binary data from the given response to a temporary file and returns the filepath and response encoding.
@@ -95,8 +150,9 @@ def _save_to_file(self, response: requests.Response) -> Tuple[str, str]:
decompressor = zlib.decompressobj(zlib.MAX_WBITS | 32)
needs_decompression = True # we will assume at first that the response is compressed and change the flag if not

file_path = self._get_file_path()
# save binary data to tmp file
with closing(response) as response, open(file_path, "wb") as data_file:
response_encoding = self._get_response_encoding(dict(response.headers or {}))
for chunk in response.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE):
try:
@@ -110,15 +166,76 @@ def _save_to_file(self, response: requests.Response) -> Tuple[str, str]:
needs_decompression = False

# check that the file exists
if os.path.isfile(file_path):
return file_path, response_encoding
else:
message = "ResponseToFileExtractor._save_to_file(): An I/O error occurred while verifying binary data."
raise ValueError(f"{message} Tmp file {file_path} doesn't exist.")

def _read_csv(
self,
path: str,
file_encoding: str,
chunk_size: int = 100,
) -> Iterable[Mapping[str, Any]]:
"""
Reads a CSV file and yields each row as a dictionary.

Args:
path (str): The path to the CSV file to be read.
file_encoding (str): The encoding of the file.

Yields:
Mapping[str, Any]: A dictionary representing each row of data.
"""

csv_read_params = {
"chunksize": chunk_size,
"iterator": True,
"dialect": "unix",
"dtype": object,
"encoding": file_encoding,
}

for chunk in pd.read_csv(path, **csv_read_params): # type: ignore # ignoring how args are passed
# replace NaN with None
chunk = chunk.replace({nan: None}).to_dict(orient="records")
for record in chunk:
yield record
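
# Worked example of the NaN -> None mapping above: dtype=object keeps cells as
# strings, and missing values become None so records serialize as JSON nulls
# instead of float("nan").
import pandas as pd
from numpy import nan

df = pd.DataFrame({"id": ["1", "2"], "name": ["Alice", nan]})
records = df.replace({nan: None}).to_dict(orient="records")
assert records == [{"id": "1", "name": "Alice"}, {"id": "2", "name": None}]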

def _read_json_lines(
self,
path: str,
file_encoding: str,
chunk_size: int = 100,
) -> Iterable[Mapping[str, Any]]:
"""
Reads a JSON Lines (JSONL) file and yields each row as a dictionary.

Args:
path (str): The path to the JSONL file to be read.
file_encoding (str): The encoding of the file.

Yields:
Mapping[str, Any]: A dictionary representing each row of data.
"""

json_read_params = {
"lines": True,
"chunksize": chunk_size,
"encoding": file_encoding,
"convert_dates": False,
}

for chunk in pd.read_json(path, **json_read_params): # type: ignore # ignoring how args are passed
for record in chunk.to_dict(orient="records"):
yield record
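
# Sketch of the chunked JSONL read (file name is an assumption): with lines=True
# and a chunksize, pd.read_json returns an iterator of DataFrames, so large
# exports are never loaded at once.
import pandas as pd

with open("export.jsonl", "w", encoding="utf-8") as f:
    f.write('{"id": 1, "name": "Alice"}\n{"id": 2, "name": "Bob"}\n')

for chunk in pd.read_json("export.jsonl", lines=True, chunksize=1, convert_dates=False):
    for record in chunk.to_dict(orient="records"):
        print(record)  # {'id': 1, 'name': 'Alice'}, then {'id': 2, 'name': 'Bob'}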

def _read_with_chunks(
self,
path: str,
file_encoding: str,
chunk_size: int = 100,
) -> Iterable[Mapping[str, Any]]:
"""
Reads data from a file in chunks and yields each row as a dictionary.
@@ -132,47 +249,23 @@ def _read_with_chunks(
Mapping[str, Any]: A dictionary representing each row of data.

Raises:
ValueError: If an error occurs while reading the data from the file.
"""

try:
if self.file_type == FileTypes.CSV.value:
yield from self._read_csv(path, file_encoding, chunk_size)

if self.file_type == FileTypes.JSONL.value:
yield from self._read_json_lines(path, file_encoding, chunk_size)

except pd.errors.EmptyDataError as e:
self.logger.info(f"Empty data received. {e}")
message = "ResponseToFileExtractor._read_with_chunks(): Empty data received."
self.logger.info(f"{message} {e}")
yield from []
except IOError as ioe:
raise ValueError(f"The IO/Error occured while reading tmp data. Called: {path}", ioe)
message = "ResponseToFileExtractor._read_with_chunks(): The IO/Error occured while reading the data from file."
raise ValueError(f"{message} Called: {path}", ioe)
finally:
# remove the binary tmp file after the data is read
os.remove(path)

@@ -702,6 +702,10 @@ class DpathExtractor(BaseModel):

class ResponseToFileExtractor(BaseModel):
type: Literal["ResponseToFileExtractor"]
file_type: Optional[str] = Field(
"csv",
title="The file type in which the response data is storred. Supported types are [csv, jsonl].",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


@@ -1992,7 +1992,9 @@ def create_response_to_file_extractor(
model: ResponseToFileExtractorModel,
**kwargs: Any,
) -> ResponseToFileExtractor:
return ResponseToFileExtractor(
parameters=model.parameters or {}, file_type=model.file_type or "csv"
)
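
# Hypothetical round-trip sketch: a model parsed from a manifest now carries
# file_type through the factory to the extractor (imports assumed in scope).
model = ResponseToFileExtractorModel(type="ResponseToFileExtractor", file_type="jsonl")
extractor = ResponseToFileExtractor(parameters=model.parameters or {}, file_type=model.file_type or "csv")
assert extractor.file_type == "jsonl"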

@staticmethod
def create_exponential_backoff_strategy(