From c5a56d4c9242c6a0c596f4ac983b009e90138c00 Mon Sep 17 00:00:00 2001 From: Vadym Ratniuk Date: Fri, 18 Feb 2022 20:52:26 +0200 Subject: [PATCH] moved common code to StripeSubStream --- .../source-stripe/source_stripe/streams.py | 205 ++++++++++-------- 1 file changed, 115 insertions(+), 90 deletions(-) diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py b/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py index f26cea36de1d0..edd349b4b98ba 100644 --- a/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py @@ -4,6 +4,7 @@ import math from abc import ABC, abstractmethod +from itertools import chain from typing import Any, Iterable, Mapping, MutableMapping, Optional import pendulum @@ -183,21 +184,14 @@ def path(self, **kwargs): return "events" -class Invoices(IncrementalStripeStream): - """ - API docs: https://stripe.com/docs/api/invoices/list +class StripeSubStream(StripeStream, ABC): """ + Research shows that records related to SubStream can be extracted from Parent streams which already + contain the 1st page of needed items. Thus, it significantly decreases the number of requests needed to get + all items in the parent stream, since the parent stream returns 100 items per request.
+ Note, in major cases, pagination requests are not performed because sub items are fully reported in parent streams - cursor_field = "created" - - def path(self, **kwargs): - return "invoices" - - -class InvoiceLineItems(StripeStream): - """ - API docs: https://stripe.com/docs/api/invoices/invoice_lines - + For example: Line items are part of each 'invoice' record, so use Invoices stream because it allows bulk extraction: 0.1.28 and below - 1 request extracts line items for 1 invoice (+ pagination reqs) @@ -206,21 +200,59 @@ class InvoiceLineItems(StripeStream): if line items object has indication for next pages ('has_more' attr) then use current stream to extract next pages. In major cases pagination requests are not performed because line items are fully reported in 'invoice' record - """ - name = "invoice_line_items" + Example for InvoiceLineItems and parent Invoice streams, record from Invoice stream: + { + "created": 1641038947, <--- 'Invoice' record + "customer": "cus_HezytZRkaQJC8W", + "id": "in_1KD6OVIEn5WyEQxn9xuASHsD", <---- value for 'parent_id' attribute + "object": "invoice", + "total": 0, + ... + "lines": { <---- sub_items_attr + "data": [ + { + "id": "il_1KD6OVIEn5WyEQxnm5bzJzuA", <---- 'Invoice' line item record + "object": "line_item", + ... 
+ }, + {...} + ], + "has_more": false, <---- next pages from 'InvoiceLineItemsPaginated' stream + "object": "list", + "total_count": 2, + "url": "/v1/invoices/in_1KD6OVIEn5WyEQxn9xuASHsD/lines" + } + } + """ + + filter: Optional[Mapping[str, Any]] = None + add_parent_id: bool = False - def path(self, stream_slice: Mapping[str, Any] = None, **kwargs): - return f"invoices/{stream_slice['invoice_id']}/lines" + @property + @abstractmethod + def parent(self) -> StripeStream: + """ + :return: parent stream class whose records contain the needed sub items + """ - def request_params( - self, - stream_state: Mapping[str, Any], - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, - ) -> MutableMapping[str, Any]: + @property + @abstractmethod + def parent_id(self) -> str: + """ + :return: name of the attribute (and stream slice key) that references the parent record id, e.g. 'invoice_id' + """ - params = super().request_params(stream_state, stream_slice, next_page_token) + @property + @abstractmethod + def sub_items_attr(self) -> str: + """ + :return: name of the parent record attribute that holds the list object with sub items ('data' and 'has_more'), + e.g. 'lines' in 'invoice' records.
+ """ + + def request_params(self, stream_slice: Mapping[str, Any] = None, **kwargs): + params = super().request_params(stream_slice=stream_slice, **kwargs) # add 'starting_after' param if not params.get("starting_after") and stream_slice and stream_slice.get("starting_after"): @@ -230,25 +262,57 @@ def request_params( def read_records(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> Iterable[Mapping[str, Any]]: - invoices_stream = Invoices(authenticator=self.authenticator, account_id=self.account_id, start_date=self.start_date) - for invoice in invoices_stream.read_records(sync_mode=SyncMode.full_refresh): + parent_stream = self.parent(authenticator=self.authenticator, account_id=self.account_id, start_date=self.start_date) + for record in parent_stream.read_records(sync_mode=SyncMode.full_refresh): - lines_obj = invoice.get("lines", {}) - if not lines_obj: + items_obj = record.get(self.sub_items_attr, {}) + if not items_obj: continue - line_items = lines_obj.get("data", []) + items = items_obj.get("data", []) - # get the next pages with line items - line_items_next_pages = [] - if lines_obj.get("has_more") and line_items: - stream_slice = {"invoice_id": invoice["id"], "starting_after": line_items[-1]["id"]} - line_items_next_pages = super().read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice, **kwargs) + # keep only items matching the optional filter, e.g. 'bank_account' sources for BankAccounts + if self.filter: + items = [i for i in items if i.get(self.filter["attr"]) == self.filter["value"]] + + # get next pages + items_next_pages = [] + if items_obj.get("has_more") and items: + stream_slice = {self.parent_id: record["id"], "starting_after": items[-1]["id"]} + items_next_pages = super().read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice, **kwargs) + + for item in chain(items, items_next_pages): + if self.add_parent_id: + # add reference to parent object when item doesn't have it already + item[self.parent_id] = record["id"] + yield item - # link
invoice and relevant lines items by adding 'invoice_id' attr to each line_item record - for line_item in [*line_items, *line_items_next_pages]: - line_item["invoice_id"] = invoice["id"] - yield line_item + +class Invoices(IncrementalStripeStream): + """ + API docs: https://stripe.com/docs/api/invoices/list + """ + + cursor_field = "created" + + def path(self, **kwargs): + return "invoices" + + +class InvoiceLineItems(StripeSubStream): + """ + API docs: https://stripe.com/docs/api/invoices/invoice_lines + """ + + name = "invoice_line_items" + + parent = Invoices + parent_id: str = "invoice_id" + sub_items_attr = "lines" + add_parent_id = True + + def path(self, stream_slice: Mapping[str, Any] = None, **kwargs): + return f"invoices/{stream_slice[self.parent_id]}/lines" class InvoiceItems(IncrementalStripeStream): @@ -314,44 +378,25 @@ def request_params(self, stream_state=None, **kwargs): return params -class SubscriptionItems(StripeStream): +class SubscriptionItems(StripeSubStream): """ API docs: https://stripe.com/docs/api/subscription_items/list """ name = "subscription_items" + parent: StripeStream = Subscriptions + parent_id: str = "subscription_id" + sub_items_attr: str = "items" + def path(self, **kwargs): return "subscription_items" def request_params(self, stream_slice: Mapping[str, Any] = None, **kwargs): params = super().request_params(stream_slice=stream_slice, **kwargs) - params["subscription"] = stream_slice["subscription_id"] - - # add 'starting_after' param - if not params.get("starting_after") and stream_slice and stream_slice.get("starting_after"): - params["starting_after"] = stream_slice["starting_after"] - + params["subscription"] = stream_slice[self.parent_id] return params - def read_records(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> Iterable[Mapping[str, Any]]: - subscriptions_stream = Subscriptions(authenticator=self.authenticator, account_id=self.account_id, start_date=self.start_date) - for subscription in 
subscriptions_stream.read_records(sync_mode=SyncMode.full_refresh): - - items_obj = subscription.get("items", {}) - if not items_obj: - continue - - items = items_obj.get("data", []) - - # get the next pages with subscription items - items_next_pages = [] - if items_obj.get("has_more") and items: - stream_slice = {"subscription_id": subscription["id"], "starting_after": items[-1]["id"]} - items_next_pages = super().read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice, **kwargs) - - yield from [*items, *items_next_pages] - class Transfers(IncrementalStripeStream): """ @@ -386,46 +431,26 @@ def path(self, **kwargs): return "payment_intents" -class BankAccounts(StripeStream): +class BankAccounts(StripeSubStream): """ API docs: https://stripe.com/docs/api/customer_bank_accounts/list """ name = "bank_accounts" + parent = Customers + parent_id = "customer_id" + sub_items_attr = "sources" + filter = {"attr": "object", "value": "bank_account"} + def path(self, stream_slice: Mapping[str, Any] = None, **kwargs): - customer_id = stream_slice["customer_id"] - return f"customers/{customer_id}/sources" + return f"customers/{stream_slice[self.parent_id]}/sources" def request_params(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]: params = super().request_params(**kwargs) params["object"] = "bank_account" - - # add 'starting_after' param - if not params.get("starting_after") and stream_slice and stream_slice.get("starting_after"): - params["starting_after"] = stream_slice["starting_after"] - return params - def read_records(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> Iterable[Mapping[str, Any]]: - customers_stream = Customers(authenticator=self.authenticator, account_id=self.account_id, start_date=self.start_date) - for customer in customers_stream.read_records(sync_mode=SyncMode.full_refresh): - - sources_obj = customer.get("sources", {}) - if not sources_obj: - continue - - # filter out 
'bank_account' source items only - bank_accounts = [item for item in sources_obj.get("data", []) if item.get("object") == "bank_account"] - - # get the next pages with subscription items - bank_accounts_next_pages = [] - if sources_obj.get("has_more") and bank_accounts: - stream_slice = {"customer_id": customer["id"], "starting_after": bank_accounts[-1]["id"]} - bank_accounts_next_pages = super().read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice, **kwargs) - - yield from [*bank_accounts, *bank_accounts_next_pages] - class CheckoutSessions(StripeStream): """