-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
update python cdk tutorial with updates to exchange rates api #12427
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1 @@ | ||
{ "start_date": "2021-04-01", "base": "USD" } | ||
{ "start_date": "2021-04-01", "base": "USD", "access_key": "abcdef" } |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1 @@ | ||
{ "start_date": "2021-04-01", "base": "BTC" } | ||
{ "start_date": "2021-04-01", "base": "BTC", "access_key": "abcdef" } |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,13 +36,14 @@ Let's begin by pulling data for the last day's rates by using the `/latest` endp | |
|
||
```python | ||
class ExchangeRates(HttpStream): | ||
url_base = "https://api.exchangeratesapi.io/" | ||
url_base = "http://api.exchangeratesapi.io/" | ||
|
||
primary_key = None | ||
|
||
def __init__(self, base: str, **kwargs): | ||
def __init__(self, config: Mapping[str, Any], **kwargs): | ||
super().__init__() | ||
self.base = base | ||
self.base = config['base'] | ||
self.access_key = config['access_key'] | ||
|
||
|
||
def path( | ||
|
@@ -60,8 +61,8 @@ class ExchangeRates(HttpStream): | |
stream_slice: Mapping[str, Any] = None, | ||
next_page_token: Mapping[str, Any] = None, | ||
) -> MutableMapping[str, Any]: | ||
# The api requires that we include the base currency as a query param so we do that in this method | ||
return {'base': self.base} | ||
# The api requires that we include access_key as a query param so we do that in this method | ||
return {'access_key': self.access_key} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Free tier users no longer have the ability to specify a base. Leaving it in results in 400 errors. Although its confusing from a tutorial standpoint to allow for base, base is used in earlier sections to demonstrate invalid configs so I think we should not completely remove it from the tutorial since it still serves a purpose, even if not in the outbound req |
||
|
||
def parse_response( | ||
self, | ||
|
@@ -80,28 +81,28 @@ class ExchangeRates(HttpStream): | |
return None | ||
``` | ||
|
||
This may look big, but that's just because there are lots of \(unused, for now\) parameters in these methods \(those can be hidden with Python's `**kwargs`, but don't worry about it for now\). Really we just added a few lines of "significant" code: 1. Added a constructor `__init__` which stores the `base` currency to query for. 2. `return {'base': self.base}` to add the `?base=<base-value>` query parameter to the request based on the `base` input by the user. 3. `return [response.json()]` to parse the response from the API to match the schema of our schema `.json` file. 4. `return "latest"` to indicate that we want to hit the `/latest` endpoint of the API to get the latest exchange rate data. | ||
This may look big, but that's just because there are lots of \(unused, for now\) parameters in these methods \(those can be hidden with Python's `**kwargs`, but don't worry about it for now\). Really we just added a few lines of "significant" code: 1. Added a constructor `__init__` which stores the `base` currency to query for and the `access_key` used for authentication. 2. `return {'access_key': self.access_key}` to add the `?access_key=<access-key-string>` query parameter to the request based on the `access_key` input by the user. 3. `return [response.json()]` to parse the response from the API to match the schema of our schema `.json` file. 4. `return "latest"` to indicate that we want to hit the `/latest` endpoint of the API to get the latest exchange rate data. | ||
|
||
Let's also pass the `base` parameter input by the user to the stream class: | ||
Let's also pass the config specified by the user to the stream class: | ||
|
||
```python | ||
def streams(self, config: Mapping[str, Any]) -> List[Stream]: | ||
auth = NoAuth() | ||
return [ExchangeRates(authenticator=auth, base=config['base'])] | ||
auth = NoAuth() | ||
return [ExchangeRates(authenticator=auth, config=config)] | ||
``` | ||
|
||
We're now ready to query the API! | ||
|
||
To do this, we'll need a [ConfiguredCatalog](../../../understanding-airbyte/beginners-guide-to-catalog.md). We've prepared one [here](https://github.com/airbytehq/airbyte/blob/master/airbyte-cdk/python/docs/tutorials/http_api_source_assets/configured_catalog.json) -- download this and place it in `sample_files/configured_catalog.json`. Then run: | ||
|
||
```text | ||
python main.py read --config sample_files/config.json --catalog sample_files/configured_catalog.json | ||
python main.py read --config secrets/config.json --catalog sample_files/configured_catalog.json | ||
``` | ||
|
||
you should see some output lines, one of which is a record from the API: | ||
|
||
```text | ||
{"type": "RECORD", "record": {"stream": "exchange_rates", "data": {"base": "USD", "rates": {"GBP": 0.7196938353, "HKD": 7.7597848573, "IDR": 14482.4824162185, "ILS": 3.2412081092, "DKK": 6.1532478279, "INR": 74.7852709971, "CHF": 0.915763343, "MXN": 19.8439387671, "CZK": 21.3545717832, "SGD": 1.3261894911, "THB": 31.4398014067, "HRK": 6.2599917253, "EUR": 0.8274720728, "MYR": 4.0979726934, "NOK": 8.3043442284, "CNY": 6.4856433595, "BGN": 1.61836988, "PHP": 48.3516756309, "PLN": 3.770872983, "ZAR": 14.2690111709, "CAD": 1.2436905254, "ISK": 124.9482829954, "BRL": 5.4526272238, "RON": 4.0738932561, "NZD": 1.3841125362, "TRY": 8.3101365329, "JPY": 108.0182043856, "RUB": 74.9555647497, "KRW": 1111.7583781547, "USD": 1.0, "AUD": 1.2840711626, "HUF": 300.6206040546, "SEK": 8.3829540753}, "date": "2021-04-26"}, "emitted_at": 1619498062000}} | ||
"type": "RECORD", "record": {"stream": "exchange_rates", "data": {"success": true, "timestamp": 1651129443, "base": "EUR", "date": "2022-04-28", "rates": {"AED": 3.86736, "AFN": 92.13195, "ALL": 120.627843, "AMD": 489.819318, "ANG": 1.910347, "AOA": 430.073735, "ARS": 121.119674, "AUD": 1.478877, "AWG": 1.895762, "AZN": 1.794932, "BAM": 1.953851, "BBD": 2.140212, "BDT": 91.662775, "BGN": 1.957013, "BHD": 0.396929, "BIF": 2176.669098, "BMD": 1.052909, "BND": 1.461004, "BOB": 7.298009, "BRL": 5.227798, "BSD": 1.060027, "BTC": 2.6717761e-05, "BTN": 81.165435, "BWP": 12.802036, "BYN": 3.565356, "BYR": 20637.011334, "BZD": 2.136616, "CAD": 1.349329, "CDF": 2118.452361, "CHF": 1.021627, "CLF": 0.032318, "CLP": 891.760584, "CNY": 6.953724, "COP": 4171.971894, "CRC": 701.446322, "CUC": 1.052909, "CUP": 27.902082, "CVE": 110.15345, "CZK": 24.499027, "DJF": 188.707108, "DKK": 7.441548, "DOP": 58.321493, "DZD": 152.371647, "EGP": 19.458297, "ERN": 15.793633, "ETB": 54.43729, "EUR": 1, "FJD": 2.274651, "FKP": 0.80931, "GBP": 0.839568, "GEL": 3.20611, "GGP": 0.80931, "GHS": 7.976422, "GIP": 0.80931, "GMD": 56.64554, "GNF": 9416.400803, "GTQ": 8.118402, "GYD": 221.765423, "HKD": 8.261854, "HNL": 26.0169, "HRK": 7.563467, "HTG": 115.545574, "HUF": 377.172734, "IDR": 15238.748216, "ILS": 3.489582, "IMP": 0.80931, "INR": 80.654494, "IQD": 1547.023976, "IRR": 44538.040218, "ISK": 137.457233, "JEP": 0.80931, "JMD": 163.910125, "JOD": 0.746498, "JPY": 137.331903, "KES": 121.87429, "KGS": 88.581418, "KHR": 4286.72178, "KMF": 486.443591, "KPW": 947.617993, "KRW": 1339.837191, "KWD": 0.322886, "KYD": 0.883397, "KZT": 473.770223, "LAK": 12761.755235, "LBP": 1602.661797, "LKR": 376.293562, "LRD": 159.989586, "LSL": 15.604181, "LTL": 3.108965, "LVL": 0.636894, "LYD": 5.031557, "MAD": 10.541225, "MDL": 19.593772, "MGA": 4284.002369, "MKD": 61.553251, "MMK": 1962.574442, "MNT": 3153.317641, "MOP": 8.567461, "MRO": 375.88824, "MUR": 45.165684, "MVR": 16.199478, "MWK": 865.62318, "MXN": 21.530268, "MYR": 4.594366, "MZN": 67.206888, "NAD": 15.604214, "NGN": 437.399752, "NIO": 37.965356, "NOK": 9.824365, "NPR": 129.86672, "NZD": 1.616441, "OMR": 0.405421, "PAB": 1.060027, "PEN": 4.054233, "PGK": 3.73593, "PHP": 55.075028, "PKR": 196.760944, "PLN": 4.698101, "PYG": 7246.992296, "QAR": 3.833603, "RON": 4.948144, "RSD": 117.620172, "RUB": 77.806269, "RWF": 1086.709833, "SAR": 3.949063, "SBD": 8.474149, "SCR": 14.304711, "SDG": 470.649944, "SEK": 10.367719, "SGD": 1.459695, "SHP": 1.45028, "SLL": 13082.391386, "SOS": 609.634325, "SRD": 21.904702, "STD": 21793.085136, "SVC": 9.275519, "SYP": 2645.380032, "SZL": 16.827859, "THB": 36.297991, "TJS": 13.196811, "TMT": 3.685181, "TND": 3.22348, "TOP": 2.428117, "TRY": 15.575532, "TTD": 7.202107, "TWD": 31.082183, "TZS": 2446.960099, "UAH": 32.065033, "UGX": 3773.578577, "USD": 1.052909, "UYU": 43.156886, "UZS": 11895.19696, "VEF": 225143710305.04727, "VND": 24171.62598, "VUV": 118.538204, "WST": 2.722234, "XAF": 655.287181, "XAG": 0.045404, "XAU": 0.000559, "XCD": 2.845538, "XDR": 0.783307, "XOF": 655.293398, "XPF": 118.347299, "YER": 263.490114, "ZAR": 16.77336, "ZMK": 9477.445964, "ZMW": 18.046154, "ZWL": 339.036185}}, "emitted_at": 1651130169364}} | ||
``` | ||
|
||
There we have it - a stream which reads data in just a few lines of code! | ||
|
@@ -127,10 +128,10 @@ Let's get the easy parts out of the way and pass the `start_date`: | |
|
||
```python | ||
def streams(self, config: Mapping[str, Any]) -> List[Stream]: | ||
auth = NoAuth() | ||
# Parse the date from a string into a datetime object | ||
start_date = datetime.strptime(config['start_date'], '%Y-%m-%d') | ||
return [ExchangeRates(authenticator=auth, base=config['base'], start_date=start_date)] | ||
auth = NoAuth() | ||
# Parse the date from a string into a datetime object | ||
start_date = datetime.strptime(config['start_date'], '%Y-%m-%d') | ||
return [ExchangeRates(authenticator=auth, config=config, start_date=start_date)] | ||
``` | ||
|
||
Let's also add this parameter to the constructor and declare the `cursor_field`: | ||
|
@@ -141,18 +142,19 @@ from airbyte_cdk.sources.streams import IncrementalMixin | |
|
||
|
||
class ExchangeRates(HttpStream, IncrementalMixin): | ||
url_base = "https://api.exchangeratesapi.io/" | ||
url_base = "http://api.exchangeratesapi.io/" | ||
cursor_field = "date" | ||
primary_key = "date" | ||
|
||
def __init__(self, base: str, start_date: datetime, **kwargs): | ||
def __init__(self, config: Mapping[str, Any], start_date: datetime, **kwargs): | ||
super().__init__() | ||
self.base = base | ||
self.base = config['base'] | ||
self.access_key = config['access_key'] | ||
self.start_date = start_date | ||
self._cursor_value = None | ||
``` | ||
|
||
Declaring the `cursor_field` informs the framework that this stream now supports incremental sync. The next time you run `python main_dev.py discover --config sample_files/config.json` you'll find that the `supported_sync_modes` field now also contains `incremental`. | ||
Declaring the `cursor_field` informs the framework that this stream now supports incremental sync. The next time you run `python main_dev.py discover --config secrets/config.json` you'll find that the `supported_sync_modes` field now also contains `incremental`. | ||
|
||
But we're not quite done with supporting incremental, we have to actually emit state! We'll structure our state object very simply: it will be a `dict` whose single key is `'date'` and value is the date of the last day we synced data from. For example, `{'date': '2021-04-26'}` indicates the connector previously read data up until April 26th and therefore shouldn't re-read anything before April 26th. | ||
|
||
|
@@ -226,17 +228,17 @@ We should now have a working implementation of incremental sync! | |
Let's try it out: | ||
|
||
```text | ||
python main.py read --config sample_files/config.json --catalog sample_files/configured_catalog.json | ||
python main.py read --config secrets/config.json --catalog sample_files/configured_catalog.json | ||
``` | ||
|
||
You should see a bunch of `RECORD` messages and `STATE` messages. To verify that incremental sync is working, pass the input state back to the connector and run it again: | ||
|
||
```text | ||
# Save the latest state to sample_files/state.json | ||
python main.py read --config sample_files/config.json --catalog sample_files/configured_catalog.json | grep STATE | tail -n 1 | jq .state.data > sample_files/state.json | ||
python main.py read --config secrets/config.json --catalog sample_files/configured_catalog.json | grep STATE | tail -n 1 | jq .state.data > sample_files/state.json | ||
|
||
# Run a read operation with the latest state message | ||
python main.py read --config sample_files/config.json --catalog sample_files/configured_catalog.json --state sample_files/state.json | ||
python main.py read --config secrets/config.json --catalog sample_files/configured_catalog.json --state sample_files/state.json | ||
``` | ||
|
||
You should see that only the record from the last date is being synced! This is acceptable behavior, since Airbyte requires at-least-once delivery of records, so repeating the last record twice is OK. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exchange Rates only allows free tier to use http not https