Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source Metabase: migrate to Beta YAML #19236

Merged
merged 11 commits into from
Dec 13, 2022
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,7 @@
documentationUrl: https://docs.airbyte.com/integrations/sources/metabase
icon: metabase.svg
sourceType: api
releaseStage: alpha
releaseStage: beta
- name: Microsoft SQL Server (MSSQL)
sourceDefinitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerRepository: airbyte/source-mssql
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-metabase/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.version=0.3.0
LABEL io.airbyte.name=airbyte/source-metabase
Original file line number Diff line number Diff line change
@@ -1,20 +1,37 @@
connector_image: airbyte/source-metabase:dev
tests:
acceptance_tests:
spec:
- spec_path: "source_metabase/spec.yaml"
backward_compatibility_tests_config:
disable_for_version: "0.1.0"
tests:
- spec_path: "source_metabase/spec.yaml"
connection:
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
- config_path: "integration_tests/config_http_url.json"
status: "failed"
tests:
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
- config_path: "integration_tests/config_http_url.json"
status: "failed"
discovery:
- config_path: "secrets/config.json"
backward_compatibility_tests_config:
disable_for_version: "0.1.0"
tests:
- config_path: "secrets/config.json"
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
empty_streams:
- name: activity
bypass_reason: "data changes very fast"
- name: cards
bypass_reason: "data changes very fast"
- name: collections
bypass_reason: "data changes very fast"
- name: dashboards
bypass_reason: "data changes very fast"
- name: users
bypass_reason: "data changes very fast"
full_refresh:
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
incremental:
bypass_reason: "This connector does not implement incremental sync"
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-metabase/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from setuptools import find_packages, setup

MAIN_REQUIREMENTS = ["airbyte-cdk~=0.2", "requests>=2.28.0", "types-requests>=2.27.30"]
MAIN_REQUIREMENTS = ["airbyte-cdk", "requests>=2.28.0", "types-requests>=2.27.30"]

