Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: prevent raw decoder from closing before sync completion #366

Merged
merged 12 commits into from
Mar 6, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ def decode(
self, response: requests.Response
) -> Generator[MutableMapping[str, Any], None, None]:
if self.is_stream_response():
response.raw.auto_close = False
yield from self.parser.parse(data=response.raw) # type: ignore[arg-type]
response.raw.close()
else:
yield from self.parser.parse(data=io.BytesIO(response.content))
24 changes: 24 additions & 0 deletions unit_tests/sources/declarative/decoders/test_composite_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import csv
import gzip
import json
from http.server import BaseHTTPRequestHandler, HTTPServer
from io import BytesIO, StringIO
from threading import Thread
from unittest.mock import patch

import pytest
Expand Down Expand Up @@ -202,6 +204,28 @@ def test_composite_raw_decoder_csv_parser_values(requests_mock, encoding: str, d
assert parsed_records == expected_data


class TestServer(BaseHTTPRequestHandler):

def do_GET(self) -> None:
self.send_response(200)
self.end_headers()
self.wfile.write(bytes("col1,col2\nval1,val2", 'utf-8'))


def test_composite_raw_decoder_csv_parser():
# start server
httpd = HTTPServer(('localhost', 8080), TestServer)
thread = Thread(target=httpd.serve_forever, args = ())
thread.start()

response = requests.get("http://localhost:8080", stream=True)
result = list(CompositeRawDecoder(parser=CsvParser()).decode(response))

assert len(result) == 1
httpd.shutdown() # release port
thread.join()


def test_given_response_already_consumed_when_decode_then_no_data_is_returned(requests_mock):
requests_mock.register_uri(
"GET", "https://airbyte.io/", content=json.dumps({"test": "test"}).encode()
Expand Down
Loading