Skip to content

Commit

Permalink
#12003 source smartsheets: implement incremental read + tests
Browse files Browse the repository at this point in the history
  • Loading branch information
davydov-d committed Apr 15, 2022
1 parent 5a3165d commit c73aaff
Show file tree
Hide file tree
Showing 15 changed files with 734 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ COPY $CODE_PATH ./$CODE_PATH
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.9
LABEL io.airbyte.version=0.1.10
LABEL io.airbyte.name=airbyte/source-smartsheets
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@ tests:
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
empty_streams: []
expect_records:
path: "integration_tests/expected_records.txt"
extra_fields: yes
exact_order: yes
extra_records: no
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
incremental:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
future_state_path: "integration_tests/abnormal_state.json"
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"aws_s3_sample": {
"modifiedAt": "2222-03-07T11:30:00+00:00"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
"gender": { "type": "string" },
"ip_address": { "type": "string" },
"primary_email": { "type": "string" },
"dob": { "type": "string", "format": "date" }
"dob": { "type": "string", "format": "date" },
"modifiedAt": { "type": "string", "format": "date-time" }
}
},
"supported_sync_modes": ["full_refresh"]
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import logging
from functools import cached_property
from typing import Any, Dict, Iterable, Mapping, Optional, Tuple

import smartsheet


class SmartSheetAPIWrapper:
def __init__(self, config: Mapping[str, Any]):
self._spreadsheet_id = config["spreadsheet_id"]
self._access_token = config["access_token"]
api_client = smartsheet.Smartsheet(self._access_token)
api_client.errors_as_exceptions(True)
# each call to `Sheets` makes a new instance, so we save it here to make no more new objects
self._get_sheet = api_client.Sheets.get_sheet
self._data = None

def _fetch_sheet(self, from_dt: Optional[str] = None) -> None:
kwargs = {"rows_modified_since": from_dt}
if not from_dt:
kwargs["page_size"] = 1
self._data = self._get_sheet(self._spreadsheet_id, **kwargs)

@staticmethod
def _column_to_property(column_type: str) -> Dict[str, any]:
type_mapping = {
"TEXT_NUMBER": {"type": "string"},
"DATE": {"type": "string", "format": "date"},
"DATETIME": {"type": "string", "format": "date-time"},
}
return type_mapping.get(column_type, {"type": "string"})

def _construct_record(self, row: smartsheet.models.Row) -> Dict[str, str]:
values_column_map = {cell.column_id: str(cell.value or "") for cell in row.cells}
record = {column.title: values_column_map[column.id] for column in self.data.columns}
record["modifiedAt"] = row.modified_at.isoformat()
return record

@property
def data(self) -> smartsheet.models.Row:
if not self._data:
self._fetch_sheet()
return self._data

@property
def name(self) -> str:
return self.data.name

@property
def row_count(self) -> int:
return len(self.data.rows)

@cached_property
def primary_key(self) -> str:
for column in self.data.columns:
if column.primary:
return column.title

@cached_property
def json_schema(self) -> Dict[str, Any]:
column_info = {column.title: self._column_to_property(column.type.value) for column in self.data.columns}
column_info["modifiedAt"] = {"type": "string", "format": "date-time"} # add cursor field explicitly
json_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": column_info,
}
return json_schema

def read_records(self, from_dt: str) -> Iterable[Dict[str, str]]:
self._fetch_sheet(from_dt)
for row in self.data.rows:
yield self._construct_record(row)

def check_connection(self, logger: logging.Logger) -> Tuple[bool, str]:
try:
_ = self.data
except smartsheet.exceptions.ApiError as e:
err = e.error.result
code = 404 if err.code == 1006 else err.code
reason = f"{err.name}: {code} - {err.message} | Check your spreadsheet ID."
logger.error(reason)
return False, reason
except Exception as e:
reason = str(e)
logger.error(reason)
return False, reason
return True, None
Original file line number Diff line number Diff line change
Expand Up @@ -2,120 +2,21 @@
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import logging
from typing import Any, List, Mapping, Tuple

import json
from datetime import datetime
from typing import Dict, Generator, List
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream

import smartsheet
from airbyte_cdk import AirbyteLogger
from airbyte_cdk.models import (
AirbyteCatalog,
AirbyteConnectionStatus,
AirbyteMessage,
AirbyteRecordMessage,
AirbyteStream,
ConfiguredAirbyteCatalog,
Status,
Type,
)
from airbyte_cdk.sources import Source
from .sheet import SmartSheetAPIWrapper
from .streams import SmartsheetStream


