Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update python cdk tutorial with updates to exchange rates api #12427

Merged
merged 3 commits into from
Apr 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
"$schema": "http://json-schema.org/draft-04/schema#",
"type": "object",
"properties": {
"access_key": {
"type": "string"
},
"base": {
"type": "string"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
"type": "object",
"required": ["base", "date", "rates"],
"properties": {
"access_key": {
"type": "string"
},
"base": {
"type": "string"
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{ "start_date": "2021-04-01", "base": "USD" }
{ "start_date": "2021-04-01", "base": "USD", "access_key": "abcdef" }
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
"$schema": "http://json-schema.org/draft-04/schema#",
"type": "object",
"properties": {
"access_key": {
"type": "string"
},
"base": {
"type": "string"
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{ "start_date": "2021-04-01", "base": "BTC" }
{ "start_date": "2021-04-01", "base": "BTC", "access_key": "abcdef" }
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
"$schema": "http://json-schema.org/draft-04/schema#",
"type": "object",
"properties": {
"access_key": {
"type": "string"
},
"base": {
"type": "string"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,16 @@


class ExchangeRates(HttpStream):
url_base = "https://api.exchangeratesapi.io/"
url_base = "http://api.exchangeratesapi.io/"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exchange Rates only allows free tier to use http not https

cursor_field = "date"
primary_key = "date"

def __init__(self, base: str, start_date: datetime, **kwargs):
super().__init__(**kwargs)
self.base = base
def __init__(self, config: Mapping[str, Any], start_date: datetime, **kwargs):
super().__init__()
self.base = config["base"]
self.access_key = config["access_key"]
self.start_date = start_date
self._cursor_value = None

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
# The API does not offer pagination, so we return None to indicate there are no more pages in the response
Expand All @@ -38,8 +40,8 @@ def request_params(
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
# The api requires that we include the base currency as a query param so we do that in this method
return {"base": self.base}
# The api requires that we include access_key as a query param so we do that in this method
return {"access_key": self.access_key}

def parse_response(
self,
Expand Down Expand Up @@ -104,4 +106,4 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
auth = NoAuth()
# Parse the date from a string into a datetime object
start_date = datetime.strptime(config["start_date"], "%Y-%m-%d")
return [ExchangeRates(authenticator=auth, base=config["base"], start_date=start_date)]
return [ExchangeRates(authenticator=auth, config=config, start_date=start_date)]
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
"required": ["start_date", "base"],
"additionalProperties": false,
"properties": {
"access_key": {
"title": "Access Key",
"type": "string",
"description": "API access key used to retrieve data from the Exchange Rates API."
},
"start_date": {
"title": "Start Date",
"type": "string",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ This is a step-by-step guide for how to create an Airbyte source in Python to re

All the commands below assume that `python` points to a version of python >=3.9.0. On some systems, `python` points to a Python2 installation and `python3` points to Python3. If this is the case on your machine, substitute all `python` commands in this guide with `python3`.

## Exchange Rates API Setup

For this guide we will be making API calls to the Exchange Rates API. In order to generate the API access key that will be used by the new connector, you will have to follow steps on the [Exchange Rates API](https://exchangeratesapi.io/) by signing up for the Free tier plan. Once you have an API access key, you can continue with the guide.

## Checklist

* Step 1: Create the source using the template
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@ connectionSpecification:
title: Python Http Tutorial Spec
type: object
required:
- access_key
- start_date
- base
additionalProperties: false
properties:
access_key:
type: string
description: API access key used to retrieve data from the Exchange Rates API.
start_date:
type: string
description: Start getting data from that date.
Expand All @@ -37,6 +41,7 @@ connectionSpecification:

In addition to metadata, we define two inputs:

* `access_key`: The API access key used to authenticate requests to the API
* `start_date`: The beginning date to start tracking currency exchange rates from
* `base`: The currency whose rates we're interested in tracking

Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ The second operation in the Airbyte Protocol that we'll implement is the `check`

This operation verifies that the input configuration supplied by the user can be used to connect to the underlying data source. Note that this user-supplied configuration has the values described in the `spec.yaml` filled in. In other words if the `spec.yaml` said that the source requires a `username` and `password` the config object might be `{ "username": "airbyte", "password": "password123" }`. You should then implement something that returns a json object reporting, given the credentials in the config, whether we were able to connect to the source.

In order to make requests the API, we need to specify the access
In our case, this is a fairly trivial check since the API requires no credentials. Instead, let's verify that the user-input `base` currency is a legitimate currency. In `source.py` we'll find the following autogenerated source:

```python
Expand Down Expand Up @@ -37,24 +38,22 @@ Following the docstring instructions, we'll change the implementation to verify
return True, None
```

Let's test out this implementation by creating two objects: a valid and an invalid config and attempt to give them as input to the connector
Let's test out this implementation by creating two objects: a valid and an invalid config and attempt to give them as input to the connector. For this section, you will need to take the API access key generated earlier and add it to both configs. Because these configs contain secrets, we recommend storing configs which contain secrets in `secrets/config.json` because the `secrets` directory is gitignored by default.

```text
echo '{"start_date": "2021-04-01", "base": "USD"}' > sample_files/config.json
echo '{"start_date": "2021-04-01", "base": "BTC"}' > sample_files/invalid_config.json
python main.py check --config sample_files/config.json
python main.py check --config sample_files/invalid_config.json
mkdir sample_files
echo '{"start_date": "2022-04-01", "base": "USD", "access_key": <your_access_key>}' > secrets/config.json
echo '{"start_date": "2022-04-01", "base": "BTC", "access_key": <your_access_key>}' > secrets/invalid_config.json
python main.py check --config secrets/config.json
python main.py check --config secrets/invalid_config.json
```

You should see output like the following:

```text
> python main.py check --config sample_files/config.json
> python main.py check --config secrets/config.json
{"type": "CONNECTION_STATUS", "connectionStatus": {"status": "SUCCEEDED"}}

> python main.py check --config sample_files/invalid_config.json
> python main.py check --config secrets/invalid_config.json
{"type": "CONNECTION_STATUS", "connectionStatus": {"status": "FAILED", "message": "Input currency BTC is invalid. Please input one of the following currencies: {'DKK', 'USD', 'CZK', 'BGN', 'JPY'}"}}
```

While developing, we recommend storing configs which contain secrets in `secrets/config.json` because the `secrets` directory is gitignored by default.

Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ We'll begin by creating a stream to represent the data that we're pulling from t

```python
class ExchangeRates(HttpStream):
url_base = "https://api.exchangeratesapi.io/"
url_base = "http://api.exchangeratesapi.io/"

# Set this as a noop.
primary_key = None
Expand Down Expand Up @@ -60,7 +60,7 @@ Having created this stream in code, we'll put a file `exchange_rates.json` in th
With `.json` schema file in place, let's see if the connector can now find this schema and produce a valid catalog:

```text
python main.py discover --config sample_files/config.json
python main.py discover --config secrets/config.json
```

you should see some output like:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@ Let's begin by pulling data for the last day's rates by using the `/latest` endp

```python
class ExchangeRates(HttpStream):
url_base = "https://api.exchangeratesapi.io/"
url_base = "http://api.exchangeratesapi.io/"

primary_key = None

def __init__(self, base: str, **kwargs):
def __init__(self, config: Mapping[str, Any], **kwargs):
super().__init__()
self.base = base
self.base = config['base']
self.access_key = config['access_key']


def path(
Expand All @@ -60,8 +61,8 @@ class ExchangeRates(HttpStream):
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
# The api requires that we include the base currency as a query param so we do that in this method
return {'base': self.base}
# The api requires that we include access_key as a query param so we do that in this method
return {'access_key': self.access_key}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Free tier users no longer have the ability to specify a base. Leaving it in results in 400 errors. Although its confusing from a tutorial standpoint to allow for base, base is used in earlier sections to demonstrate invalid configs so I think we should not completely remove it from the tutorial since it still serves a purpose, even if not in the outbound req


def parse_response(
self,
Expand All @@ -80,28 +81,28 @@ class ExchangeRates(HttpStream):
return None
```

This may look big, but that's just because there are lots of \(unused, for now\) parameters in these methods \(those can be hidden with Python's `**kwargs`, but don't worry about it for now\). Really we just added a few lines of "significant" code: 1. Added a constructor `__init__` which stores the `base` currency to query for. 2. `return {'base': self.base}` to add the `?base=<base-value>` query parameter to the request based on the `base` input by the user. 3. `return [response.json()]` to parse the response from the API to match the schema of our schema `.json` file. 4. `return "latest"` to indicate that we want to hit the `/latest` endpoint of the API to get the latest exchange rate data.
This may look big, but that's just because there are lots of \(unused, for now\) parameters in these methods \(those can be hidden with Python's `**kwargs`, but don't worry about it for now\). Really we just added a few lines of "significant" code: 1. Added a constructor `__init__` which stores the `base` currency to query for and the `access_key` used for authentication. 2. `return {'access_key': self.access_key}` to add the `?access_key=<access-key-string>` query parameter to the request based on the `access_key` input by the user. 3. `return [response.json()]` to parse the response from the API to match the schema of our schema `.json` file. 4. `return "latest"` to indicate that we want to hit the `/latest` endpoint of the API to get the latest exchange rate data.

Let's also pass the `base` parameter input by the user to the stream class:
Let's also pass the config specified by the user to the stream class:

```python
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
auth = NoAuth()
return [ExchangeRates(authenticator=auth, base=config['base'])]
auth = NoAuth()
return [ExchangeRates(authenticator=auth, config=config)]
```

We're now ready to query the API!

To do this, we'll need a [ConfiguredCatalog](../../../understanding-airbyte/beginners-guide-to-catalog.md). We've prepared one [here](https://github.com/airbytehq/airbyte/blob/master/airbyte-cdk/python/docs/tutorials/http_api_source_assets/configured_catalog.json) -- download this and place it in `sample_files/configured_catalog.json`. Then run:

```text
python main.py read --config sample_files/config.json --catalog sample_files/configured_catalog.json
python main.py read --config secrets/config.json --catalog sample_files/configured_catalog.json
```

you should see some output lines, one of which is a record from the API:

```text
{"type": "RECORD", "record": {"stream": "exchange_rates", "data": {"base": "USD", "rates": {"GBP": 0.7196938353, "HKD": 7.7597848573, "IDR": 14482.4824162185, "ILS": 3.2412081092, "DKK": 6.1532478279, "INR": 74.7852709971, "CHF": 0.915763343, "MXN": 19.8439387671, "CZK": 21.3545717832, "SGD": 1.3261894911, "THB": 31.4398014067, "HRK": 6.2599917253, "EUR": 0.8274720728, "MYR": 4.0979726934, "NOK": 8.3043442284, "CNY": 6.4856433595, "BGN": 1.61836988, "PHP": 48.3516756309, "PLN": 3.770872983, "ZAR": 14.2690111709, "CAD": 1.2436905254, "ISK": 124.9482829954, "BRL": 5.4526272238, "RON": 4.0738932561, "NZD": 1.3841125362, "TRY": 8.3101365329, "JPY": 108.0182043856, "RUB": 74.9555647497, "KRW": 1111.7583781547, "USD": 1.0, "AUD": 1.2840711626, "HUF": 300.6206040546, "SEK": 8.3829540753}, "date": "2021-04-26"}, "emitted_at": 1619498062000}}
"type": "RECORD", "record": {"stream": "exchange_rates", "data": {"success": true, "timestamp": 1651129443, "base": "EUR", "date": "2022-04-28", "rates": {"AED": 3.86736, "AFN": 92.13195, "ALL": 120.627843, "AMD": 489.819318, "ANG": 1.910347, "AOA": 430.073735, "ARS": 121.119674, "AUD": 1.478877, "AWG": 1.895762, "AZN": 1.794932, "BAM": 1.953851, "BBD": 2.140212, "BDT": 91.662775, "BGN": 1.957013, "BHD": 0.396929, "BIF": 2176.669098, "BMD": 1.052909, "BND": 1.461004, "BOB": 7.298009, "BRL": 5.227798, "BSD": 1.060027, "BTC": 2.6717761e-05, "BTN": 81.165435, "BWP": 12.802036, "BYN": 3.565356, "BYR": 20637.011334, "BZD": 2.136616, "CAD": 1.349329, "CDF": 2118.452361, "CHF": 1.021627, "CLF": 0.032318, "CLP": 891.760584, "CNY": 6.953724, "COP": 4171.971894, "CRC": 701.446322, "CUC": 1.052909, "CUP": 27.902082, "CVE": 110.15345, "CZK": 24.499027, "DJF": 188.707108, "DKK": 7.441548, "DOP": 58.321493, "DZD": 152.371647, "EGP": 19.458297, "ERN": 15.793633, "ETB": 54.43729, "EUR": 1, "FJD": 2.274651, "FKP": 0.80931, "GBP": 0.839568, "GEL": 3.20611, "GGP": 0.80931, "GHS": 7.976422, "GIP": 0.80931, "GMD": 56.64554, "GNF": 9416.400803, "GTQ": 8.118402, "GYD": 221.765423, "HKD": 8.261854, "HNL": 26.0169, "HRK": 7.563467, "HTG": 115.545574, "HUF": 377.172734, "IDR": 15238.748216, "ILS": 3.489582, "IMP": 0.80931, "INR": 80.654494, "IQD": 1547.023976, "IRR": 44538.040218, "ISK": 137.457233, "JEP": 0.80931, "JMD": 163.910125, "JOD": 0.746498, "JPY": 137.331903, "KES": 121.87429, "KGS": 88.581418, "KHR": 4286.72178, "KMF": 486.443591, "KPW": 947.617993, "KRW": 1339.837191, "KWD": 0.322886, "KYD": 0.883397, "KZT": 473.770223, "LAK": 12761.755235, "LBP": 1602.661797, "LKR": 376.293562, "LRD": 159.989586, "LSL": 15.604181, "LTL": 3.108965, "LVL": 0.636894, "LYD": 5.031557, "MAD": 10.541225, "MDL": 19.593772, "MGA": 4284.002369, "MKD": 61.553251, "MMK": 1962.574442, "MNT": 3153.317641, "MOP": 8.567461, "MRO": 375.88824, "MUR": 45.165684, "MVR": 16.199478, "MWK": 865.62318, "MXN": 21.530268, "MYR": 4.594366, "MZN": 67.206888, "NAD": 15.604214, "NGN": 437.399752, "NIO": 37.965356, "NOK": 9.824365, "NPR": 129.86672, "NZD": 1.616441, "OMR": 0.405421, "PAB": 1.060027, "PEN": 4.054233, "PGK": 3.73593, "PHP": 55.075028, "PKR": 196.760944, "PLN": 4.698101, "PYG": 7246.992296, "QAR": 3.833603, "RON": 4.948144, "RSD": 117.620172, "RUB": 77.806269, "RWF": 1086.709833, "SAR": 3.949063, "SBD": 8.474149, "SCR": 14.304711, "SDG": 470.649944, "SEK": 10.367719, "SGD": 1.459695, "SHP": 1.45028, "SLL": 13082.391386, "SOS": 609.634325, "SRD": 21.904702, "STD": 21793.085136, "SVC": 9.275519, "SYP": 2645.380032, "SZL": 16.827859, "THB": 36.297991, "TJS": 13.196811, "TMT": 3.685181, "TND": 3.22348, "TOP": 2.428117, "TRY": 15.575532, "TTD": 7.202107, "TWD": 31.082183, "TZS": 2446.960099, "UAH": 32.065033, "UGX": 3773.578577, "USD": 1.052909, "UYU": 43.156886, "UZS": 11895.19696, "VEF": 225143710305.04727, "VND": 24171.62598, "VUV": 118.538204, "WST": 2.722234, "XAF": 655.287181, "XAG": 0.045404, "XAU": 0.000559, "XCD": 2.845538, "XDR": 0.783307, "XOF": 655.293398, "XPF": 118.347299, "YER": 263.490114, "ZAR": 16.77336, "ZMK": 9477.445964, "ZMW": 18.046154, "ZWL": 339.036185}}, "emitted_at": 1651130169364}}
```

There we have it - a stream which reads data in just a few lines of code!
Expand All @@ -127,10 +128,10 @@ Let's get the easy parts out of the way and pass the `start_date`:

```python
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
auth = NoAuth()
# Parse the date from a string into a datetime object
start_date = datetime.strptime(config['start_date'], '%Y-%m-%d')
return [ExchangeRates(authenticator=auth, base=config['base'], start_date=start_date)]
auth = NoAuth()
# Parse the date from a string into a datetime object
start_date = datetime.strptime(config['start_date'], '%Y-%m-%d')
return [ExchangeRates(authenticator=auth, config=config, start_date=start_date)]
```

Let's also add this parameter to the constructor and declare the `cursor_field`:
Expand All @@ -141,18 +142,19 @@ from airbyte_cdk.sources.streams import IncrementalMixin


class ExchangeRates(HttpStream, IncrementalMixin):
url_base = "https://api.exchangeratesapi.io/"
url_base = "http://api.exchangeratesapi.io/"
cursor_field = "date"
primary_key = "date"

def __init__(self, base: str, start_date: datetime, **kwargs):
def __init__(self, config: Mapping[str, Any], start_date: datetime, **kwargs):
super().__init__()
self.base = base
self.base = config['base']
self.access_key = config['access_key']
self.start_date = start_date
self._cursor_value = None
```

Declaring the `cursor_field` informs the framework that this stream now supports incremental sync. The next time you run `python main_dev.py discover --config sample_files/config.json` you'll find that the `supported_sync_modes` field now also contains `incremental`.
Declaring the `cursor_field` informs the framework that this stream now supports incremental sync. The next time you run `python main_dev.py discover --config secrets/config.json` you'll find that the `supported_sync_modes` field now also contains `incremental`.

But we're not quite done with supporting incremental, we have to actually emit state! We'll structure our state object very simply: it will be a `dict` whose single key is `'date'` and value is the date of the last day we synced data from. For example, `{'date': '2021-04-26'}` indicates the connector previously read data up until April 26th and therefore shouldn't re-read anything before April 26th.

Expand Down Expand Up @@ -226,17 +228,17 @@ We should now have a working implementation of incremental sync!
Let's try it out:

```text
python main.py read --config sample_files/config.json --catalog sample_files/configured_catalog.json
python main.py read --config secrets/config.json --catalog sample_files/configured_catalog.json
```

You should see a bunch of `RECORD` messages and `STATE` messages. To verify that incremental sync is working, pass the input state back to the connector and run it again:

```text
# Save the latest state to sample_files/state.json
python main.py read --config sample_files/config.json --catalog sample_files/configured_catalog.json | grep STATE | tail -n 1 | jq .state.data > sample_files/state.json
python main.py read --config secrets/config.json --catalog sample_files/configured_catalog.json | grep STATE | tail -n 1 | jq .state.data > sample_files/state.json

# Run a read operation with the latest state message
python main.py read --config sample_files/config.json --catalog sample_files/configured_catalog.json --state sample_files/state.json
python main.py read --config secrets/config.json --catalog sample_files/configured_catalog.json --state sample_files/state.json
```

You should see that only the record from the last date is being synced! This is acceptable behavior, since Airbyte requires at-least-once delivery of records, so repeating the last record twice is OK.
Expand Down