Skip to content

Commit fe2f9a5

Browse files
ChristoGrabmaxi297octavia-squidington-iiicoderabbitai[bot]
authored
fix: prevent raw decoder from closing before sync completion (#366)
Co-authored-by: maxi297 <maxime@airbyte.io> Co-authored-by: octavia-squidington-iii <contact@airbyte.com> Co-authored-by: Maxime Carbonneau-Leclerc <3360483+maxi297@users.noreply.github.com> Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
1 parent 69ba54d commit fe2f9a5

File tree

2 files changed

+46
-0
lines changed

2 files changed

+46
-0
lines changed

airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py

+4
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,10 @@ def decode(
151151
self, response: requests.Response
152152
) -> Generator[MutableMapping[str, Any], None, None]:
153153
if self.is_stream_response():
154+
# urllib mentions that some interfaces don't play nice with auto_close [here](https://urllib3.readthedocs.io/en/stable/user-guide.html#using-io-wrappers-with-response-content)
155+
# We have indeed observed some issues with CSV parsing. Hence, we will manage the closing of the file ourselves until we find a better solution.
156+
response.raw.auto_close = False
154157
yield from self.parser.parse(data=response.raw) # type: ignore[arg-type]
158+
response.raw.close()
155159
else:
156160
yield from self.parser.parse(data=io.BytesIO(response.content))

unit_tests/sources/declarative/decoders/test_composite_decoder.py

+42
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@
44
import csv
55
import gzip
66
import json
7+
import socket
8+
from http.server import BaseHTTPRequestHandler, HTTPServer
79
from io import BytesIO, StringIO
10+
from threading import Thread
811
from unittest.mock import patch
912

1013
import pytest
@@ -20,6 +23,12 @@
2023
from airbyte_cdk.utils import AirbyteTracedException
2124

2225

26+
def find_available_port() -> int:
27+
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
28+
s.bind(("localhost", 0))
29+
return s.getsockname()[1] # type: ignore # this should return a int
30+
31+
2332
def compress_with_gzip(data: str, encoding: str = "utf-8"):
2433
"""
2534
Compress the data using Gzip.
@@ -202,6 +211,39 @@ def test_composite_raw_decoder_csv_parser_values(requests_mock, encoding: str, d
202211
assert parsed_records == expected_data
203212

204213

214+
class TestServer(BaseHTTPRequestHandler):
215+
def do_GET(self) -> None:
216+
self.send_response(200)
217+
self.end_headers()
218+
self.wfile.write(bytes("col1,col2\nval1,val2", "utf-8"))
219+
220+
221+
def test_composite_raw_decoder_csv_parser_without_mocked_response():
222+
"""
223+
This test reproduce a `ValueError: I/O operation on closed file` error we had with CSV parsing. We could not catch this with other tests because the closing of the mocked response from requests_mock was not the same as the one in requests.
224+
225+
We first identified this issue while working with the sample defined https://people.sc.fsu.edu/~jburkardt/data/csv/addresses.csv.
226+
This should be reproducible by having the test server return the `self.wfile.write` statement as a comment below but it does not. However, it wasn't reproducible.
227+
228+
Currently we use `self.wfile.write(bytes("col1,col2\nval1,val2", "utf-8"))` to reproduce which we know is not a valid csv as it does not end with a newline character. However, this is the only we were able to reproduce locally.
229+
"""
230+
# self.wfile.write(bytes('John,Doe,120 jefferson st.,Riverside, NJ, 08075\nJack,McGinnis,220 hobo Av.,Phila, PA,09119\n"John ""Da Man""",Repici,120 Jefferson St.,Riverside, NJ,08075\nStephen,Tyler,"7452 Terrace ""At the Plaza"" road",SomeTown,SD, 91234\n,Blankman,,SomeTown, SD, 00298\n"Joan ""the bone"", Anne",Jet,"9th, at Terrace plc",Desert City,CO,00123\n', "utf-8"))
231+
232+
# start server
233+
port = find_available_port()
234+
httpd = HTTPServer(("localhost", port), TestServer)
235+
thread = Thread(target=httpd.serve_forever, args=())
236+
thread.start()
237+
try:
238+
response = requests.get(f"http://localhost:{port}", stream=True)
239+
result = list(CompositeRawDecoder(parser=CsvParser()).decode(response))
240+
241+
assert len(result) == 1
242+
finally:
243+
httpd.shutdown() # release port and kill the thread
244+
thread.join(timeout=5) # ensure thread is cleaned up
245+
246+
205247
def test_given_response_already_consumed_when_decode_then_no_data_is_returned(requests_mock):
206248
requests_mock.register_uri(
207249
"GET", "https://airbyte.io/", content=json.dumps({"test": "test"}).encode()

0 commit comments

Comments
 (0)