def get_prop(col_type: str) -> Dict[str, any]:
props = {
"TEXT_NUMBER": {"type": "string"},
"DATE": {"type": "string", "format": "date"},
"DATETIME": {"type": "string", "format": "date-time"},
}
return props.get(col_type, {"type": "string"})
class SourceSmartsheets(AbstractSource):
def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, any]:
sheet = SmartSheetAPIWrapper(config)
return sheet.check_connection(logger)


def construct_record(sheet_columns: List[Dict], row_cells: List[Dict]) -> Dict:
# convert all data to string as it is only expected format in schema
values_column_map = {cell["columnId"]: str(cell.get("value", "")) for cell in row_cells}
return {column["title"]: values_column_map[column["id"]] for column in sheet_columns}


def get_json_schema(sheet_columns: List[Dict]) -> Dict:
column_info = {column["title"]: get_prop(column["type"]) for column in sheet_columns}
json_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": column_info,
}
return json_schema


class SourceSmartsheets(Source):
def check(self, logger: AirbyteLogger, config: json) -> AirbyteConnectionStatus:
try:
access_token = config["access_token"]
spreadsheet_id = config["spreadsheet_id"]

smartsheet_client = smartsheet.Smartsheet(access_token)
smartsheet_client.errors_as_exceptions(True)
smartsheet_client.Sheets.get_sheet(spreadsheet_id)

return AirbyteConnectionStatus(status=Status.SUCCEEDED)
except Exception as e:
if isinstance(e, smartsheet.exceptions.ApiError):
err = e.error.result
code = 404 if err.code == 1006 else err.code
reason = f"{err.name}: {code} - {err.message} | Check your spreadsheet ID."
else:
reason = str(e)
logger.error(reason)
return AirbyteConnectionStatus(status=Status.FAILED)

def discover(self, logger: AirbyteLogger, config: json) -> AirbyteCatalog:
access_token = config["access_token"]
spreadsheet_id = config["spreadsheet_id"]
streams = []

smartsheet_client = smartsheet.Smartsheet(access_token)
try:
sheet = smartsheet_client.Sheets.get_sheet(spreadsheet_id)
sheet = json.loads(str(sheet)) # make it subscriptable
sheet_json_schema = get_json_schema(sheet["columns"])
logger.info(f"Running discovery on sheet: {sheet['name']} with {spreadsheet_id}")

stream = AirbyteStream(name=sheet["name"], json_schema=sheet_json_schema)
stream.supported_sync_modes = ["full_refresh"]
streams.append(stream)

except Exception as e:
raise Exception(f"Could not run discovery: {str(e)}")

return AirbyteCatalog(streams=streams)

def read(
self, logger: AirbyteLogger, config: json, catalog: ConfiguredAirbyteCatalog, state: Dict[str, any]
) -> Generator[AirbyteMessage, None, None]:

access_token = config["access_token"]
spreadsheet_id = config["spreadsheet_id"]
smartsheet_client = smartsheet.Smartsheet(access_token)

for configured_stream in catalog.streams:
stream = configured_stream.stream
try:
sheet = smartsheet_client.Sheets.get_sheet(spreadsheet_id)
sheet = json.loads(str(sheet)) # make it subscriptable
logger.info(f"Starting syncing spreadsheet {sheet['name']}")
logger.info(f"Row count: {sheet['totalRowCount']}")

for row in sheet["rows"]:
try:
record = construct_record(sheet["columns"], row["cells"])
yield AirbyteMessage(
type=Type.RECORD,
record=AirbyteRecordMessage(stream=stream.name, data=record, emitted_at=int(datetime.now().timestamp()) * 1000),
)
except Exception as e:
logger.error(f"Unable to encode row into an AirbyteMessage with the following error: {e}")

except Exception as e:
logger.error(f"Could not read smartsheet: {stream.name}")
raise e
logger.info(f"Finished syncing spreadsheet with ID: {spreadsheet_id}")
def streams(self, config: Mapping[str, Any]) -> List["Stream"]:
sheet = SmartSheetAPIWrapper(config)
return [SmartsheetStream(sheet, config)]
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,19 @@
"title": "Sheet ID",
"description": "The spreadsheet ID. Find in the spreadsheet menu: File > Properties",
"type": "string"
},
"start_datetime": {
"title": "Start Date",
"type": ["null", "string"],
"examples": [
"2000-01-01",
"2000-01-01 13:00",
"2000-01-01 13:00:00",
"2000-01-01T13:00+00:00",
"2000-01-01T13:00:00-07:00"
],
"description": "ISO 8601, for instance: `YYYY-MM-DD`, `YYYY-MM-DD HH:MM:SS+HH:MM`",
"format": "date-time"
}
}
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import datetime
from typing import Any, Dict, Iterable, List, Mapping

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams import Stream
from source_smartsheets.sheet import SmartSheetAPIWrapper


