-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Source Metabase: migrate to Beta YAML #19236
Changes from 9 commits
3c0d4fa
2f90309
3a7c3d7
dd17096
c3a8bfb
76a1632
a66e9da
7fb84f0
c23b2ed
36d9a78
c4d90d0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,20 +1,37 @@ | ||
connector_image: airbyte/source-metabase:dev | ||
tests: | ||
acceptance_tests: | ||
spec: | ||
- spec_path: "source_metabase/spec.yaml" | ||
backward_compatibility_tests_config: | ||
disable_for_version: "0.1.0" | ||
tests: | ||
- spec_path: "source_metabase/spec.yaml" | ||
connection: | ||
- config_path: "secrets/config.json" | ||
status: "succeed" | ||
- config_path: "integration_tests/invalid_config.json" | ||
status: "failed" | ||
- config_path: "integration_tests/config_http_url.json" | ||
status: "failed" | ||
tests: | ||
- config_path: "secrets/config.json" | ||
status: "succeed" | ||
- config_path: "integration_tests/invalid_config.json" | ||
status: "failed" | ||
- config_path: "integration_tests/config_http_url.json" | ||
status: "failed" | ||
discovery: | ||
- config_path: "secrets/config.json" | ||
backward_compatibility_tests_config: | ||
disable_for_version: "0.1.0" | ||
tests: | ||
- config_path: "secrets/config.json" | ||
basic_read: | ||
- config_path: "secrets/config.json" | ||
configured_catalog_path: "integration_tests/configured_catalog.json" | ||
tests: | ||
- config_path: "secrets/config.json" | ||
configured_catalog_path: "integration_tests/configured_catalog.json" | ||
empty_streams: | ||
- name: activity | ||
bypass_reason: "data changes very fast" | ||
- name: cards | ||
bypass_reason: "data changes very fast" | ||
- name: collections | ||
bypass_reason: "data changes very fast" | ||
- name: dashboards | ||
bypass_reason: "data changes very fast" | ||
- name: users | ||
bypass_reason: "data changes very fast" | ||
full_refresh: | ||
tests: | ||
- config_path: "secrets/config.json" | ||
configured_catalog_path: "integration_tests/configured_catalog.json" | ||
incremental: | ||
bypass_reason: "This connector does not implement incremental sync" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
# Declarative (low-code) manifest for the Metabase source.
# NOTE(review): nesting was reconstructed from the `*ref(definitions.…)` paths
# and key order after indentation was lost — confirm against the committed file.
version: "0.3.0"

definitions:
  # Record selector with an empty field pointer.
  selector:
    extractor:
      field_pointer: [ ]
  # Record selector that reads records from the "data" field of the response.
  data_field_selector:
    type: RecordSelector
    extractor:
      type: DpathExtractor
      field_pointer: [ "data" ]
  # Shared HTTP requester; every stream issues GET requests against the
  # configured instance API URL and authenticates with a session token.
  requester:
    url_base: "{{ config['instance_api_url'] }}"
    http_method: "GET"
    authenticator:
      type: "SessionTokenAuthenticator"
      username: "{{ config['username'] }}"
      password: "{{ config['password'] }}"
      api_url: "{{ config['instance_api_url'] }}"
      header: "X-Metabase-Session"
      session_token: "{{ config['session_token'] }}"
      session_token_response_key: "id"
      login_url: "session"
      validate_session_url: "user/current"
  # Unpaginated retriever using the empty-pointer selector.
  retriever:
    record_selector:
      $ref: "*ref(definitions.selector)"
    paginator:
      type: NoPagination
    requester:
      $ref: "*ref(definitions.requester)"
  # Unpaginated retriever using the "data" field selector.
  data_field_retriever:
    record_selector:
      $ref: "*ref(definitions.data_field_selector)"
    paginator:
      type: NoPagination
    requester:
      $ref: "*ref(definitions.requester)"
  # Template shared by the streams below; they differ only in name and path.
  base_stream:
    primary_key: "id"
    retriever:
      $ref: "*ref(definitions.retriever)"
  activity_stream:
    $ref: "*ref(definitions.base_stream)"
    $options:
      name: "activity"
      path: "activity"
  cards_stream:
    $ref: "*ref(definitions.base_stream)"
    $options:
      name: "cards"
      path: "card"
  collections_stream:
    $ref: "*ref(definitions.base_stream)"
    $options:
      name: "collections"
      path: "collection"
  dashboards_stream:
    $ref: "*ref(definitions.base_stream)"
    $options:
      name: "dashboards"
      path: "dashboard"
  # Does not extend base_stream because it needs the "data" field retriever.
  users_stream:
    primary_key: "id"
    retriever:
      $ref: "*ref(definitions.data_field_retriever)"
    $options:
      name: "users"
      path: "user"

