From 546a75332897d5d828bc3127ebc0026a8698b908 Mon Sep 17 00:00:00 2001 From: Daniel Mateus Pires Date: Thu, 27 May 2021 13:01:13 +0100 Subject: [PATCH 1/2] =?UTF-8?q?=F0=9F=91=A8=E2=80=8D=F0=9F=94=AC=20Wrap=20?= =?UTF-8?q?'list=20of=20lists'=20responses=20into=20'data'=20records?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../source_http_request/source.py | 23 ++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/source-http-request/source_http_request/source.py b/airbyte-integrations/connectors/source-http-request/source_http_request/source.py index 27031d7277c7b..9d6bfd0babe6f 100644 --- a/airbyte-integrations/connectors/source-http-request/source_http_request/source.py +++ b/airbyte-integrations/connectors/source-http-request/source_http_request/source.py @@ -62,10 +62,20 @@ def discover(self, logger: AirbyteLogger, config: json) -> AirbyteCatalog: } # json body will be returned as the "data" stream". we can't know its schema ahead of time, so we assume it's object (i.e. valid json). - return AirbyteCatalog(streams=[AirbyteStream(name=SourceHttpRequest.STREAM_NAME, json_schema=json_schema)]) + return AirbyteCatalog( + streams=[ + AirbyteStream( + name=SourceHttpRequest.STREAM_NAME, json_schema=json_schema + ) + ] + ) def read( - self, logger: AirbyteLogger, config: json, catalog: ConfiguredAirbyteCatalog, state: Dict[str, any] + self, + logger: AirbyteLogger, + config: json, + catalog: ConfiguredAirbyteCatalog, + state: Dict[str, any], ) -> Generator[AirbyteMessage, None, None]: r = self._make_request(config) if r.status_code != 200: @@ -73,6 +83,11 @@ def read( # need to eagerly fetch the json. 
data = r.json() + + if isinstance(data, list) and data and isinstance(data[0], list): + # non-empty list of lists: wrap each inner list in a {"data": ...} record + data = [{"data": records} for records in data] + if not isinstance(data, list): data = [data] @@ -80,7 +95,9 @@ def read( yield AirbyteMessage( type=Type.RECORD, record=AirbyteRecordMessage( - stream=SourceHttpRequest.STREAM_NAME, data=record, emitted_at=int(datetime.now().timestamp()) * 1000 + stream=SourceHttpRequest.STREAM_NAME, + data=record, + emitted_at=int(datetime.now().timestamp()) * 1000, ), ) From ce15e936eab8d1a63e48b6617f670f266dbea6c0 Mon Sep 17 00:00:00 2001 From: Daniel Mateus Pires Date: Thu, 27 May 2021 13:01:58 +0100 Subject: [PATCH 2/2] =?UTF-8?q?=F0=9F=94=A8=20Add=20test=20reproducing=20i?= =?UTF-8?q?ssue=20with=20list=20of=20lists?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../unit_tests/unit_test.py | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/airbyte-integrations/connectors/source-http-request/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-http-request/unit_tests/unit_test.py index 100effdc47931..a2b19f6791a54 100644 --- a/airbyte-integrations/connectors/source-http-request/unit_tests/unit_test.py +++ b/airbyte-integrations/connectors/source-http-request/unit_tests/unit_test.py @@ -25,9 +25,20 @@ import unittest +from unittest.mock import patch + from source_http_request import SourceHttpRequest +class MockResponse: + def __init__(self, json_data, status_code): + self.json_data = json_data + self.status_code = status_code + + def json(self): + return self.json_data + + class TestSourceHttpRequest(unittest.TestCase): def test_parse_config(self): config = { @@ -46,3 +57,35 @@ def test_parse_config(self): "body": {"something": "good"}, } self.assertEqual(expected, actual) + + def test_json_array_response(self): + with patch.object( + SourceHttpRequest, + attribute="_make_request", + return_value=MockResponse( + json_data=[ + ["foo", "bar"],
["test", 10], + ["test2", 15], + ], + status_code=200, + ), + ): + expected = [ + {"data": ["foo", "bar"]}, + {"data": ["test", 10]}, + {"data": ["test2", 15]}, + ] + source = SourceHttpRequest() + results = [ + r.record.data + for r in list( + source.read( + logger=None, + state=None, + catalog=None, + config={}, + ) + ) + ] + self.assertEqual(expected, results)