class SmartsheetStream(Stream):
cursor_field = "modifiedAt"

def __init__(self, smartsheet: SmartSheetAPIWrapper, config: Mapping[str, Any]):
self.smartsheet = smartsheet
self._state = {}
self._config = config
self._start_datetime = self._config.get("start_datetime") or "1970-01-01T00:00:00+00:00"

@property
def primary_key(self) -> str:
return self.smartsheet.primary_key

def get_json_schema(self) -> Dict[str, Any]:
return self.smartsheet.json_schema

@property
def name(self) -> str:
return self.smartsheet.name

@property
def state(self) -> Mapping[str, Any]:
if not self._state:
self._state = {self.cursor_field: self._start_datetime}
return self._state

@state.setter
def state(self, value: Mapping[str, Any]):
self._state = value

def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
def iso_dt(src):
return datetime.datetime.fromisoformat(src)

for record in self.smartsheet.read_records(self.state[self.cursor_field]):
current_cursor_value = iso_dt(self.state[self.cursor_field])
latest_cursor_value = iso_dt(record[self.cursor_field])
new_cursor_value = max(latest_cursor_value, current_cursor_value)
self.state = {self.cursor_field: new_cursor_value.isoformat("T", "seconds")}
yield record
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import json
from pathlib import Path
from unittest.mock import Mock

import pytest
from smartsheet.models import Sheet

HERE = Path(__file__).parent.absolute()


@pytest.fixture
def response_mock():
with open(HERE / "response.json") as json_file:
return json.loads(json_file.read())


@pytest.fixture
def config():
return {"spreadsheet_id": "id", "access_token": "token"}


@pytest.fixture
def get_sheet_mocker(mocker, response_mock):
def _mocker(api_wrapper, data=None):
sheet_obj = Sheet(props=response_mock, base_obj=api_wrapper)
get_sheet_mock = Mock(return_value=sheet_obj)
mocker.patch.object(api_wrapper, "_get_sheet", data or get_sheet_mock)
return get_sheet_mock, sheet_obj

return _mocker
Loading

1 comment on commit c73aaff

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SonarQube Report

SonarQube report for Airbyte Connectors Source Smartsheets(#12077)

Measures

Name Value Name Value Name Value
Reliability Rating A Vulnerabilities 0 Lines of Code 132
Duplicated Blocks 0 Security Rating A Quality Gate Status OK
Lines to Cover 118 Duplicated Lines (%) 0.0 Coverage 96.6
Code Smells 9 Bugs 0 Blocker Issues 0
Critical Issues 0 Major Issues 0 Minor Issues 9

Detected Issues

Rule File Description Message
python:mypy_assignment (MINOR) source_smartsheets/sheet.py:25 Check that assigned value is compatible with target Incompatible types in assignment (expression has type "int", target has type "Optional[str]") . Code line: kwargs["page_size"] = 1
python:mypy_valid_type (MINOR) source_smartsheets/sheet.py:29 Check that type (annotation) is valid Function "builtins.any" is not valid as a type . Code line: def _column_to_property(column_type: str) -> Dict[str, any]:
python:mypy_no_any_return (MINOR) source_smartsheets/sheet.py:51 Reject returning value with "Any" type if return type is not "Any" Returning Any from function declared to return "str" . Code line: return self.data.name
python:mypy_return (MINOR) source_smartsheets/sheet.py:58 Check that function always returns a value Missing return statement . Code line: def primary_key(self) -> str:
python:mypy_no_any_return (MINOR) source_smartsheets/sheet.py:61 Reject returning value with "Any" type if return type is not "Any" Returning Any from function declared to return "str" . Code line: return column.title
python:mypy_return_value (MINOR) source_smartsheets/sheet.py:92 Check that return value is compatible with signature Incompatible return value type (got "Tuple[bool, None]", expected "Tuple[bool, str]") . Code line: return True, None
python:mypy_valid_type (MINOR) source_smartsheets/source.py:16 Check that type (annotation) is valid Function "builtins.any" is not valid as a type . Code line: ...ogger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, any]:
python:mypy_var_annotated (MINOR) source_smartsheets/streams.py:18 Require variable annotation if type can't be inferred Need type annotation for "_state" (hint: "_state: Dict[, ] = ...") . Code line: self._state = {}
python:mypy_assignment (MINOR) source_smartsheets/streams.py:41 Check that assigned value is compatible with target Incompatible types in assignment (expression has type "Mapping[str, Any]", variable has type "Dict[Any, Any]") . Code line: self._state = value

Coverage (96.6%)

File Coverage File Coverage
source_smartsheets/init.py 100.0 source_smartsheets/sheet.py 100.0
source_smartsheets/source.py 100.0 source_smartsheets/streams.py 89.2

Please sign in to comment.