streams:
  - "*ref(definitions.activity_stream)"
  - "*ref(definitions.cards_stream)"
  - "*ref(definitions.collections_stream)"
  - "*ref(definitions.dashboards_stream)"
  - "*ref(definitions.users_stream)"

# NOTE(review): the Python source in this change also overrides
# check_connection; confirm which of the two checks is meant to be used.
check:
  stream_names:
    - "activity"
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,132 +2,10 @@ | |
# Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
# | ||
|
||
import logging | ||
from typing import Any, Iterator, List, Mapping, MutableMapping, Tuple | ||
|
||
import requests | ||
from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog | ||
from airbyte_cdk.sources import AbstractSource | ||
from airbyte_cdk.sources.streams import Stream | ||
from airbyte_cdk.sources.streams.http.auth import HttpAuthenticator | ||
from source_metabase.streams import Activity, Cards, Collections, Dashboards, Users | ||
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource | ||
|
||
API_URL = "instance_api_url" | ||
USERNAME = "username" | ||
PASSWORD = "password" | ||
SESSION_TOKEN = "session_token" | ||
|
||
|
||
class MetabaseAuth(HttpAuthenticator):
    """Authenticator that sends a Metabase session token in the ``X-Metabase-Session`` header.

    The token is taken from ``config["session_token"]`` when present; otherwise
    a new session is opened with the configured username/password pair. Only
    sessions opened by this object are closed again by :meth:`close_session`.
    """

    def __init__(self, logger: logging.Logger, config: Mapping[str, Any]):
        """Read credentials from ``config`` and obtain a session token.

        Raises:
            KeyError: if ``config`` has neither a session token nor a
                username/password pair (or lacks the API URL).
        """
        self.need_session_close = False
        self.session_token = ""
        self.logger = logger
        self.api_url = config[API_URL]

        # Always define the credential attributes so that a later token
        # refresh can detect their absence instead of hitting AttributeError.
        has_credentials = USERNAME in config and PASSWORD in config
        self.username = config[USERNAME] if has_credentials else None
        self.password = config[PASSWORD] if has_credentials else None

        if SESSION_TOKEN in config:
            self.session_token = config[SESSION_TOKEN]
        elif has_credentials:
            self.session_token = self.get_new_session_token(self.username, self.password)
        else:
            raise KeyError("Required parameters (username/password pair or session_token) not found")
        # TODO: Try to retrieve latest session_token stored in some state message?

    def get_new_session_token(self, username: str, password: str) -> str:
        """Open a new Metabase session and return (and store) its token.

        Raises:
            requests.exceptions.HTTPError: if the login request returns a
                4xx/5xx status.
        """
        response = requests.post(
            f"{self.api_url}session", headers={"Content-Type": "application/json"}, json={"username": username, "password": password}
        )
        # raise_for_status() raises on every non-OK response, so no separate
        # `response.ok` check is needed afterwards.
        response.raise_for_status()
        self.session_token = response.json()["id"]
        # Remember that this object opened the session and must close it.
        self.need_session_close = True
        self.logger.info(f"New session token generated for {username}")
        return self.session_token

    def has_valid_token(self) -> bool:
        """Check the current token against ``user/current``, refreshing it once on a 401.

        Returns:
            ``True`` when the request succeeds.

        Raises:
            ConnectionError: on a non-401 HTTP error, or on a 401 when no
                username/password pair is available to request a new token.
            requests.exceptions.HTTPError: if the refresh or the retried
                request fails.
        """
        current_user_url = f"{self.api_url}user/current"
        try:
            response = requests.get(current_user_url, headers=self.get_auth_header())
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            if e.response.status_code != 401:
                raise ConnectionError(f"Error while checking connection: {e}") from e
            if self.username is None or self.password is None:
                # Only a session_token was configured, so there is nothing to
                # request a replacement token with.
                raise ConnectionError(
                    f"Unable to connect to Metabase source due to {str(e)} and no username/password pair is available to request a new session_token"
                ) from e
            self.logger.warning(f"Unable to connect to Metabase source due to {str(e)}, retrying with a new session_token...")
            self.get_new_session_token(self.username, self.password)
            response = requests.get(current_user_url, headers=self.get_auth_header())
            response.raise_for_status()

        json_response = response.json()
        self.logger.info(
            f"Connection check for Metabase successful for {json_response['common_name']} login at {json_response['last_login']}"
        )
        return True

    def get_auth_header(self) -> Mapping[str, Any]:
        """Return the header carrying the current session token."""
        return {"X-Metabase-Session": self.session_token}

    def close_session(self):
        """Delete the Metabase session, but only if this object opened it.

        Raises:
            requests.exceptions.HTTPError: if the delete request returns a
                4xx/5xx status (the failure is logged before re-raising).
        """
        if not self.need_session_close:
            self.logger.info("Session was not opened by this connector.")
            return

        response = requests.delete(
            f"{self.api_url}session", headers=self.get_auth_header(), json={"metabase-session-id": self.session_token}
        )
        try:
            response.raise_for_status()
        except requests.exceptions.HTTPError:
            # Log the failure (previously unreachable) and keep propagating it.
            self.logger.info(f"Unable to close session {response.status_code}: {response.reason}")
            raise
        self.logger.info("Session successfully closed")