TEST_REQUIREMENTS = [
"pytest~=6.1",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
version: "0.3.0"

definitions:
selector:
extractor:
field_pointer: [ ]
data_field_selector:
type: RecordSelector
extractor:
type: DpathExtractor
field_pointer: [ "data" ]
requester:
url_base: "{{ config['instance_api_url'] }}"
http_method: "GET"
authenticator:
type: "SessionTokenAuthenticator"
username: "{{ config['username'] }}"
password: "{{ config['password'] }}"
api_url: "{{ config['instance_api_url'] }}"
header: "X-Metabase-Session"
session_token: "{{ config['session_token'] }}"
session_token_response_key: "id"
login_url: "session"
validate_session_url: "user/current"
retriever:
record_selector:
$ref: "*ref(definitions.selector)"
paginator:
type: NoPagination
requester:
$ref: "*ref(definitions.requester)"
data_field_retriever:
record_selector:
$ref: "*ref(definitions.data_field_selector)"
paginator:
type: NoPagination
requester:
$ref: "*ref(definitions.requester)"
base_stream:
primary_key: "id"
retriever:
$ref: "*ref(definitions.retriever)"
activity_stream:
$ref: "*ref(definitions.base_stream)"
$options:
name: "activity"
path: "activity"
cards_stream:
$ref: "*ref(definitions.base_stream)"
$options:
name: "cards"
path: "card"
collections_stream:
$ref: "*ref(definitions.base_stream)"
$options:
name: "collections"
path: "collection"
dashboards_stream:
$ref: "*ref(definitions.base_stream)"
$options:
name: "dashboards"
path: "dashboard"
users_stream:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this stream have a primary key? it's missing here

primary_key: "id"
retriever:
$ref: "*ref(definitions.data_field_retriever)"
$options:
name: "users"
path: "user"
streams:
- "*ref(definitions.activity_stream)"
- "*ref(definitions.cards_stream)"
- "*ref(definitions.collections_stream)"
- "*ref(definitions.dashboards_stream)"
- "*ref(definitions.users_stream)"

check:
stream_names:
- "activity"
Original file line number Diff line number Diff line change
Expand Up @@ -2,132 +2,10 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import logging
from typing import Any, Iterator, List, Mapping, MutableMapping, Tuple

import requests
from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.auth import HttpAuthenticator
from source_metabase.streams import Activity, Cards, Collections, Dashboards, Users
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource

API_URL = "instance_api_url"
USERNAME = "username"
PASSWORD = "password"
SESSION_TOKEN = "session_token"


class MetabaseAuth(HttpAuthenticator):
def __init__(self, logger: logging.Logger, config: Mapping[str, Any]):
self.need_session_close = False
self.session_token = ""
self.logger = logger
self.api_url = config[API_URL]
if USERNAME in config and PASSWORD in config:
self.username = config[USERNAME]
self.password = config[PASSWORD]
if SESSION_TOKEN in config:
self.session_token = config[SESSION_TOKEN]
elif USERNAME in config and PASSWORD in config:
self.session_token = self.get_new_session_token(config[USERNAME], config[PASSWORD])
else:
raise KeyError("Required parameters (username/password pair or session_token) not found")
# TODO: Try to retrieve latest session_token stored in some state message?

def get_new_session_token(self, username: str, password: str) -> str:
response = requests.post(
f"{self.api_url}session", headers={"Content-Type": "application/json"}, json={"username": username, "password": password}
)
response.raise_for_status()
if response.ok:
self.session_token = response.json()["id"]
self.need_session_close = True
self.logger.info(f"New session token generated for {username}")
else:
raise ConnectionError(f"Failed to retrieve new session token, response code {response.status_code} because {response.reason}")
return self.session_token

def has_valid_token(self) -> bool:
try:
response = requests.get(f"{self.api_url}user/current", headers=self.get_auth_header())
response.raise_for_status()
except requests.exceptions.HTTPError as e:
if e.response.status_code == 401:
self.logger.warn(f"Unable to connect to Metabase source due to {str(e)}, retrying with a new session_token...")
self.get_new_session_token(self.username, self.password)
response = requests.get(f"{self.api_url}user/current", headers=self.get_auth_header())
response.raise_for_status()
else:
raise ConnectionError(f"Error while checking connection: {e}")
if response.ok:
json_response = response.json()
self.logger.info(
f"Connection check for Metabase successful for {json_response['common_name']} login at {json_response['last_login']}"
)
return True
else:
raise ConnectionError(f"Failed to retrieve new session token, response code {response.status_code} because {response.reason}")

def get_auth_header(self) -> Mapping[str, Any]:
return {"X-Metabase-Session": self.session_token}

def close_session(self):
if self.need_session_close:
response = requests.delete(
f"{self.api_url}session", headers=self.get_auth_header(), json={"metabase-session-id": self.session_token}
)
response.raise_for_status()
if response.ok:
self.logger.info("Session successfully closed")
else:
self.logger.info(f"Unable to close session {response.status_code}: {response.reason}")
else:
self.logger.info("Session was not opened by this connector.")


class SourceMetabase(AbstractSource):
class SourceMetabase(YamlDeclarativeSource):
def __init__(self):
self.session = None

def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
session = None
try:
session = MetabaseAuth(logger, config)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On one hand you are defining a check in yaml with a stream passed in, on the other hand you're overriding a check method in python. This means yaml check will not work and has no sense. We have to leave just one of them, I'm in favour of yaml based check

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, we need custom check because of updating session token, so I left check here, and removed from yaml.

return session.has_valid_token(), None
except Exception as e:
return False, e
finally:
if session:
session.close_session()

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
self.session = MetabaseAuth(logging.getLogger("airbyte"), config)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.close_session() will never execute since self.session is never set

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

if not self.session.has_valid_token():
raise ConnectionError("Failed to connect to source")
args = {"authenticator": self.session, API_URL: config[API_URL]}
return [
Activity(**args),
Cards(**args),
Collections(**args),
Dashboards(**args),
Users(**args),
]

# We override the read method to make sure we close the metabase session and logout
# so we don't keep too many active session_token active.
def read(
self,
logger: logging.Logger,
config: Mapping[str, Any],
catalog: ConfiguredAirbyteCatalog,
state: MutableMapping[str, Any] = None,
) -> Iterator[AirbyteMessage]:
try:
yield from super().read(logger, config, catalog, state)
finally:
self.close_session()

def close_session(self):
if self.session:
self.session.close_session()
super().__init__(**{"path_to_yaml": "metabase.yaml"})
Loading