Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Source Netsuite: Determine request date format #17470

Closed
wants to merge 13 commits into from
Closed
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-netsuite/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,5 @@ COPY source_netsuite ./source_netsuite
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.name=airbyte/source-netsuite
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def __init__(
primary_key = "id"
raise_on_http_errors = True
output_datetime_format = "%Y-%m-%dT%H:%M:%SZ"
input_datetime_format = "%m/%d/%Y"
input_datetime_format = None

@property
def name(self) -> str:
Expand Down Expand Up @@ -209,9 +209,9 @@ def request_params(
) -> MutableMapping[str, Any]:
params = {**(next_page_token or {})}
if stream_slice:
params.update(
**{"q": f'{self.cursor_field} AFTER "{stream_slice["start"]}" AND {self.cursor_field} BEFORE "{stream_slice["end"]}"'}
)
start = stream_slice["start"].strftime(self.input_datetime_format)
end = stream_slice["end"].strftime(self.input_datetime_format)
params.update(**{"q": f'{self.cursor_field} AFTER "{start}" AND {self.cursor_field} BEFORE "{end}"'})
return params

def stream_slices(
Expand All @@ -230,15 +230,49 @@ def stream_slices(
else:
while start <= date.today():
next_day = start + timedelta(days=self.window_in_days)
slices.append(
{
"start": start.strftime(self.input_datetime_format),
"end": next_day.strftime(self.input_datetime_format),
}
)
slices.append({"start": start, "end": next_day})
start = next_day
return slices

def read_records(
self,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
**kwargs,
) -> Iterable[Mapping[str, Any]]:
if self.input_datetime_format is None:
self._determine_input_datetime_format(stream_state=stream_state, stream_slice=stream_slice)
yield from super().read_records(stream_slice=stream_slice, stream_state=stream_state, **kwargs)

def _determine_input_datetime_format(
self,
stream_slice: Mapping[str, Any],
stream_state: Mapping[str, Any],
) -> Iterable[Mapping[str, Any]]:
self.input_datetime_format = "%m/%d/%Y"

stream_state = stream_state or {}
request_headers = self.request_headers(stream_state=stream_state, stream_slice=stream_slice)
request_params = self.request_params(stream_state=stream_state, stream_slice=stream_slice)
request = self._create_prepared_request(
path=self.path(stream_state=stream_state, stream_slice=stream_slice),
headers=dict(request_headers, **self.authenticator.get_auth_header()),
params=dict(request_params, **{"limit": 1}),
json=self.request_body_json(stream_state=stream_state, stream_slice=stream_slice),
data=self.request_body_data(stream_state=stream_state, stream_slice=stream_slice),
)
request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice)
response = self._send(request, request_kwargs)

if response.status_code == 400:
message = response.json().get("o:errorDetails")
if isinstance(message, list):
error_code = message[0].get("o:errorCode")
detail_message = message[0].get("detail")
if 'Unparseable date:' in detail_message and "INVALID_PARAMETER" == error_code:
self.logger.debug('input date format changing')
self.input_datetime_format = "%Y-%m-%d"


class CustomIncrementalNetsuiteStream(IncrementalNetsuiteStream):
@property
Expand Down