Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: prevent raw decoder from closing before sync completion #366

Merged
merged 12 commits into from
Mar 6, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ def decode(
self, response: requests.Response
) -> Generator[MutableMapping[str, Any], None, None]:
if self.is_stream_response():
# urllib mentions that some interfaces don't play nice with auto_close [here](https://urllib3.readthedocs.io/en/stable/user-guide.html#using-io-wrappers-with-response-content)
# We have indeed observed some issues with CSV parsing. Hence, we will manage the closing of the file ourselves until we find a better solution.
response.raw.auto_close = False
yield from self.parser.parse(data=response.raw) # type: ignore[arg-type]
response.raw.close()
else:
yield from self.parser.parse(data=io.BytesIO(response.content))
42 changes: 42 additions & 0 deletions unit_tests/sources/declarative/decoders/test_composite_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
import csv
import gzip
import json
import socket
from http.server import BaseHTTPRequestHandler, HTTPServer
from io import BytesIO, StringIO
from threading import Thread
from unittest.mock import patch

import pytest
Expand All @@ -20,6 +23,12 @@
from airbyte_cdk.utils import AirbyteTracedException


def find_available_port() -> int:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('localhost', 0))
return s.getsockname()[1] # type: ignore # this should return a int


def compress_with_gzip(data: str, encoding: str = "utf-8"):
"""
Compress the data using Gzip.
Expand Down Expand Up @@ -202,6 +211,39 @@ def test_composite_raw_decoder_csv_parser_values(requests_mock, encoding: str, d
assert parsed_records == expected_data


class TestServer(BaseHTTPRequestHandler):
def do_GET(self) -> None:
self.send_response(200)
self.end_headers()
self.wfile.write(bytes("col1,col2\nval1,val2", "utf-8"))


def test_composite_raw_decoder_csv_parser_without_mocked_response():
"""
This test reproduce a `ValueError: I/O operation on closed file` error we had with CSV parsing. We could not catch this with other tests because the closing of the mocked response from requests_mock was not the same as the one in requests.

We first identified this issue while working with the sample defined https://people.sc.fsu.edu/~jburkardt/data/csv/addresses.csv.
This should be reproducible by having the test server return the `self.wfile.write` statement as a comment below but it does not. However, it wasn't reproducible.

Currently we use `self.wfile.write(bytes("col1,col2\nval1,val2", "utf-8"))` to reproduce which we know is not a valid csv as it does not end with a newline character. However, this is the only we were able to reproduce locally.
"""
# self.wfile.write(bytes('John,Doe,120 jefferson st.,Riverside, NJ, 08075\nJack,McGinnis,220 hobo Av.,Phila, PA,09119\n"John ""Da Man""",Repici,120 Jefferson St.,Riverside, NJ,08075\nStephen,Tyler,"7452 Terrace ""At the Plaza"" road",SomeTown,SD, 91234\n,Blankman,,SomeTown, SD, 00298\n"Joan ""the bone"", Anne",Jet,"9th, at Terrace plc",Desert City,CO,00123\n', "utf-8"))

# start server
port = find_available_port()
httpd = HTTPServer(("localhost", port), TestServer)
thread = Thread(target=httpd.serve_forever, args=())
thread.start()
try:
response = requests.get(f"http://localhost:{port}", stream=True)
result = list(CompositeRawDecoder(parser=CsvParser()).decode(response))

assert len(result) == 1
finally:
httpd.shutdown() # release port and kill the thread
thread.join(timeout=5) # ensure thread is cleaned up


def test_given_response_already_consumed_when_decode_then_no_data_is_returned(requests_mock):
requests_mock.register_uri(
"GET", "https://airbyte.io/", content=json.dumps({"test": "test"}).encode()
Expand Down
Loading