Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(decoder): clean decoders and make csvdecoder available #326

Merged
merged 11 commits into from
Feb 12, 2025
99 changes: 19 additions & 80 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1549,7 +1549,6 @@ definitions:
anyOf:
- "$ref": "#/definitions/JsonDecoder"
- "$ref": "#/definitions/XmlDecoder"
- "$ref": "#/definitions/CompositeRawDecoder"
$parameters:
type: object
additionalProperties: true
Expand Down Expand Up @@ -2133,43 +2132,26 @@ definitions:
$parameters:
type: object
additionalProperties: true
GzipJsonDecoder:
title: GzipJson Decoder
description: Use this if the response is Gzip compressed Json.
type: object
additionalProperties: true
required:
- type
properties:
type:
type: string
enum: [GzipJsonDecoder]
encoding:
type: string
default: utf-8
$parameters:
type: object
additionalProperties: true
ZipfileDecoder:
title: Zipfile Decoder
description: Decoder for response data that is returned as zipfile(s).
type: object
additionalProperties: true
required:
- type
- parser
- decoder
properties:
type:
type: string
enum: [ZipfileDecoder]
parser:
decoder:
title: Parser
description: Parser to parse the decompressed data from the zipfile(s).
anyOf:
- "$ref": "#/definitions/GzipParser"
- "$ref": "#/definitions/JsonParser"
- "$ref": "#/definitions/JsonLineParser"
- "$ref": "#/definitions/CsvParser"
- "$ref": "#/definitions/CsvDecoder"
- "$ref": "#/definitions/GzipDecoder"
- "$ref": "#/definitions/JsonDecoder"
- "$ref": "#/definitions/JsonlDecoder"
ListPartitionRouter:
title: List Partition Router
description: A Partition router that specifies a list of attributes where each attribute describes a portion of the complete data set for a stream. During a sync, each value is iterated over and can be used as input to outbound API requests.
Expand Down Expand Up @@ -3002,79 +2984,38 @@ definitions:
description: Component decoding the response so records can be extracted.
anyOf:
- "$ref": "#/definitions/CustomDecoder"
- "$ref": "#/definitions/CsvDecoder"
- "$ref": "#/definitions/JsonDecoder"
- "$ref": "#/definitions/JsonlDecoder"
- "$ref": "#/definitions/IterableDecoder"
- "$ref": "#/definitions/XmlDecoder"
- "$ref": "#/definitions/GzipJsonDecoder"
- "$ref": "#/definitions/CompositeRawDecoder"
- "$ref": "#/definitions/ZipfileDecoder"
$parameters:
type: object
additionalProperties: true
CompositeRawDecoder:
description: "(This is experimental, use at your own risk)"
GzipDecoder:
type: object
required:
- type
- parser
- inner_decoder
properties:
type:
type: string
enum: [CompositeRawDecoder]
parser:
enum: [GzipDecoder]
inner_decoder:
anyOf:
- "$ref": "#/definitions/GzipParser"
- "$ref": "#/definitions/JsonParser"
- "$ref": "#/definitions/JsonLineParser"
- "$ref": "#/definitions/CsvParser"
# PARSERS
GzipParser:
type: object
required:
- type
- inner_parser
properties:
type:
type: string
enum: [GzipParser]
inner_parser:
anyOf:
- "$ref": "#/definitions/JsonLineParser"
- "$ref": "#/definitions/CsvParser"
- "$ref": "#/definitions/JsonParser"
JsonParser:
title: JsonParser
description: Parser used for parsing str, bytes, or bytearray data and returning data in a dictionary format.
type: object
required:
- type
properties:
type:
type: string
enum: [JsonParser]
encoding:
type: string
default: utf-8
JsonLineParser:
type: object
required:
- type
properties:
type:
type: string
enum: [JsonLineParser]
encoding:
type: string
default: utf-8
CsvParser:
- "$ref": "#/definitions/CsvDecoder"
- "$ref": "#/definitions/GzipDecoder"
- "$ref": "#/definitions/JsonDecoder"
- "$ref": "#/definitions/JsonlDecoder"
CsvDecoder:
type: object
required:
- type
properties:
type:
type: string
enum: [CsvParser]
enum: [CsvDecoder]
encoding:
type: string
default: utf-8
Expand Down Expand Up @@ -3202,24 +3143,22 @@ definitions:
description: Component decoding the response so records can be extracted.
anyOf:
- "$ref": "#/definitions/CustomDecoder"
- "$ref": "#/definitions/CsvDecoder"
- "$ref": "#/definitions/JsonDecoder"
- "$ref": "#/definitions/JsonlDecoder"
- "$ref": "#/definitions/IterableDecoder"
- "$ref": "#/definitions/XmlDecoder"
- "$ref": "#/definitions/GzipJsonDecoder"
- "$ref": "#/definitions/CompositeRawDecoder"
- "$ref": "#/definitions/ZipfileDecoder"
download_decoder:
title: Download Decoder
description: Component decoding the download response so records can be extracted.
anyOf:
- "$ref": "#/definitions/CustomDecoder"
- "$ref": "#/definitions/CsvDecoder"
- "$ref": "#/definitions/JsonDecoder"
- "$ref": "#/definitions/JsonlDecoder"
- "$ref": "#/definitions/IterableDecoder"
- "$ref": "#/definitions/XmlDecoder"
- "$ref": "#/definitions/GzipJsonDecoder"
- "$ref": "#/definitions/CompositeRawDecoder"
- "$ref": "#/definitions/ZipfileDecoder"
$parameters:
type: object
Expand Down
4 changes: 0 additions & 4 deletions airbyte_cdk/sources/declarative/decoders/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@
)
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import (
GzipJsonDecoder,
IterableDecoder,
JsonDecoder,
JsonlDecoder,
)
from airbyte_cdk.sources.declarative.decoders.noop_decoder import NoopDecoder
from airbyte_cdk.sources.declarative.decoders.pagination_decoder_decorator import (
Expand All @@ -27,9 +25,7 @@
"CompositeRawDecoder",
"JsonDecoder",
"JsonParser",
"JsonlDecoder",
"IterableDecoder",
"GzipJsonDecoder",
"NoopDecoder",
"PaginationDecoderDecorator",
"XmlDecoder",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import csv
import gzip
import io
import json
import logging
from abc import ABC, abstractmethod
Expand Down Expand Up @@ -130,11 +131,15 @@ class CompositeRawDecoder(Decoder):
"""

parser: Parser
stream_response: bool = True

def is_stream_response(self) -> bool:
return True
return self.stream_response

def decode(
self, response: requests.Response
) -> Generator[MutableMapping[str, Any], None, None]:
yield from self.parser.parse(data=response.raw) # type: ignore[arg-type]
if self.is_stream_response():
yield from self.parser.parse(data=response.raw) # type: ignore[arg-type]
else:
yield from self.parser.parse(data=io.BytesIO(response.content))
69 changes: 10 additions & 59 deletions airbyte_cdk/sources/declarative/decoders/json_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,48 +10,39 @@
import orjson
import requests

from airbyte_cdk.sources.declarative.decoders import CompositeRawDecoder, JsonParser
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder

logger = logging.getLogger("airbyte")


@dataclass
class JsonDecoder(Decoder):
"""
Decoder strategy that returns the json-encoded content of a response, if any.
"""

parameters: InitVar[Mapping[str, Any]]
def __init__(self, parameters: Mapping[str, Any]):
self._decoder = CompositeRawDecoder(parser=JsonParser(), stream_response=False)

def is_stream_response(self) -> bool:
return False
return self._decoder.is_stream_response()

def decode(
self, response: requests.Response
) -> Generator[MutableMapping[str, Any], None, None]:
"""
If the response is an empty string or an empty list, the function will return a generator with an empty mapping.
"""
has_yielded = False
try:
body_json = response.json()
yield from self.parse_body_json(body_json)
except requests.exceptions.JSONDecodeError:
logger.warning(
f"Response cannot be parsed into json: {response.status_code=}, {response.text=}"
)
for element in self._decoder.decode(response):
yield element
has_yielded = True
except Exception:
yield {}

@staticmethod
def parse_body_json(
body_json: MutableMapping[str, Any] | List[MutableMapping[str, Any]],
) -> Generator[MutableMapping[str, Any], None, None]:
if not isinstance(body_json, list):
body_json = [body_json]
if len(body_json) == 0:
if not has_yielded:
yield {}
else:
yield from body_json


@dataclass
class IterableDecoder(Decoder):
Expand All @@ -69,43 +60,3 @@ def decode(
) -> Generator[MutableMapping[str, Any], None, None]:
for line in response.iter_lines():
yield {"record": line.decode()}


@dataclass
class JsonlDecoder(Decoder):
"""
Decoder strategy that returns the json-encoded content of the response, if any.
"""

parameters: InitVar[Mapping[str, Any]]

def is_stream_response(self) -> bool:
return True

def decode(
self, response: requests.Response
) -> Generator[MutableMapping[str, Any], None, None]:
# TODO???: set delimiter? usually it is `\n` but maybe it would be useful to set optional?
# https://github.com/airbytehq/airbyte-internal-issues/issues/8436
for record in response.iter_lines():
yield orjson.loads(record)


@dataclass
class GzipJsonDecoder(JsonDecoder):
encoding: Optional[str]

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
if self.encoding:
try:
codecs.lookup(self.encoding)
except LookupError:
raise ValueError(
f"Invalid encoding '{self.encoding}'. Please check provided encoding"
)

def decode(
self, response: requests.Response
) -> Generator[MutableMapping[str, Any], None, None]:
raw_string = decompress(response.content).decode(encoding=self.encoding or "utf-8")
yield from self.parse_body_json(orjson.loads(raw_string))
Loading
Loading