Skip to content

Commit

Permalink
🎉 Source Plaid: Fix 100 record limit and added start_date (#11104)
Browse files Browse the repository at this point in the history
* 🔧 Fixed the 100 transaction limit on Plaid source. Added start date to Plaid connector

* 📝 Updated integration docs

* ✏️ Updated import statement location
  • Loading branch information
ehmadzubair authored Apr 1, 2022
1 parent ea1a5a5 commit 3317552
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
"access_token": "??",
"api_key": "??",
"client_id": "??",
"plaid_env": "sandbox"
"plaid_env": "sandbox",
"start_date": "2022-03-05"
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from plaid.api import plaid_api
from plaid.model.accounts_balance_get_request import AccountsBalanceGetRequest
from plaid.model.transactions_get_request import TransactionsGetRequest
from plaid.model.transactions_get_request_options import TransactionsGetRequestOptions

SPEC_ENV_TO_PLAID_ENV = {
"production": plaid.Environment.Production,
Expand All @@ -30,6 +31,7 @@ def __init__(self, config: Mapping[str, Any]):
api_client = plaid.ApiClient(plaid_config)
self.client = plaid_api.PlaidApi(api_client)
self.access_token = config["access_token"]
self.start_date = datetime.datetime.strptime(config.get("start_date"), "%Y-%m-%d").date() if config.get("start_date") else None


class BalanceStream(PlaidStream):
Expand Down Expand Up @@ -75,6 +77,14 @@ def cursor_field(self) -> Union[str, List[str]]:
def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
    """Build the stream state from the most recently emitted record.

    The new state holds only the ``date`` value of ``latest_record``
    (``None`` when the record has no ``date`` key). ``current_stream_state``
    is accepted for interface compatibility but is not consulted.
    """
    newest_date = latest_record.get("date")
    updated_state = {"date": newest_date}
    return updated_state

def _get_transactions_response(self, start_date, end_date=None, offset=0):
    """Fetch one page of transactions from Plaid for a date range.

    Args:
        start_date: Start of the range, forwarded as ``start_date`` of the
            ``TransactionsGetRequest``.
        end_date: End of the range, forwarded as ``end_date``. When ``None``
            (the default) the current UTC date is used, computed at call time.
        offset: Number of transactions to skip; set on the request options so
            callers can page through results.

    Returns:
        The response returned by ``self.client.transactions_get``.
    """
    # Resolve "today" on every call. A ``utcnow().date()`` expression placed in
    # the signature is evaluated only once, when the function is defined, so a
    # long-running process would keep querying up to the day it was started.
    if end_date is None:
        end_date = datetime.datetime.now(datetime.timezone.utc).date()

    options = TransactionsGetRequestOptions()
    options.offset = offset

    request = TransactionsGetRequest(
        access_token=self.access_token,
        start_date=start_date,
        end_date=end_date,
        options=options,
    )
    return self.client.transactions_get(request)

def read_records(
self,
sync_mode: SyncMode,
Expand All @@ -84,18 +94,27 @@ def read_records(
) -> Iterable[Mapping[str, Any]]:
stream_state = stream_state or {}
date = stream_state.get("date")
all_transactions = []

if not date:
date = datetime.date.fromtimestamp(0)
else:
date = datetime.date.fromisoformat(date)
if date >= datetime.datetime.utcnow().date():
return

transaction_response = self.client.transactions_get(
TransactionsGetRequest(access_token=self.access_token, start_date=date, end_date=datetime.datetime.utcnow().date())
)
if self.start_date:
date = max(self.start_date, date)

response = self._get_transactions_response(date)
all_transactions.extend(response.transactions)
num_total_transactions = response.total_transactions

while len(all_transactions) < num_total_transactions:
response = self._get_transactions_response(date, offset=len(all_transactions))
all_transactions.extend(response.transactions)

yield from map(lambda x: x.to_dict(), sorted(transaction_response["transactions"], key=lambda t: t["date"]))
yield from map(lambda x: x.to_dict(), sorted(all_transactions, key=lambda t: t["date"]))


class SourcePlaid(AbstractSource):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@
"type": "string",
"enum": ["sandbox", "development", "production"],
"description": "The Plaid environment"
},
"start_date": {
"title": "Start Date",
"type": "string",
"description": "The date from which you'd like to replicate data for Plaid in the format YYYY-MM-DD. All data generated on or after this date will be replicated.",
"examples": ["2021-03-01"],
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}$"
}
}
}
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/plaid.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,4 @@ This guide will walk through how to create the credentials you need to run this
| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.3.0 | 2022-01-05 | [7977](https://github.com/airbytehq/airbyte/pull/7977) | Migrate to Python CDK + add transaction stream |
| 0.3.1 | 2022-xx-xx | [11104](https://github.com/airbytehq/airbyte/pull/11104) | Fix 100 record limit and added start_date |

0 comments on commit 3317552

Please sign in to comment.