Source Amazon Seller Partner: Add GET_SELLER_FEEDBACK_DATA report (#8021)

* Source Amazon Seller Partner: Add GET_SELLER_FEEDBACK_DATA report

* no field definition here

* add dataStartTime, dataEndTime

* fixes, version bumps

* revert this

* cleanup

* fixes

* real world wait

* fix typo

* incremental, transformer

* fix stream schema

* implement incremental with slicing

* move

* cleanup

* update configured catalog

* fix tests

* changelog

* doc

* fix doc

* fix configured catalog

* Field definitions
lizdeika authored Dec 2, 2021
1 parent 1eba280 commit d7ef91f
Showing 12 changed files with 182 additions and 14 deletions.
@@ -2,7 +2,7 @@
"sourceDefinitionId": "e55879a8-0ef8-4557-abcf-ab34c53ec460",
"name": "Amazon Seller Partner",
"dockerRepository": "airbyte/source-amazon-seller-partner",
"dockerImageTag": "0.2.3",
"dockerImageTag": "0.2.4",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/amazon-seller-partner",
"icon": "amazonsellerpartner.svg"
}
@@ -22,7 +22,7 @@
- name: Amazon Seller Partner
sourceDefinitionId: e55879a8-0ef8-4557-abcf-ab34c53ec460
dockerRepository: airbyte/source-amazon-seller-partner
dockerImageTag: 0.2.3
dockerImageTag: 0.2.4
sourceType: api
documentationUrl: https://docs.airbyte.io/integrations/sources/amazon-seller-partner
icon: amazonsellerpartner.svg
@@ -144,7 +144,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-amazon-seller-partner:0.2.3"
- dockerImage: "airbyte/source-amazon-seller-partner:0.2.4"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/amazon-seller-partner"
changelogUrl: "https://docs.airbyte.io/integrations/sources/amazon-seller-partner"
@@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.3
LABEL io.airbyte.version=0.2.4
LABEL io.airbyte.name=airbyte/source-amazon-seller-partner
@@ -92,6 +92,18 @@
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "GET_SELLER_FEEDBACK_DATA",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["Date"]
},
"sync_mode": "incremental",
"destination_sync_mode": "append",
"cursor_field": ["Date"]
}
]
}
@@ -25,5 +25,8 @@
},
"GET_VENDOR_INVENTORY_HEALTH_REPORT": {
"createdTime": "2021-07-01T00:00:00Z"
},
"GET_SELLER_FEEDBACK_DATA": {
"createdTime": "2021-07-01T00:00:00Z"
}
}
@@ -0,0 +1,27 @@
{
"title": "Seller Feedback Data Reports",
"description": "Seller Feedback Data Reports",
"type": "object",
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"Date": {
"type": ["null", "string"],
"format": "date"
},
"Rating": {
"type": ["null", "number"]
},
"Comments": {
"type": ["null", "string"]
},
"Response": {
"type": ["null", "string"]
},
"Order ID": {
"type": ["null", "string"]
},
"Rater Email": {
"type": ["null", "string"]
}
}
}
@@ -25,6 +25,7 @@
Orders,
VendorDirectFulfillmentShipping,
VendorInventoryHealthReports,
SellerFeedbackReports,
)


@@ -37,6 +38,11 @@ class Config:
pattern="^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
examples=["2017-01-25T00:00:00Z"],
)
period_in_days: int = Field(
30,
description="Will be used for stream slicing for initial full_refresh sync when no updated state is present for reports that support sliced incremental sync.",
examples=["30", "365"]
)
refresh_token: str = Field(
description="The refresh token obtained via authorization (can be passed to the client instead)", airbyte_secret=True
)
@@ -78,6 +84,7 @@ def _get_stream_kwargs(self, config: ConnectorConfig) -> Mapping[str, Any]:
"aws_signature": aws_signature,
"replication_start_date": config.replication_start_date,
"marketplace_ids": [marketplace_id],
"period_in_days": config.period_in_days
}
return stream_kwargs

@@ -115,6 +122,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
VendorDirectFulfillmentShipping(**stream_kwargs),
VendorInventoryHealthReports(**stream_kwargs),
Orders(**stream_kwargs),
SellerFeedbackReports(**stream_kwargs),
]

def spec(self, *args, **kwargs) -> ConnectorSpecification:
@@ -12,6 +12,13 @@
"examples": ["2017-01-25T00:00:00Z"],
"type": "string"
},
"period_in_days": {
"title": "Period In Days",
"description": "Will be used for stream slicing for initial full_refresh sync when no updated state is present for reports that support sliced incremental sync.",
"examples": ["30", "365"],
"type": "integer",
"default": 30
},
"refresh_token": {
"title": "Refresh Token",
"description": "The refresh token obtained via authorization (can be passed to the client instead)",
@@ -9,32 +9,37 @@
import zlib
from abc import ABC, abstractmethod
from io import StringIO
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Union

import pendulum
import requests
from airbyte_cdk.entrypoint import logger
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.auth import HttpAuthenticator, NoAuth
from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException, RequestBodyException
from airbyte_cdk.sources.streams.http.http import BODY_REQUEST_METHODS
from airbyte_cdk.sources.streams.http.rate_limiting import default_backoff_handler
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
from Crypto.Cipher import AES
from source_amazon_seller_partner.auth import AWSSignature

REPORTS_API_VERSION = "2020-09-04"
ORDERS_API_VERSION = "v0"
VENDORS_API_VERSION = "v1"

REPORTS_MAX_WAIT_SECONDS = 50
# 33min. taken from real world experience working with amazon seller partner reports
REPORTS_MAX_WAIT_SECONDS = 1980

DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


class AmazonSPStream(HttpStream, ABC):
data_field = "payload"

def __init__(
self, url_base: str, aws_signature: AWSSignature, replication_start_date: str, marketplace_ids: List[str], *args, **kwargs
self, url_base: str, aws_signature: AWSSignature, replication_start_date: str, marketplace_ids: List[str], period_in_days: Optional[int], *args, **kwargs
):
super().__init__(*args, **kwargs)

@@ -137,6 +142,7 @@ def __init__(
aws_signature: AWSSignature,
replication_start_date: str,
marketplace_ids: List[str],
period_in_days: Optional[int],
authenticator: HttpAuthenticator = NoAuth(),
):
self._authenticator = authenticator
@@ -145,6 +151,7 @@ def __init__(
self._session.auth = aws_signature
self._replication_start_date = replication_start_date
self.marketplace_ids = marketplace_ids
self.period_in_days = period_in_days

@property
def url_base(self) -> str:
@@ -194,14 +201,30 @@ def _create_prepared_request(

return self._session.prepare_request(requests.Request(**args))

def _create_report(self) -> Mapping[str, Any]:
request_headers = self.request_headers()
def _report_data(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Mapping[str, Any]:
replication_start_date = max(pendulum.parse(self._replication_start_date), pendulum.now("utc").subtract(days=90))
report_data = {

return {
"reportType": self.name,
"marketplaceIds": self.marketplace_ids,
"createdSince": replication_start_date.strftime("%Y-%m-%dT%H:%M:%SZ"),
"createdSince": replication_start_date.strftime(DATE_TIME_FORMAT),
}

def _create_report(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Mapping[str, Any]:
request_headers = self.request_headers()
report_data = self._report_data(sync_mode, cursor_field, stream_slice, stream_state)
create_report_request = self._create_prepared_request(
http_method="POST",
path=f"{self.path_prefix}/reports",
@@ -254,7 +277,13 @@ def parse_response(self, response: requests.Response) -> Iterable[Mapping]:
document_records = csv.DictReader(StringIO(document), delimiter="\t")
yield from document_records

def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]:
def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
"""
Create and retrieve the report.
Decrypt and parse the report once it is fully processed, then yield the report document records.
@@ -264,7 +293,7 @@ def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]:
is_done = False
start_time = pendulum.now("utc")
seconds_waited = 0
report_id = self._create_report()["reportId"]
report_id = self._create_report(sync_mode, cursor_field, stream_slice, stream_state)["reportId"]

# create and retrieve the report
while not is_processed and seconds_waited < REPORTS_MAX_WAIT_SECONDS:
@@ -341,6 +370,86 @@ class VendorInventoryHealthReports(ReportsAmazonSPStream):
name = "GET_VENDOR_INVENTORY_HEALTH_AND_PLANNING_REPORT"


class IncrementalReportsAmazonSPStream(ReportsAmazonSPStream):
@property
@abstractmethod
def cursor_field(self) -> Union[str, List[str]]:
pass

def _report_data(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Mapping[str, Any]:
data = super()._report_data(sync_mode, cursor_field, stream_slice, stream_state)
if stream_slice:
data_times = {}
if stream_slice.get("dataStartTime"):
data_times["dataStartTime"] = stream_slice["dataStartTime"]
if stream_slice.get("dataEndTime"):
data_times["dataEndTime"] = stream_slice["dataEndTime"]
data.update(data_times)

return data

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
"""
Return the latest state by comparing the cursor value in the latest record with the stream's most recent state object
and returning an updated state object.
"""
latest_benchmark = latest_record[self.cursor_field]
if current_stream_state.get(self.cursor_field):
return {self.cursor_field: max(latest_benchmark, current_stream_state[self.cursor_field])}
return {self.cursor_field: latest_benchmark}
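The cursor-state merge in `get_updated_state` above can be sketched as a stand-alone helper (`updated_state` is a hypothetical name for illustration; ISO `YYYY-MM-DD` dates compare correctly as plain strings):

```python
# Sketch of the incremental cursor merge: keep the larger of the stored
# cursor value and the latest record's cursor value. ISO "YYYY-MM-DD"
# dates sort lexicographically, so max() on the raw strings is safe.
def updated_state(current_state: dict, latest_record: dict, cursor_field: str = "Date") -> dict:
    latest = latest_record[cursor_field]
    if current_state.get(cursor_field):
        return {cursor_field: max(latest, current_state[cursor_field])}
    return {cursor_field: latest}

print(updated_state({}, {"Date": "2021-11-01"}))                      # first sync: adopt the record's date
print(updated_state({"Date": "2021-11-20"}, {"Date": "2021-11-01"}))  # late-arriving older record: state unchanged
```

Note that a late-arriving record with an older cursor never moves the state backwards.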

def stream_slices(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:

start_date = pendulum.parse(self._replication_start_date)
end_date = pendulum.now()

if stream_state:
state = stream_state.get(self.cursor_field)
start_date = pendulum.parse(state)

start_date = min(start_date, end_date)
slices = []

while start_date < end_date:
end_date_slice = start_date.add(days=self.period_in_days)
slices.append({
"dataStartTime": start_date.strftime(DATE_TIME_FORMAT),
"dataEndTime": min(end_date_slice.subtract(seconds=1), end_date).strftime(DATE_TIME_FORMAT)
})
start_date = end_date_slice

return slices
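The slicing loop in `stream_slices` can be reproduced as a self-contained sketch (the stdlib `datetime` stands in for `pendulum`, and `make_slices` is a hypothetical name): each window spans `period_in_days`, and its `dataEndTime` is one second before the next window starts, clamped to the overall end date.

```python
from datetime import datetime, timedelta

DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

def make_slices(start: str, end: str, period_in_days: int) -> list:
    """Split [start, end] into period_in_days-sized windows. Each window's
    dataEndTime is one second before the next window's dataStartTime,
    clamped so it never runs past the overall end date."""
    start_date = datetime.strptime(start, DATE_TIME_FORMAT)
    end_date = datetime.strptime(end, DATE_TIME_FORMAT)
    start_date = min(start_date, end_date)
    slices = []
    while start_date < end_date:
        end_slice = start_date + timedelta(days=period_in_days)
        slices.append({
            "dataStartTime": start_date.strftime(DATE_TIME_FORMAT),
            "dataEndTime": min(end_slice - timedelta(seconds=1), end_date).strftime(DATE_TIME_FORMAT),
        })
        start_date = end_slice
    return slices

for s in make_slices("2021-10-01T00:00:00Z", "2021-11-15T00:00:00Z", 30):
    print(s)
```

With the default `period_in_days=30`, a six-week range yields one full 30-day window and one shorter final window ending exactly at the requested end date.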


class SellerFeedbackReports(IncrementalReportsAmazonSPStream):
"""
Field definitions: https://sellercentral.amazon.com/help/hub/reference/G202125660
"""

name = "GET_SELLER_FEEDBACK_DATA"
cursor_field = 'Date'
transformer: TypeTransformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization | TransformConfig.CustomSchemaNormalization)

@transformer.registerCustomTransform
def transform_function(original_value: Any, field_schema: Dict[str, Any]) -> Any:
if original_value and "format" in field_schema and field_schema["format"] == "date":
transformed_value = pendulum.from_format(original_value, "M/D/YY").to_date_string()
return transformed_value

return original_value
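The custom transform registered above can be illustrated stand-alone (`normalize_feedback_date` is a hypothetical name, and the stdlib `strptime` stands in for `pendulum.from_format`): seller feedback reports deliver dates as `M/D/YY`, which the transformer normalizes to ISO dates so records match the schema's `format: date`.

```python
from datetime import datetime

def normalize_feedback_date(original_value, field_schema: dict):
    """Mirror of the custom transform: report dates arrive as M/D/YY
    (e.g. "1/5/21"); for fields whose schema declares format "date",
    convert to ISO "YYYY-MM-DD". Other values pass through unchanged."""
    if original_value and field_schema.get("format") == "date":
        return datetime.strptime(original_value, "%m/%d/%y").strftime("%Y-%m-%d")
    return original_value

print(normalize_feedback_date("1/5/21", {"type": ["null", "string"], "format": "date"}))
print(normalize_feedback_date("Great seller!", {"type": ["null", "string"]}))
```

Without this normalization, the raw `M/D/YY` strings would fail the schema's `date` format and could not be compared reliably as cursor values.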


class Orders(IncrementalAmazonSPStream):
"""
API docs: https://github.com/amzn/selling-partner-api-docs/blob/main/references/orders-api/ordersV0.md
@@ -27,6 +27,7 @@ def reports_stream():
replication_start_date="2017-01-25T00:00:00Z",
marketplace_ids=["id"],
authenticator=NoAuth(),
period_in_days=0
)
return stream

3 changes: 2 additions & 1 deletion docs/integrations/sources/amazon-seller-partner.md
@@ -23,6 +23,7 @@ This source is capable of syncing the following streams:
* [Inventory Health & Planning Report](https://github.com/amzn/selling-partner-api-docs/blob/main/references/reports-api/reporttype-values.md#vendor-retail-analytics-reports)
* [Orders](https://github.com/amzn/selling-partner-api-docs/blob/main/references/orders-api/ordersV0.md) \(incremental\)
* [VendorDirectFulfillmentShipping](https://github.com/amzn/selling-partner-api-docs/blob/main/references/vendor-direct-fulfillment-shipping-api/vendorDirectFulfillmentShippingV1.md)
* [Seller Feedback Report](https://github.com/amzn/selling-partner-api-docs/blob/main/references/reports-api/reporttype-values.md#performance-reports)

## Getting started

@@ -62,10 +63,10 @@

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| `0.2.4` | 2021-11-08 | [\#8021](https://github.com/airbytehq/airbyte/pull/8021) | Added GET_SELLER_FEEDBACK_DATA report with incremental sync capability |
| `0.2.3` | 2021-11-08 | [\#7828](https://github.com/airbytehq/airbyte/pull/7828) | Remove datetime format from all streams |
| `0.2.2` | 2021-11-08 | [\#7752](https://github.com/airbytehq/airbyte/pull/7752) | Change `check_connection` function to use stream Orders |
| `0.2.1` | 2021-09-17 | [\#5248](https://github.com/airbytehq/airbyte/pull/5248) | `Added extra stream support. Updated reports streams logics` |
| `0.2.0` | 2021-08-06 | [\#4863](https://github.com/airbytehq/airbyte/pull/4863) | `Rebuild source with airbyte-cdk` |
| `0.1.3` | 2021-06-23 | [\#4288](https://github.com/airbytehq/airbyte/pull/4288) | `Bugfix failing connection check` |
| `0.1.2` | 2021-06-15 | [\#4108](https://github.com/airbytehq/airbyte/pull/4108) | `Fixed: Sync fails with timeout when create report is CANCELLED` |