|
||
|
||
class SourceMetabase(AbstractSource): | ||
class SourceMetabase(YamlDeclarativeSource): | ||
def __init__(self): | ||
self.session = None | ||
|
||
def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]: | ||
session = None | ||
try: | ||
session = MetabaseAuth(logger, config) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. On the one hand, you are defining a check in the YAML with a stream passed in; on the other hand, you're overriding a [inline code lost in extraction — presumably the `check_connection` method] here. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Thanks — we need a custom check because the session token may have to be updated, so I left the check here and removed it from the YAML. |
||
return session.has_valid_token(), None | ||
except Exception as e: | ||
return False, e | ||
finally: | ||
if session: | ||
session.close_session() | ||
|
||
def streams(self, config: Mapping[str, Any]) -> List[Stream]: | ||
self.session = MetabaseAuth(logging.getLogger("airbyte"), config) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed |
||
if not self.session.has_valid_token(): | ||
raise ConnectionError("Failed to connect to source") | ||
args = {"authenticator": self.session, API_URL: config[API_URL]} | ||
return [ | ||
Activity(**args), | ||
Cards(**args), | ||
Collections(**args), | ||
Dashboards(**args), | ||
Users(**args), | ||
] | ||
|
||
# We override the read method to make sure we close the metabase session and logout | ||
# so we don't keep too many active session_token active. | ||
def read( | ||
self, | ||
logger: logging.Logger, | ||
config: Mapping[str, Any], | ||
catalog: ConfiguredAirbyteCatalog, | ||
state: MutableMapping[str, Any] = None, | ||
) -> Iterator[AirbyteMessage]: | ||
try: | ||
yield from super().read(logger, config, catalog, state) | ||
finally: | ||
self.close_session() | ||
|
||
def close_session(self): | ||
if self.session: | ||
self.session.close_session() | ||
super().__init__(**{"path_to_yaml": "metabase.yaml"}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this stream have a primary key? It's missing here.