Skip to content

Commit

Permalink
fixed working with session token
Browse files Browse the repository at this point in the history
  • Loading branch information
darynaishchenko committed Nov 18, 2022
1 parent 3a7c3d7 commit dd17096
Show file tree
Hide file tree
Showing 7 changed files with 189 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,17 @@ acceptance_tests:
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
expect_records:
path: "integration_tests/expected_records.txt"
empty_streams:
- name: activity
bypass_reason: "data changes very fast"
- name: cards
bypass_reason: "data changes very fast"
- name: collections
bypass_reason: "data changes very fast"
- name: dashboards
bypass_reason: "data changes very fast"
- name: users
bypass_reason: "data changes very fast"
full_refresh:
tests:
- config_path: "secrets/config.json"
Expand Down

This file was deleted.

2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-metabase/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from setuptools import find_packages, setup

MAIN_REQUIREMENTS = ["airbyte-cdk", "requests>=2.28.0", "types-requests>=2.27.30"]
MAIN_REQUIREMENTS = ["airbyte-cdk", "requests>=2.28.0", "types-requests>=2.27.30", "cachetools"]

TEST_REQUIREMENTS = [
"pytest~=6.1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,61 +3,80 @@
#

import logging
from dataclasses import dataclass
from typing import Any, Mapping
from dataclasses import InitVar, dataclass
from typing import Any, Mapping, Union

import requests
from airbyte_cdk.sources.streams.http.auth import HttpAuthenticator
from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.types import Config
from airbyte_cdk.sources.streams.http.requests_native_auth.abstract_token import AbstractHeaderAuthenticator
from cachetools import TTLCache, cached
from dataclasses_jsonschema import JsonSchemaMixin

API_URL_CONFIG_KEY = "instance_api_url"
USERNAME_CONFIG_KEY = "username"
PASSWORD_CONFIG_KEY = "password"
SESSION_TOKEN_CONFIG_KEY = "session_token"
cacheMetabase = TTLCache(maxsize=1000, ttl=86400)


@cached(cacheMetabase)
def get_new_session_token(api_url: str, username: str, password: str) -> str:
response = requests.post(
f"{api_url}",
headers={"Content-Type": "application/json"},
json={"username": username, "password": password},
)
response.raise_for_status()
if not response.ok:
raise ConnectionError(f"Failed to retrieve new session token, response code {response.status_code} because {response.reason}")
return response.json()["id"]


@dataclass
class MetabaseAuth(HttpAuthenticator, JsonSchemaMixin):
def __init__(self, config: Mapping[str, Any], options: Mapping[str, Any]):
self.need_session_close = False
self.session_token = ""
class MetabaseSessionTokenAuthenticator(AbstractHeaderAuthenticator, DeclarativeAuthenticator, JsonSchemaMixin):
api_url: Union[InterpolatedString, str]
session_token: Union[InterpolatedString, str]
username: Union[InterpolatedString, str]
config: Config
options: InitVar[Mapping[str, Any]]
password: Union[InterpolatedString, str] = ""

def __post_init__(self, options):
self._username = InterpolatedString.create(self.username, options=options)
self._password = InterpolatedString.create(self.password, options=options)
self._api_url = InterpolatedString.create(self.api_url, options=options)
self._session_token = InterpolatedString.create(self.session_token, options=options)
self.logger = logging.getLogger("airbyte")
self.api_url = config[API_URL_CONFIG_KEY]
if USERNAME_CONFIG_KEY in config and PASSWORD_CONFIG_KEY in config:
self.username = config[USERNAME_CONFIG_KEY]
self.password = config[PASSWORD_CONFIG_KEY]
if SESSION_TOKEN_CONFIG_KEY in config:
self.session_token = config[SESSION_TOKEN_CONFIG_KEY]
elif USERNAME_CONFIG_KEY in config and PASSWORD_CONFIG_KEY in config:
self.session_token = self.get_new_session_token(config[USERNAME_CONFIG_KEY], config[PASSWORD_CONFIG_KEY])
else:
raise KeyError("Required parameters (username/password pair or session_token) not found")
# TODO: Try to retrieve latest session_token stored in some state message?

def get_new_session_token(self, username: str, password: str) -> str:
response = requests.post(
f"{self.api_url}session", headers={"Content-Type": "application/json"}, json={"username": username, "password": password}
)
response.raise_for_status()
if response.ok:
self.session_token = response.json()["id"]
self.need_session_close = True
else:
raise ConnectionError(f"Failed to retrieve new session token, response code {response.status_code} because {response.reason}")
return self.session_token
@property
def auth_header(self) -> str:
return "X-Metabase-Session"

@property
def token(self) -> str:
if not self._session_token.eval(self.config):
if self._password.eval(self.config) and self._username.eval(self.config):
username = self._username.eval(self.config)
password = self._password.eval(self.config)
api_url = f"{self._api_url.eval(self.config)}session"

return get_new_session_token(api_url, username, password)

def has_valid_token(self) -> bool:
if self.is_valid_session_token():
return self._session_token.eval(self.config)

raise ConnectionError("Invalid credentials: session token is not valid or provide username and password")

def is_valid_session_token(self) -> bool:
try:
response = requests.get(f"{self.api_url}user/current", headers=self.get_auth_header())
response = requests.get(
f"{self._api_url.eval(self.config)}user/current", headers={self.auth_header: self._session_token.eval(self.config)}
)
response.raise_for_status()
except requests.exceptions.HTTPError as e:
if e.response.status_code == requests.codes["unauthorized"]:
self.logger.warning(f"Unable to connect to Metabase source due to {str(e)}, retrying with a new session_token...")
self.get_new_session_token(self.username, self.password)
response = requests.get(f"{self.api_url}user/current", headers=self.get_auth_header())
response.raise_for_status()
self.logger.warning(f"Unable to connect to Metabase source due to {str(e)}")
return False
else:
raise ConnectionError(f"Error while checking connection: {e}")
raise ConnectionError(f"Error while validating session token: {e}")
if response.ok:
json_response = response.json()
self.logger.info(
Expand All @@ -66,19 +85,3 @@ def has_valid_token(self) -> bool:
return True
else:
raise ConnectionError(f"Failed to retrieve new session token, response code {response.status_code} because {response.reason}")

def get_auth_header(self) -> Mapping[str, Any]:
return {"X-Metabase-Session": self.session_token}

def close_session(self):
if self.need_session_close:
response = requests.delete(
f"{self.api_url}session", headers=self.get_auth_header(), json={"metabase-session-id": self.session_token}
)
response.raise_for_status()
if response.ok:
self.logger.info("Session successfully closed")
else:
self.logger.info(f"Unable to close session {response.status_code}: {response.reason}")
else:
self.logger.info("Session was not opened by this connector.")
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ definitions:
url_base: "{{ config['instance_api_url'] }}"
http_method: "GET"
authenticator:
class_name: source_metabase.components.MetabaseAuth
config: "{{ config }}"
class_name: source_metabase.components.MetabaseSessionTokenAuthenticator
username: "{{ config['username'] }}"
password: "{{ config['password'] }}"
api_url: "{{ config['instance_api_url'] }}"
session_token: "{{ config['session_token'] }}"
retriever:
record_selector:
$ref: "*ref(definitions.selector)"
Expand Down Expand Up @@ -67,4 +70,5 @@ streams:
- "*ref(definitions.users_stream)"

check:
stream_names: [ ]
stream_names:
- "activity"
Original file line number Diff line number Diff line change
Expand Up @@ -2,56 +2,10 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import logging
from typing import Any, Iterator, List, Mapping, MutableMapping, Tuple

from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
from airbyte_cdk.sources.streams import Stream

from .components import MetabaseAuth

API_URL = "instance_api_url"
USERNAME = "username"
PASSWORD = "password"
SESSION_TOKEN = "session_token"


class SourceMetabase(YamlDeclarativeSource):
def __init__(self):
self.session = None
super().__init__(**{"path_to_yaml": "metabase.yaml"})

def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
session = None
try:
session = MetabaseAuth(config, {})
return session.has_valid_token(), None
except Exception as e:
return False, e
finally:
if session:
session.close_session()

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
self.session = MetabaseAuth(config, {})

return super().streams(config)

# We override the read method to make sure we close the metabase session and logout
# so we don't keep too many active session_token active.
def read(
self,
logger: logging.Logger,
config: Mapping[str, Any],
catalog: ConfiguredAirbyteCatalog,
state: MutableMapping[str, Any] = None,
) -> Iterator[AirbyteMessage]:
try:
yield from super().read(logger, config, catalog, state)
finally:
self.close_session()

def close_session(self):
if self.session:
self.session.close_session()
Loading

0 comments on commit dd17096

Please sign in to comment.