diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 447ad96549d17..4ea5174d85083 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -363,7 +363,7 @@ - name: Google Analytics Data API sourceDefinitionId: 3cc2eafd-84aa-4dca-93af-322d9dfeec1a dockerRepository: airbyte/source-google-analytics-data-api - dockerImageTag: 0.0.2 + dockerImageTag: 0.0.3 documentationUrl: https://docs.airbyte.com/integrations/sources/google-analytics-v4 icon: google-analytics.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index aadda6d4a595c..bed8a03cb722d 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -3191,7 +3191,7 @@ oauthFlowOutputParameters: - - "access_token" - - "refresh_token" -- dockerImage: "airbyte/source-google-analytics-data-api:0.0.2" +- dockerImage: "airbyte/source-google-analytics-data-api:0.0.3" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/google-analytics-v4" connectionSpecification: @@ -3200,12 +3200,8 @@ type: "object" required: - "property_id" - - "json_credentials" - - "report_name" - - "dimensions" - - "metrics" - "date_ranges_start_date" - - "date_ranges_end_date" + additionalProperties: true properties: property_id: type: "string" @@ -3213,42 +3209,115 @@ description: "A Google Analytics GA4 property identifier whose events are\ \ tracked. Specified in the URL path and not the body" order: 1 - json_credentials: - type: "string" - title: "JSON Credentials" - description: "The JSON key of the Service Account to use for authorization" - airbyte_secret: true - order: 2 - report_name: - type: "string" - title: "Report Name" - description: "The report name" - order: 3 - dimensions: - type: "string" - title: "Dimensions" - description: "Comma seprated report dimensions https://developers.google.com/analytics/devguides/reporting/data/v1/api-schema#dimensions" - order: 4 - metrics: - type: "string" - title: "Metrics" - description: "Comma seprated report metrics https://developers.google.com/analytics/devguides/reporting/data/v1/api-schema#metrics" - order: 5 + credentials: + order: 0 + type: "object" + title: "Credentials" + description: "Credentials for the service" + oneOf: + - title: "Authenticate via Google (Oauth)" + type: "object" + required: + - "client_id" + - "client_secret" + - "refresh_token" + properties: + auth_type: + type: "string" + const: "Client" + order: 0 + client_id: + title: "Client ID" + type: "string" + description: "The Client ID of your Google Analytics developer application." + airbyte_secret: true + order: 1 + client_secret: + title: "Client Secret" + type: "string" + description: "The Client Secret of your Google Analytics developer\ + \ application." + airbyte_secret: true + order: 2 + refresh_token: + title: "Refresh Token" + type: "string" + description: "The token for obtaining a new access token." + airbyte_secret: true + order: 3 + access_token: + title: "Access Token (Optional)" + type: "string" + description: "Access Token for making authenticated requests." + airbyte_secret: true + order: 4 + - type: "object" + title: "Service Account Key Authentication" + required: + - "credentials_json" + properties: + auth_type: + type: "string" + const: "Service" + order: 0 + credentials_json: + title: "Service Account JSON Key" + type: "string" + description: "The JSON key of the service account to use for authorization" + examples: + - "{ \"type\": \"service_account\", \"project_id\": YOUR_PROJECT_ID,\ + \ \"private_key_id\": YOUR_PRIVATE_KEY, ... }" + airbyte_secret: true date_ranges_start_date: type: "string" title: "Date Range Start Date" description: "The start date. One of the values Ndaysago, yesterday, today\ \ or in the format YYYY-MM-DD" - order: 6 - date_ranges_end_date: + order: 2 + custom_reports: + order: 3 type: "string" - title: "Date Range End Date" - description: "The end date. One of the values Ndaysago, yesterday, today\ - \ or in the format YYYY-MM-DD" - order: 7 + title: "Custom Reports (Optional)" + description: "A JSON array describing the custom reports you want to sync\ + \ from Google Analytics. See the docs for more information about the exact format you can use\ + \ to fill out this field." + window_in_days: + type: "integer" + title: "Data request time increment in days (Optional)" + description: "The time increment used by the connector when requesting data\ + \ from the Google Analytics API. More information is available in the\ + \ the docs. The bigger this value is, the faster the sync will be,\ + \ but the more likely that sampling will be applied to your data, potentially\ + \ causing inaccuracies in the returned results. We recommend setting this\ + \ to 1 unless you have a hard requirement to make the sync faster at the\ + \ expense of accuracy. The minimum allowed value for this field is 1,\ + \ and the maximum is 364. " + examples: + - 30 + - 60 + - 90 + - 120 + - 200 + - 364 + default: 1 + order: 4 supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] + authSpecification: + auth_type: "oauth2.0" + oauth2Specification: + rootObject: + - "credentials" + - "0" + oauthFlowInitParameters: + - - "client_id" + - - "client_secret" + oauthFlowOutputParameters: + - - "access_token" + - - "refresh_token" - dockerImage: "airbyte/source-google-directory:0.1.9" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/google-directory" diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/Dockerfile b/airbyte-integrations/connectors/source-google-analytics-data-api/Dockerfile index cbcc7f9db9512..e3e6b791c953c 100644 --- a/airbyte-integrations/connectors/source-google-analytics-data-api/Dockerfile +++ b/airbyte-integrations/connectors/source-google-analytics-data-api/Dockerfile @@ -28,5 +28,5 @@ COPY source_google_analytics_data_api ./source_google_analytics_data_api ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.0.2 +LABEL io.airbyte.version=0.0.3 LABEL io.airbyte.name=airbyte/source-google-analytics-data-api diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/Makefile b/airbyte-integrations/connectors/source-google-analytics-data-api/Makefile new file mode 100644 index 0000000000000..23ef26f46b24f --- /dev/null +++ b/airbyte-integrations/connectors/source-google-analytics-data-api/Makefile @@ -0,0 +1,19 @@ +docker_image := airbyte/$(notdir $(CURDIR)):dev + +run-build: + docker build . -t ${docker_image} + +spec: + @docker run --rm $(docker_image) spec | jq + +check: + @docker run --rm -v $(PWD)/secrets:/secrets $(docker_image) check --config /secrets/config.json | jq + +discover: + @docker run --rm -v $(PWD)/secrets:/secrets $(docker_image) discover --config /secrets/config.json | jq + +read: + @docker run --rm -v $(PWD)/secrets:/secrets -v $(PWD)/integration_tests:/integration_tests $(docker_image) read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json | jq + +unittest-local: + @python -m pytest unit_tests \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/acceptance-test-config.yml b/airbyte-integrations/connectors/source-google-analytics-data-api/acceptance-test-config.yml index c08884b79567a..04482497e8a96 100644 --- a/airbyte-integrations/connectors/source-google-analytics-data-api/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-google-analytics-data-api/acceptance-test-config.yml @@ -11,6 +11,8 @@ tests: status: "failed" discovery: - config_path: "secrets/config.json" + backward_compatibility_tests_config: + disable_for_version: "0.0.2" basic_read: - config_path: "secrets/config.json" configured_catalog_path: "integration_tests/configured_catalog.json" @@ -18,3 +20,12 @@ tests: full_refresh: - config_path: "secrets/config.json" configured_catalog_path: "integration_tests/configured_catalog.json" + ignored_fields: + "daily_active_users": ["uuid"] + "weekly_active_users": ["uuid"] + "four_weekly_active_users": ["uuid"] + "devices": ["uuid"] + "locations": ["uuid"] + "pages": ["uuid"] + "traffic_sources": ["uuid"] + "website_overview": ["uuid"] diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-google-analytics-data-api/integration_tests/configured_catalog.json index b6849522598bd..0b74613f76d31 100644 --- a/airbyte-integrations/connectors/source-google-analytics-data-api/integration_tests/configured_catalog.json +++ b/airbyte-integrations/connectors/source-google-analytics-data-api/integration_tests/configured_catalog.json @@ -2,13 +2,90 @@ "streams": [ { "stream": { - "name": "Analytics Report", + "name": "daily_active_users", "json_schema": {}, - "supported_sync_modes": ["full_refresh"], + "supported_sync_modes": ["incremental"], "source_defined_cursor": false, - "default_cursor_field": ["column_name"] + "default_cursor_field": ["date"] }, - "sync_mode": "full_refresh", + "sync_mode": "incremental", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "weekly_active_users", + "json_schema": {}, + "supported_sync_modes": ["incremental"], + "source_defined_cursor": false, + "default_cursor_field": ["date"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "four_weekly_active_users", + "json_schema": {}, + "supported_sync_modes": ["incremental"], + "source_defined_cursor": false, + "default_cursor_field": ["date"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "devices", + "json_schema": {}, + "supported_sync_modes": ["incremental"], + "source_defined_cursor": false, + "default_cursor_field": ["date"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "locations", + "json_schema": {}, + "supported_sync_modes": ["incremental"], + "source_defined_cursor": false, + "default_cursor_field": ["date"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "pages", + "json_schema": {}, + "supported_sync_modes": ["incremental"], + "source_defined_cursor": false, + "default_cursor_field": ["date"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "traffic_sources", + "json_schema": {}, + "supported_sync_modes": ["incremental"], + "source_defined_cursor": false, + "default_cursor_field": ["date"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "website_overview", + "json_schema": {}, + "supported_sync_modes": ["incremental"], + "source_defined_cursor": false, + "default_cursor_field": ["date"] + }, + "sync_mode": "incremental", "destination_sync_mode": "overwrite" } ] diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/setup.py b/airbyte-integrations/connectors/source-google-analytics-data-api/setup.py index 60b3728cf709e..87ff851e1c54d 100644 --- a/airbyte-integrations/connectors/source-google-analytics-data-api/setup.py +++ b/airbyte-integrations/connectors/source-google-analytics-data-api/setup.py @@ -5,7 +5,7 @@ from setuptools import find_packages, setup -MAIN_REQUIREMENTS = ["airbyte-cdk~=0.1", "google-analytics-data==0.11.2"] +MAIN_REQUIREMENTS = ["airbyte-cdk~=0.1", "google-analytics-data==0.11.2", "PyJWT==2.4.0", "cryptography==37.0.4", "requests==2.28.1"] TEST_REQUIREMENTS = [ "pytest~=6.1", diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/authenticator.py b/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/authenticator.py new file mode 100644 index 0000000000000..5dc6858bd0e6d --- /dev/null +++ b/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/authenticator.py @@ -0,0 +1,71 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import datetime + +import jwt +import requests +from source_google_analytics_data_api import utils + + +class GoogleServiceKeyAuthenticator(requests.auth.AuthBase): + _google_oauth2_token_endpoint = "https://oauth2.googleapis.com/token" + _google_oauth2_scope_endpoint = "https://www.googleapis.com/auth/analytics.readonly" + _google_oauth2_grant_type_urn = "urn:ietf:params:oauth:grant-type:jwt-bearer" + + _default_token_lifetime_secs = 3600 + _jwt_encode_algorithm = "RS256" + + def __init__(self, credentials: dict): + self._client_email = credentials["client_email"] + self._client_secret = credentials["private_key"] + self._client_id = credentials["client_id"] + + self._token: dict = {} + + def _get_claims(self) -> dict: + now = datetime.datetime.utcnow() + expiry = now + datetime.timedelta(seconds=self._default_token_lifetime_secs) + + return { + "iss": self._client_email, + "scope": self._google_oauth2_scope_endpoint, + "aud": self._google_oauth2_token_endpoint, + "exp": utils.datetime_to_secs(expiry), + "iat": utils.datetime_to_secs(now), + } + + def _get_headers(self): + headers = {} + if self._client_id: + headers["kid"] = self._client_id + return headers + + def _get_signed_payload(self) -> dict: + claims = self._get_claims() + headers = self._get_headers() + assertion = jwt.encode(claims, self._client_secret, headers=headers, algorithm=self._jwt_encode_algorithm) + return {"grant_type": self._google_oauth2_grant_type_urn, "assertion": str(assertion)} + + def _token_expired(self): + if not self._token: + return True + return self._token["expires_at"] < utils.datetime_to_secs(datetime.datetime.utcnow()) + + def _rotate(self): + if self._token_expired(): + try: + response = requests.request(method="POST", url=self._google_oauth2_token_endpoint, params=self._get_signed_payload()).json() + except requests.exceptions.RequestException as e: + raise Exception(f"Error refreshing access token: {e}") from e + self._token = dict( + **response, + expires_at=utils.datetime_to_secs(datetime.datetime.utcnow() + datetime.timedelta(seconds=response["expires_in"])), + ) + + def __call__(self, r: requests.Request) -> requests.Request: + self._rotate() + + r.headers["Authorization"] = f"Bearer {self._token['access_token']}" + return r diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/client.py b/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/client.py deleted file mode 100644 index 3554b72a0b4a8..0000000000000 --- a/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/client.py +++ /dev/null @@ -1,63 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - - -from collections import Mapping -from typing import Any, Dict, List - -from google.analytics.data_v1beta import BetaAnalyticsDataClient, DateRange, Dimension, Metric, OrderBy, RunReportRequest, RunReportResponse -from google.oauth2 import service_account - -DEFAULT_CURSOR_FIELD = "date" - - -class Client: - def __init__(self, json_credentials: Mapping[str, str]): - self.json_credentials = json_credentials - - def run_report(self, property_id: str, dimensions: List[str], metrics: List[str], start_date: str, end_date: str) -> RunReportResponse: - dimensions = [Dimension(name=dim) for dim in dimensions if dim != DEFAULT_CURSOR_FIELD] - dimensions.append(Dimension(name=DEFAULT_CURSOR_FIELD)) - - metrics = [Metric(name=metric) for metric in metrics] - - credentials = service_account.Credentials.from_service_account_info(self.json_credentials) - client = BetaAnalyticsDataClient(credentials=credentials) - - request = RunReportRequest( - property=f"properties/{property_id}", - dimensions=dimensions, - metrics=metrics, - date_ranges=[DateRange(start_date=start_date, end_date=end_date)], - order_bys=[ - OrderBy( - dimension=OrderBy.DimensionOrderBy( - dimension_name=DEFAULT_CURSOR_FIELD, order_type=OrderBy.DimensionOrderBy.OrderType.ALPHANUMERIC - ) - ) - ], - ) - - return client.run_report(request) - - @staticmethod - def response_to_list(response: RunReportResponse) -> List[Dict[str, Any]]: - """ - Returns the report response as a list of dictionaries - - :param response: The run report response - - :return: A list of dictionaries, the key is either dimension name or metric name and the value is the dimension or the metric value - """ - dimensions = list(map(lambda h: h.name, response.dimension_headers)) - metrics = list(map(lambda h: h.name, response.metric_headers)) - - rows = [] - - for row in response.rows: - data = dict(zip(dimensions, list(map(lambda v: v.value, row.dimension_values)))) - data.update(dict(zip(metrics, list(map(lambda v: float(v.value), row.metric_values))))) - rows.append(data) - - return rows diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/defaults/default_reports.json b/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/defaults/default_reports.json new file mode 100644 index 0000000000000..6300a5dde4b0d --- /dev/null +++ b/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/defaults/default_reports.json @@ -0,0 +1,91 @@ +[ + { + "name": "daily_active_users", + "dimensions": ["date"], + "metrics": ["active1DayUsers"] + }, + { + "name": "weekly_active_users", + "dimensions": ["date"], + "metrics": ["active7DayUsers"] + }, + { + "name": "four_weekly_active_users", + "dimensions": ["date"], + "metrics": ["active28DayUsers"] + }, + { + "name": "devices", + "dimensions": [ + "date", + "deviceCategory", + "operatingSystem", + "browser" + ], + "metrics": [ + "totalUsers", + "newUsers", + "sessions", + "sessionsPerUser", + "averageSessionDuration", + "screenPageViews", + "screenPageViewsPerSession", + "bounceRate" + ] + }, + { + "name": "locations", + "dimensions": [ + "region", + "country", + "city", + "date" + ], + "metrics": [ + "totalUsers", + "newUsers", + "sessions", + "sessionsPerUser", + "averageSessionDuration", + "screenPageViews", + "screenPageViewsPerSession", + "bounceRate" + ] + }, + { + "name": "pages", + "dimensions": ["date", "hostName", "pagePathPlusQueryString"], + "metrics": [ + "screenPageViews", + "bounceRate" + ] + }, + { + "name": "traffic_sources", + "dimensions": ["date", "sessionSource", "sessionMedium"], + "metrics": [ + "totalUsers", + "newUsers", + "sessions", + "sessionsPerUser", + "averageSessionDuration", + "screenPageViews", + "screenPageViewsPerSession", + "bounceRate" + ] + }, + { + "name": "website_overview", + "dimensions": ["date"], + "metrics": [ + "totalUsers", + "newUsers", + "sessions", + "sessionsPerUser", + "averageSessionDuration", + "screenPageViews", + "screenPageViewsPerSession", + "bounceRate" + ] + } +] diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/source.py b/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/source.py index 81bb4ebe6e75e..430697749452e 100644 --- a/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/source.py +++ b/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/source.py @@ -2,143 +2,374 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # - +import collections +import datetime import json import logging -from datetime import datetime -from typing import Any, Generator, Mapping, MutableMapping - -from airbyte_cdk.logger import AirbyteLogger -from airbyte_cdk.models import ( - AirbyteCatalog, - AirbyteConnectionStatus, - AirbyteMessage, - AirbyteRecordMessage, - AirbyteStateMessage, - AirbyteStream, - ConfiguredAirbyteCatalog, - Status, - SyncMode, - Type, -) -from airbyte_cdk.sources import Source -from google.analytics.data_v1beta import RunReportResponse -from source_google_analytics_data_api.client import DEFAULT_CURSOR_FIELD, Client - - -class SourceGoogleAnalyticsDataApi(Source): - def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: - """ - Tests if the input configuration can be used to successfully connect to the integration - e.g: if a provided Stripe API token can be used to connect to the Stripe API. +import pkgutil +import uuid +from abc import ABC +from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union - :param logger: Logging object to display debug/info/error to the logs - (logs will not be accessible via airbyte UI if they are not passed to this logger) - :param config: Json object containing the configuration of this source, content of this json is as specified in - the properties of the spec.json/spec.yaml file +import requests +from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources import AbstractSource +from airbyte_cdk.sources.streams import IncrementalMixin, Stream +from airbyte_cdk.sources.streams.http import HttpStream, auth +from source_google_analytics_data_api import utils +from source_google_analytics_data_api.authenticator import GoogleServiceKeyAuthenticator - :return: AirbyteConnectionStatus indicating a Success or Failure - """ - try: - self._run_report(config) +metrics_data_types_map: Dict = { + "METRIC_TYPE_UNSPECIFIED": "string", + "TYPE_INTEGER": "integer", + "TYPE_FLOAT": "number", + "TYPE_SECONDS": "number", + "TYPE_MILLISECONDS": "number", + "TYPE_MINUTES": "number", + "TYPE_HOURS": "number", + "TYPE_STANDARD": "number", + "TYPE_CURRENCY": "number", + "TYPE_FEET": "number", + "TYPE_MILES": "number", + "TYPE_METERS": "number", + "TYPE_KILOMETERS": "number", +} - return AirbyteConnectionStatus(status=Status.SUCCEEDED) - except Exception as e: - return AirbyteConnectionStatus(status=Status.FAILED, message=f"An exception occurred: {str(e)}") - def discover(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteCatalog: - """ - Returns an AirbyteCatalog representing the available streams and fields in this integration. - For example, given valid credentials to a Postgres database, - returns an Airbyte catalog where each postgres table is a stream, and each table column is a field. - - :param logger: Logging object to display debug/info/error to the logs - (logs will not be accessible via airbyte UI if they are not passed to this logger) - :param config: Json object containing the configuration of this source, content of this json is as specified in - the properties of the spec.json/spec.yaml file - - :return: AirbyteCatalog is an object describing a list of all available streams in this source. - A stream is an AirbyteStream object that includes: - - its stream name (or table name in the case of Postgres) - - json_schema providing the specifications of expected schema for this stream (a list of columns described - by their names and types) - """ - report_name = config.get("report_name") +def get_metrics_type(t: str) -> str: + return metrics_data_types_map.get(t, "number") + + +metrics_data_native_types_map: Dict = { + "METRIC_TYPE_UNSPECIFIED": str, + "TYPE_INTEGER": int, + "TYPE_FLOAT": float, + "TYPE_SECONDS": float, + "TYPE_MILLISECONDS": float, + "TYPE_MINUTES": float, + "TYPE_HOURS": float, + "TYPE_STANDARD": float, + "TYPE_CURRENCY": float, + "TYPE_FEET": float, + "TYPE_MILES": float, + "TYPE_METERS": float, + "TYPE_KILOMETERS": float, +} + + +def metrics_type_to_python(t: str) -> type: + return metrics_data_native_types_map.get(t, str) + + +def get_dimensions_type(d: str) -> str: + return "string" + + +authenticator_class_map: Dict = { + "Service": (GoogleServiceKeyAuthenticator, lambda credentials: {"credentials": json.loads(credentials["credentials_json"])}), + "Client": ( + auth.Oauth2Authenticator, + lambda credentials: { + "token_refresh_endpoint": "https://oauth2.googleapis.com/token", + "scopes": ["https://www.googleapis.com/auth/analytics.readonly"], + "client_secret": credentials["client_secret"], + "client_id": credentials["client_id"], + "refresh_token": credentials["refresh_token"], + }, + ), +} + + +def get_authenticator(credentials): + try: + authenticator_class, get_credentials = authenticator_class_map[credentials["auth_type"]] + except KeyError as e: + raise e + return authenticator_class(**get_credentials(credentials)) + + +class MetadataDescriptor: + def __init__(self): + self._metadata = None + + def __get__(self, instance, owner): + if not self._metadata: + authenticator = ( + instance.authenticator + if not isinstance(instance.authenticator, auth.NoAuth) + else get_authenticator(instance.config["credentials"]) + ) + stream = GoogleAnalyticsDataApiTestConnectionStream(config=instance.config, authenticator=authenticator) + try: + metadata = next(iter(stream.read_records(sync_mode=SyncMode.full_refresh))) + except Exception as e: + raise e - response = self._run_report(config) + self._metadata = { + "dimensions": {m["apiName"]: m for m in metadata["dimensions"]}, + "metrics": {m["apiName"]: m for m in metadata["metrics"]}, + } - properties = {DEFAULT_CURSOR_FIELD: {"type": "string"}} + return self._metadata - for dimension in response.dimension_headers: - properties[dimension.name] = {"type": "string"} - for metric in response.metric_headers: - properties[metric.name] = {"type": "number"} +class GoogleAnalyticsDataApiAbstractStream(HttpStream, ABC): + url_base = "https://analyticsdata.googleapis.com/v1beta/" + http_method = "POST" - json_schema = { + def __init__(self, config: Mapping[str, Any], *args, **kwargs): + super().__init__(*args, **kwargs) + self._config = config + + @property + def config(self): + return self._config + + +class GoogleAnalyticsDataApiBaseStream(GoogleAnalyticsDataApiAbstractStream): + row_limit = 100000 + + metadata = MetadataDescriptor() + + @property + def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]: + return "uuid" + + @staticmethod + def add_primary_key() -> dict: + return {"uuid": str(uuid.uuid4())} + + @staticmethod + def add_property_id(property_id): + return {"property_id": property_id} + + @staticmethod + def add_dimensions(dimensions, row) -> dict: + return dict(zip(dimensions, [v["value"] for v in row["dimensionValues"]])) + + @staticmethod + def add_metrics(metrics, metric_types, row) -> dict: + def _metric_type_to_python(metric_data: Tuple[str, str]) -> Any: + metric_name, metric_value = metric_data + python_type = metrics_type_to_python(metric_types[metric_name]) + return metric_name, python_type(metric_value) + + return dict(map(_metric_type_to_python, zip(metrics, [v["value"] for v in row["metricValues"]]))) + + def get_json_schema(self) -> Mapping[str, Any]: + """ + Override get_json_schema CDK method to retrieve the schema information for GoogleAnalyticsV4 Object dynamically. + """ + schema: Dict[str, Any] = { "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "properties": properties, + "type": ["null", "object"], + "additionalProperties": True, + "properties": { + "property_id": {"type": ["string"]}, + "uuid": {"type": ["string"], "description": "Custom unique identifier for each record, to support primary key"}, + }, } - primary_key = list(map(lambda h: [h.name], response.dimension_headers)) + schema["properties"].update( + { + d: {"type": get_dimensions_type(d), "description": self.metadata["dimensions"].get(d, {}).get("description", d)} + for d in self.config["dimensions"] + } + ) - stream = AirbyteStream( - name=report_name, - json_schema=json_schema, - supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental], - source_defined_primary_key=primary_key, - default_cursor_field=[DEFAULT_CURSOR_FIELD], + schema["properties"].update( + { + m: { + "type": ["null", get_metrics_type(self.metadata["metrics"].get(m, {}).get("type"))], + "description": self.metadata["metrics"].get(m, {}).get("description", m), + } + for m in self.config["metrics"] + } ) - return AirbyteCatalog(streams=[stream]) - def read( - self, logger: logging.Logger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None - ) -> Generator[AirbyteMessage, None, None]: - """ - Returns a generator of the AirbyteMessages generated by reading the source with the given configuration, - catalog, and state. - - :param logger: Logging object to display debug/info/error to the logs - (logs will not be accessible via airbyte UI if they are not passed to this logger) - :param config: Json object containing the configuration of this source, content of this json is as specified in - the properties of the spec.json/spec.yaml file - :param catalog: The input catalog is a ConfiguredAirbyteCatalog which is almost the same as AirbyteCatalog - returned by discover(), but - in addition, it's been configured in the UI! For each particular stream and field, there may have been provided - with extra modifications such as: filtering streams and/or columns out, renaming some entities, etc - :param state: When a Airbyte reads data from a source, it might need to keep a checkpoint cursor to resume - replication in the future from that saved checkpoint. - This is the object that is provided with state from previous runs and avoid replicating the entire set of - data everytime. - - :return: A generator that produces a stream of AirbyteRecordMessage contained in AirbyteMessage object. - """ - report_name = config.get("report_name") + return schema + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + r = response.json() + + if all(key in r for key in ["limit", "offset", "rowCount"]): + limit, offset, total_rows = r["limit"], r["offset"], r["rowCount"] + + if total_rows <= offset: + return None + + return {"limit": limit, "offset": offset + limit} - response = self._run_report(config) - rows = Client.response_to_list(response) + def path( + self, *, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + ) -> str: + return f"properties/{self.config['property_id']}:runReport" - last_cursor_value = state.get(report_name, {}).get(DEFAULT_CURSOR_FIELD, "") + def parse_response( + self, + response: requests.Response, + *, + stream_state: Mapping[str, Any], + stream_slice: Mapping[str, Any] = None, + next_page_token: Mapping[str, Any] = None, + ) -> Iterable[Mapping]: + r = response.json() - for row in rows: - if last_cursor_value <= row[DEFAULT_CURSOR_FIELD]: - yield AirbyteMessage( - type=Type.RECORD, - record=AirbyteRecordMessage(stream=report_name, data=row, emitted_at=int(datetime.now().timestamp()) * 1000), + dimensions = [h["name"] for h in r["dimensionHeaders"]] + metrics = [h["name"] for h in r["metricHeaders"]] + metrics_type_map = {h["name"]: h["type"] for h in r["metricHeaders"]} + + rows = [] + + for row in r.get("rows", []): + rows.append( + collections.ChainMap( + *[ + self.add_primary_key(), + self.add_property_id(self.config["property_id"]), + self.add_dimensions(dimensions, row), + self.add_metrics(metrics, metrics_type_map, row), + ] ) + ) + r["records"] = rows - last_cursor_value = row[DEFAULT_CURSOR_FIELD] + yield r - yield AirbyteMessage(type=Type.STATE, state=AirbyteStateMessage(data={report_name: {DEFAULT_CURSOR_FIELD: last_cursor_value}})) - @staticmethod - def _run_report(config: Mapping[str, Any]) -> RunReportResponse: - property_id = config.get("property_id") - dimensions = config.get("dimensions", "").split(",") - metrics = config.get("metrics", "").split(",") - start_date = config.get("date_ranges_start_date") - end_date = config.get("date_ranges_end_date") - json_credentials = config.get("json_credentials") - - return Client(json.loads(json_credentials)).run_report(property_id, dimensions, metrics, start_date, end_date) +class IncrementalGoogleAnalyticsDataApiStream(GoogleAnalyticsDataApiBaseStream, IncrementalMixin, ABC): + _date_format = "%Y-%m-%d" + + def __init__(self, *args, **kwargs): + super(IncrementalGoogleAnalyticsDataApiStream, self).__init__(*args, **kwargs) + self._cursor_value = None + + +class GoogleAnalyticsDataApiGenericStream(IncrementalGoogleAnalyticsDataApiStream): + _default_window_in_days = 1 + _record_date_format = "%Y%m%d" + + @property + def cursor_field(self) -> Union[str, List[str]]: + return "date" + + @property + def state(self) -> MutableMapping[str, Any]: + return {self.cursor_field: self._cursor_value or utils.string_to_date(self.config["date_ranges_start_date"], self._date_format)} + + @state.setter + def state(self, value): + self._cursor_value = utils.string_to_date(value[self.cursor_field], self._date_format) + datetime.timedelta(days=1) + + def request_body_json( + self, + stream_state: Mapping[str, Any], + stream_slice: Mapping[str, Any] = None, + next_page_token: Mapping[str, Any] = None, + ) -> Optional[Mapping]: + return { + "metrics": [{"name": m} for m in self.config["metrics"]], + "dimensions": [{"name": d} for d in self.config["dimensions"]], + "dateRanges": [stream_slice], + } + + def read_records( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[Mapping[str, Any]]: + if not stream_slice: + return [] + records = super().read_records(sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state) + for record in records: + for row in record["records"]: + next_cursor_value = utils.string_to_date(row[self.cursor_field], self._record_date_format) + self._cursor_value = max(self._cursor_value, next_cursor_value) if self._cursor_value else next_cursor_value + yield row + + def stream_slices( + self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None + ) -> Iterable[Optional[Mapping[str, Any]]]: + dates = [] + + today: datetime.date = datetime.date.today() + start_date: datetime.date = self.state[self.cursor_field] + + timedelta: int = self.config["window_in_days"] or self._default_window_in_days + + while start_date <= today: + end_date: datetime.date = start_date + datetime.timedelta(days=timedelta) + if timedelta > 1 and end_date > today: + end_date: datetime.date = start_date + datetime.timedelta(days=timedelta - (end_date - today).days) + + dates.append( + { + "startDate": utils.date_to_string(start_date, self._date_format), + "endDate": utils.date_to_string(end_date, self._date_format), + } + ) + + start_date: datetime.date = end_date + datetime.timedelta(days=1) + + return dates or [None] + + +class GoogleAnalyticsDataApiTestConnectionStream(GoogleAnalyticsDataApiAbstractStream): + primary_key = None + http_method = "GET" + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + return None + + def path( + self, *, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + ) -> str: + return f"properties/{self.config['property_id']}/metadata" + + def parse_response( + self, + response: requests.Response, + *, + stream_state: Mapping[str, Any], + stream_slice: Mapping[str, Any] = None, + next_page_token: Mapping[str, Any] = None, + ) -> Iterable[Mapping]: + yield response.json() + + +class SourceGoogleAnalyticsDataApi(AbstractSource): + def __init__(self, *args, **kwargs): + super(SourceGoogleAnalyticsDataApi, self).__init__(*args, **kwargs) + + self._authenticator = None + + def get_authenticator(self, config: Mapping[str, Any]): + if not self._authenticator: + self._authenticator = get_authenticator(config["credentials"]) + return self._authenticator + + def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]: + authenticator = self.get_authenticator(config) + stream = GoogleAnalyticsDataApiTestConnectionStream(config=config, authenticator=authenticator) + try: + next(iter(stream.read_records(sync_mode=SyncMode.full_refresh))) + except Exception as e: + return False, str(e) + return True, None + + def streams(self, config: Mapping[str, Any]) -> List[Stream]: + authenticator = self.get_authenticator(config) + + reports = json.loads(pkgutil.get_data("source_google_analytics_data_api", "defaults/default_reports.json")) + if "custom_reports" in config: + custom_reports = json.loads(config["custom_reports"]) + reports += custom_reports + + return [ + type(report["name"], (GoogleAnalyticsDataApiGenericStream,), {})( + config=dict(**config, metrics=report["metrics"], dimensions=report["dimensions"]), authenticator=authenticator + ) + for report in reports + ] diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/spec.json b/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/spec.json index 3b6aff3497cfa..dd5bf670b1b21 100644 --- a/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/spec.json +++ b/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/spec.json @@ -6,13 +6,9 @@ "type": "object", "required": [ "property_id", - "json_credentials", - "report_name", - "dimensions", - "metrics", - "date_ranges_start_date", - "date_ranges_end_date" + "date_ranges_start_date" ], + "additionalProperties": true, "properties": { "property_id": { "type": "string", @@ -20,43 +16,103 @@ "description": "A Google Analytics GA4 property identifier whose events are tracked. Specified in the URL path and not the body", "order": 1 }, - "json_credentials": { - "type": "string", - "title": "JSON Credentials", - "description": "The JSON key of the Service Account to use for authorization", - "airbyte_secret": true, - "order": 2 - }, - "report_name": { - "type": "string", - "title": "Report Name", - "description": "The report name", - "order": 3 - }, - "dimensions": { - "type": "string", - "title": "Dimensions", - "description": "Comma seprated report dimensions https://developers.google.com/analytics/devguides/reporting/data/v1/api-schema#dimensions", - "order": 4 - }, - "metrics": { - "type": "string", - "title": "Metrics", - "description": "Comma seprated report metrics https://developers.google.com/analytics/devguides/reporting/data/v1/api-schema#metrics", - "order": 5 + "credentials": { + "order": 0, + "type": "object", + "title": "Credentials", + "description": "Credentials for the service", + "oneOf": [ + { + "title": "Authenticate via Google (Oauth)", + "type": "object", + "required": ["client_id", "client_secret", "refresh_token"], + "properties": { + "auth_type": { + "type": "string", + "const": "Client", + "order": 0 + }, + "client_id": { + "title": "Client ID", + "type": "string", + "description": "The Client ID of your Google Analytics developer application.", + "airbyte_secret": true, + "order": 1 + }, + "client_secret": { + "title": "Client Secret", + "type": "string", + "description": "The Client Secret of your Google Analytics developer application.", + "airbyte_secret": true, + "order": 2 + }, + "refresh_token": { + "title": "Refresh Token", + "type": "string", + "description": "The token for obtaining a new access token.", + "airbyte_secret": true, + "order": 3 + }, + "access_token": { + "title": "Access Token (Optional)", + "type": "string", + "description": "Access Token for making authenticated requests.", + "airbyte_secret": true, + "order": 4 + } + } + }, + { + "type": "object", + "title": "Service Account Key Authentication", + "required": ["credentials_json"], + "properties": { + "auth_type": { + "type": "string", + "const": "Service", + "order": 0 + }, + "credentials_json": { + "title": "Service Account JSON Key", + "type": "string", + "description": "The JSON key of the service account to use for authorization", + "examples": [ + "{ \"type\": \"service_account\", \"project_id\": YOUR_PROJECT_ID, \"private_key_id\": YOUR_PRIVATE_KEY, ... }" + ], + "airbyte_secret": true + } + } + } + ] }, "date_ranges_start_date": { "type": "string", "title": "Date Range Start Date", "description": "The start date. One of the values Ndaysago, yesterday, today or in the format YYYY-MM-DD", - "order": 6 + "order": 2 }, - "date_ranges_end_date": { + "custom_reports": { + "order": 3, "type": "string", - "title": "Date Range End Date", - "description": "The end date. One of the values Ndaysago, yesterday, today or in the format YYYY-MM-DD", - "order": 7 + "title": "Custom Reports (Optional)", + "description": "A JSON array describing the custom reports you want to sync from Google Analytics. See the docs for more information about the exact format you can use to fill out this field." + }, + "window_in_days": { + "type": "integer", + "title": "Data request time increment in days (Optional)", + "description": "The time increment used by the connector when requesting data from the Google Analytics API. More information is available in the the docs. The bigger this value is, the faster the sync will be, but the more likely that sampling will be applied to your data, potentially causing inaccuracies in the returned results. We recommend setting this to 1 unless you have a hard requirement to make the sync faster at the expense of accuracy. The minimum allowed value for this field is 1, and the maximum is 364. ", + "examples": [30, 60, 90, 120, 200, 364], + "default": 1, + "order": 4 } } + }, + "authSpecification": { + "auth_type": "oauth2.0", + "oauth2Specification": { + "rootObject": ["credentials", 0], + "oauthFlowInitParameters": [["client_id"], ["client_secret"]], + "oauthFlowOutputParameters": [["access_token"], ["refresh_token"]] + } } } diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/utils.py b/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/utils.py new file mode 100644 index 0000000000000..cad009238bde2 --- /dev/null +++ b/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/utils.py @@ -0,0 +1,18 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import calendar +import datetime + + +def datetime_to_secs(dt: datetime.datetime) -> int: + return calendar.timegm(dt.utctimetuple()) + + +def string_to_date(d: str, f: str = "%Y-%m-%d") -> datetime.date: + return datetime.datetime.strptime(d, f).date() + + +def date_to_string(d: datetime.date, f: str = "%Y-%m-%d") -> str: + return d.strftime(f) diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/unit_tests/test_source.py b/airbyte-integrations/connectors/source-google-analytics-data-api/unit_tests/test_source.py index f4fb22efe01af..16cead27e5535 100644 --- a/airbyte-integrations/connectors/source-google-analytics-data-api/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-google-analytics-data-api/unit_tests/test_source.py @@ -2,39 +2,58 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # +import datetime from unittest.mock import MagicMock +import pytest from airbyte_cdk.models import AirbyteConnectionStatus, Status +from airbyte_cdk.sources.streams.http import HttpStream from source_google_analytics_data_api import SourceGoogleAnalyticsDataApi - -def test_check_connection(mocker): +json_credentials = """ +{ + "type": "service_account", + "project_id": "unittest-project-id", + "private_key_id": "9qf98e52oda52g5ne23al6evnf13649c2u077162c", + "private_key": "", + "client_email": "google-analytics-access@unittest-project-id.iam.gserviceaccount.com", + "client_id": "213243192021686092537", + "auth_uri": "https://accounts.google.com/o/oauth2/auth", + "token_uri": "https://oauth2.googleapis.com/token", + "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", + "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/google-analytics-access%40unittest-project-id.iam.gserviceaccount.com" +} +""" + + +@pytest.fixture +def patch_base_class(mocker): + return { + "config": { + "property_id": "108176369", + "credentials": {"auth_type": "Service", "credentials_json": json_credentials}, + "date_ranges_start_date": datetime.datetime.strftime((datetime.datetime.now() - datetime.timedelta(days=1)), "%Y-%m-%d"), + } + } + + +def test_check_connection(mocker, patch_base_class): source = SourceGoogleAnalyticsDataApi() + record = MagicMock() - report_mock = MagicMock() - mocker.patch.object(SourceGoogleAnalyticsDataApi, "_run_report", return_value=report_mock) - - logger_mock = MagicMock() - config_mock = MagicMock() + logger_mock, config_mock = MagicMock(), MagicMock() + config_mock.__getitem__.side_effect = patch_base_class["config"].__getitem__ + mocker.patch.object(HttpStream, "read_records", return_value=[record]) assert source.check(logger_mock, config_mock) == AirbyteConnectionStatus(status=Status.SUCCEEDED) -def test_discover(mocker): +def test_streams(mocker, patch_base_class): source = SourceGoogleAnalyticsDataApi() - dimensions_header_mock = MagicMock() - dimensions_header_mock.name = "dimensions" - - metrics_header_mock = MagicMock() - metrics_header_mock.name = "metrics" - - report_mock = MagicMock(dimension_headers=[dimensions_header_mock], metric_headers=[metrics_header_mock]) - mocker.patch.object(SourceGoogleAnalyticsDataApi, "_run_report", return_value=report_mock) - - logger_mock = MagicMock() - config_mock = {"report_name": "test"} + config_mock = MagicMock() + config_mock.__getitem__.side_effect = patch_base_class["config"].__getitem__ - catalog = source.discover(logger_mock, config_mock) - expected_streams_number = 1 - assert len(catalog.streams) == expected_streams_number + streams = source.streams(patch_base_class["config"]) + expected_streams_number = 8 + assert len(streams) == expected_streams_number diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-google-analytics-data-api/unit_tests/test_streams.py new file mode 100644 index 0000000000000..e872259c5fc29 --- /dev/null +++ b/airbyte-integrations/connectors/source-google-analytics-data-api/unit_tests/test_streams.py @@ -0,0 +1,275 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import copy +import datetime +import random +from http import HTTPStatus +from typing import Any, Mapping +from unittest.mock import MagicMock + +import pytest +from source_google_analytics_data_api.source import GoogleAnalyticsDataApiGenericStream + +json_credentials = """ +{ + "type": "service_account", + "project_id": "unittest-project-id", + "private_key_id": "9qf98e52oda52g5ne23al6evnf13649c2u077162c", + "private_key": "", + "client_email": "google-analytics-access@unittest-project-id.iam.gserviceaccount.com", + "client_id": "213243192021686092537", + "auth_uri": "https://accounts.google.com/o/oauth2/auth", + "token_uri": "https://oauth2.googleapis.com/token", + "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", + "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/google-analytics-access%40unittest-project-id.iam.gserviceaccount.com" +} +""" + + +@pytest.fixture +def patch_base_class(mocker): + # Mock abstract methods to enable instantiating abstract class + mocker.patch.object(GoogleAnalyticsDataApiGenericStream, "path", f"{random.randint(100000000, 999999999)}:runReport") + mocker.patch.object(GoogleAnalyticsDataApiGenericStream, "primary_key", "test_primary_key") + mocker.patch.object(GoogleAnalyticsDataApiGenericStream, "__abstractmethods__", set()) + + return { + "config": { + "property_id": "496180525", + "credentials": {"auth_type": "Service", "credentials_json": json_credentials}, + "dimensions": ["date", "deviceCategory", "operatingSystem", "browser"], + "metrics": [ + "totalUsers", + "newUsers", + "sessions", + "sessionsPerUser", + "averageSessionDuration", + "screenPageViews", + "screenPageViewsPerSession", + "bounceRate", + ], + "date_ranges_start_date": datetime.datetime.strftime((datetime.datetime.now() - datetime.timedelta(days=1)), "%Y-%m-%d"), + } + } + + +def test_request_params(patch_base_class): + assert ( + GoogleAnalyticsDataApiGenericStream(config=patch_base_class["config"]).request_params( + stream_state=MagicMock(), stream_slice=MagicMock(), next_page_token=MagicMock() + ) + == {} + ) + + +def test_request_body_json(patch_base_class): + request_body_params = {"stream_state": MagicMock(), "stream_slice": MagicMock(), "next_page_token": None} + + expected_body_json = { + "metrics": [ + {"name": "totalUsers"}, + {"name": "newUsers"}, + {"name": "sessions"}, + {"name": "sessionsPerUser"}, + {"name": "averageSessionDuration"}, + {"name": "screenPageViews"}, + {"name": "screenPageViewsPerSession"}, + {"name": "bounceRate"}, + ], + "dimensions": [ + {"name": "date"}, + {"name": "deviceCategory"}, + {"name": "operatingSystem"}, + {"name": "browser"}, + ], + "dateRanges": [request_body_params["stream_slice"]], + } + + request_body_json = GoogleAnalyticsDataApiGenericStream(config=patch_base_class["config"]).request_body_json(**request_body_params) + assert request_body_json == expected_body_json + + +def test_next_page_token_equal_chunk(patch_base_class): + stream = GoogleAnalyticsDataApiGenericStream(config=patch_base_class["config"]) + response = MagicMock() + response.json.side_effect = [ + {"limit": 100000, "offset": 0, "rowCount": 200000}, + {"limit": 100000, "offset": 100000, "rowCount": 200000}, + {"limit": 100000, "offset": 200000, "rowCount": 200000}, + ] + inputs = {"response": response} + + expected_tokens = [ + { + "limit": 100000, + "offset": 100000, + }, + { + "limit": 100000, + "offset": 200000, + }, + None, + ] + + for expected_token in expected_tokens: + assert stream.next_page_token(**inputs) == expected_token + + +def test_next_page_token(patch_base_class): + stream = GoogleAnalyticsDataApiGenericStream(config=patch_base_class["config"]) + response = MagicMock() + response.json.side_effect = [ + {"limit": 100000, "offset": 0, "rowCount": 250000}, + {"limit": 100000, "offset": 100000, "rowCount": 250000}, + {"limit": 100000, "offset": 200000, "rowCount": 250000}, + {"limit": 100000, "offset": 300000, "rowCount": 250000}, + ] + inputs = {"response": response} + + expected_tokens = [ + { + "limit": 100000, + "offset": 100000, + }, + { + "limit": 100000, + "offset": 200000, + }, + { + "limit": 100000, + "offset": 300000, + }, + None, + ] + + for expected_token in expected_tokens: + assert stream.next_page_token(**inputs) == expected_token + + +def test_parse_response(patch_base_class): + stream = GoogleAnalyticsDataApiGenericStream(config=patch_base_class["config"]) + + response_data = { + "dimensionHeaders": [{"name": "date"}, {"name": "deviceCategory"}, {"name": "operatingSystem"}, {"name": "browser"}], + "metricHeaders": [ + {"name": "totalUsers", "type": "TYPE_INTEGER"}, + {"name": "newUsers", "type": "TYPE_INTEGER"}, + {"name": "sessions", "type": "TYPE_INTEGER"}, + {"name": "sessionsPerUser", "type": "TYPE_FLOAT"}, + {"name": "averageSessionDuration", "type": "TYPE_SECONDS"}, + {"name": "screenPageViews", "type": "TYPE_INTEGER"}, + {"name": "screenPageViewsPerSession", "type": "TYPE_FLOAT"}, + {"name": "bounceRate", "type": "TYPE_FLOAT"}, + ], + "rows": [ + { + "dimensionValues": [{"value": "20220731"}, {"value": "desktop"}, {"value": "Macintosh"}, {"value": "Chrome"}], + "metricValues": [ + {"value": "344"}, + {"value": "169"}, + {"value": "420"}, + {"value": "1.2209302325581395"}, + {"value": "194.76313766428572"}, + {"value": "614"}, + {"value": "1.4619047619047618"}, + {"value": "0.47857142857142859"}, + ], + }, + { + "dimensionValues": [{"value": "20220731"}, {"value": "desktop"}, {"value": "Windows"}, {"value": "Chrome"}], + "metricValues": [ + {"value": "322"}, + {"value": "211"}, + {"value": "387"}, + {"value": "1.2018633540372672"}, + {"value": "249.21595714211884"}, + {"value": "669"}, + {"value": "1.7286821705426356"}, + {"value": "0.42377260981912146"}, + ], + }, + ], + "rowCount": 54, + "metadata": {"currencyCode": "USD", "timeZone": "America/Los_Angeles"}, + "kind": "analyticsData#runReport", + } + + expected_data = copy.deepcopy(response_data) + expected_data["records"] = [ + { + "property_id": "496180525", + "date": "20220731", + "deviceCategory": "desktop", + "operatingSystem": "Macintosh", + "browser": "Chrome", + "totalUsers": 344, + "newUsers": 169, + "sessions": 420, + "sessionsPerUser": 1.2209302325581395, + "averageSessionDuration": 194.76313766428572, + "screenPageViews": 614, + "screenPageViewsPerSession": 1.4619047619047618, + "bounceRate": 0.47857142857142859, + }, + { + "property_id": "496180525", + "date": "20220731", + "deviceCategory": "desktop", + "operatingSystem": "Windows", + "browser": "Chrome", + "totalUsers": 322, + "newUsers": 211, + "sessions": 387, + "sessionsPerUser": 1.2018633540372672, + "averageSessionDuration": 249.21595714211884, + "screenPageViews": 669, + "screenPageViewsPerSession": 1.7286821705426356, + "bounceRate": 0.42377260981912146, + }, + ] + + response = MagicMock() + response.json.return_value = response_data + inputs = {"response": response, "stream_state": {}} + actual_records: Mapping[str, Any] = next(iter(stream.parse_response(**inputs))) + for record in actual_records["records"]: + del record["uuid"] + assert actual_records == expected_data + + +def test_request_headers(patch_base_class): + stream = GoogleAnalyticsDataApiGenericStream(config=patch_base_class["config"]) + inputs = {"stream_slice": None, "stream_state": None, "next_page_token": None} + expected_headers = {} + assert stream.request_headers(**inputs) == expected_headers + + +def test_http_method(patch_base_class): + stream = GoogleAnalyticsDataApiGenericStream(config=patch_base_class["config"]) + expected_method = "POST" + assert stream.http_method == expected_method + + +@pytest.mark.parametrize( + ("http_status", "should_retry"), + [ + (HTTPStatus.OK, False), + (HTTPStatus.BAD_REQUEST, False), + (HTTPStatus.TOO_MANY_REQUESTS, True), + (HTTPStatus.INTERNAL_SERVER_ERROR, True), + ], +) +def test_should_retry(patch_base_class, http_status, should_retry): + response_mock = MagicMock() + response_mock.status_code = http_status + stream = GoogleAnalyticsDataApiGenericStream(config=patch_base_class["config"]) + assert stream.should_retry(response_mock) == should_retry + + +def test_backoff_time(patch_base_class): + response_mock = MagicMock() + stream = GoogleAnalyticsDataApiGenericStream(config=patch_base_class["config"]) + expected_backoff_time = None + assert stream.backoff_time(response_mock) == expected_backoff_time diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-google-analytics-data-api/unit_tests/unit_test.py deleted file mode 100644 index dddaea0060fa1..0000000000000 --- a/airbyte-integrations/connectors/source-google-analytics-data-api/unit_tests/unit_test.py +++ /dev/null @@ -1,7 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - - -def test_example_method(): - assert True diff --git a/airbyte-oauth/src/main/java/io/airbyte/oauth/OAuthImplementationFactory.java b/airbyte-oauth/src/main/java/io/airbyte/oauth/OAuthImplementationFactory.java index b6a1938c27dd8..449b7f240a218 100644 --- a/airbyte-oauth/src/main/java/io/airbyte/oauth/OAuthImplementationFactory.java +++ b/airbyte-oauth/src/main/java/io/airbyte/oauth/OAuthImplementationFactory.java @@ -27,7 +27,8 @@ public OAuthImplementationFactory(final ConfigRepository configRepository, final .put("airbyte/source-facebook-pages", new FacebookPagesOAuthFlow(configRepository, httpClient)) .put("airbyte/source-github", new GithubOAuthFlow(configRepository, httpClient)) .put("airbyte/source-google-ads", new GoogleAdsOAuthFlow(configRepository, httpClient)) - .put("airbyte/source-google-analytics-v4", new GoogleAnalyticsOAuthFlow(configRepository, httpClient)) + .put("airbyte/source-google-analytics-v4", new GoogleAnalyticsViewIdOAuthFlow(configRepository, httpClient)) + .put("airbyte/source-google-analytics-data-api", new GoogleAnalyticsPropertyIdOAuthFlow(configRepository, httpClient)) .put("airbyte/source-google-search-console", new GoogleSearchConsoleOAuthFlow(configRepository, httpClient)) .put("airbyte/source-google-sheets", new GoogleSheetsOAuthFlow(configRepository, httpClient)) .put("airbyte/source-harvest", new HarvestOAuthFlow(configRepository, httpClient)) diff --git a/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/google/GoogleAnalyticsPropertyIdOAuthFlow.java b/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/google/GoogleAnalyticsPropertyIdOAuthFlow.java new file mode 100644 index 0000000000000..b0b876a85685c --- /dev/null +++ b/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/google/GoogleAnalyticsPropertyIdOAuthFlow.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.oauth.flows.google; + +import com.google.common.annotations.VisibleForTesting; +import io.airbyte.config.persistence.ConfigRepository; +import java.net.http.HttpClient; +import java.util.function.Supplier; + +public class GoogleAnalyticsPropertyIdOAuthFlow extends GoogleOAuthFlow { + + public static final String SCOPE_URL = "https://www.googleapis.com/auth/analytics.readonly"; + + public GoogleAnalyticsPropertyIdOAuthFlow(final ConfigRepository configRepository, final HttpClient httpClient) { + super(configRepository, httpClient); + } + + @VisibleForTesting + GoogleAnalyticsPropertyIdOAuthFlow(final ConfigRepository configRepository, final HttpClient httpClient, final Supplier stateSupplier) { + super(configRepository, httpClient, stateSupplier); + } + + @Override + protected String getScope() { + return SCOPE_URL; + } + +} diff --git a/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/google/GoogleAnalyticsOAuthFlow.java b/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/google/GoogleAnalyticsViewIdOAuthFlow.java similarity index 64% rename from airbyte-oauth/src/main/java/io/airbyte/oauth/flows/google/GoogleAnalyticsOAuthFlow.java rename to airbyte-oauth/src/main/java/io/airbyte/oauth/flows/google/GoogleAnalyticsViewIdOAuthFlow.java index 357e4a67bbdad..79a86eee5cae6 100644 --- a/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/google/GoogleAnalyticsOAuthFlow.java +++ b/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/google/GoogleAnalyticsViewIdOAuthFlow.java @@ -9,16 +9,16 @@ import java.net.http.HttpClient; import java.util.function.Supplier; -public class GoogleAnalyticsOAuthFlow extends GoogleOAuthFlow { +public class GoogleAnalyticsViewIdOAuthFlow extends GoogleOAuthFlow { public static final String SCOPE_URL = "https://www.googleapis.com/auth/analytics.readonly"; - public GoogleAnalyticsOAuthFlow(final ConfigRepository configRepository, final HttpClient httpClient) { + public GoogleAnalyticsViewIdOAuthFlow(final ConfigRepository configRepository, final HttpClient httpClient) { super(configRepository, httpClient); } @VisibleForTesting - GoogleAnalyticsOAuthFlow(final ConfigRepository configRepository, final HttpClient httpClient, final Supplier stateSupplier) { + GoogleAnalyticsViewIdOAuthFlow(final ConfigRepository configRepository, final HttpClient httpClient, final Supplier stateSupplier) { super(configRepository, httpClient, stateSupplier); } diff --git a/airbyte-oauth/src/test-integration/java/io/airbyte/oauth/flows/google/GoogleAnalyticsOAuthFlowIntegrationTest.java b/airbyte-oauth/src/test-integration/java/io/airbyte/oauth/flows/google/GoogleAnalyticsOAuthFlowIntegrationTest.java index 84cf7ff48fc54..dab0bc412c0a2 100644 --- a/airbyte-oauth/src/test-integration/java/io/airbyte/oauth/flows/google/GoogleAnalyticsOAuthFlowIntegrationTest.java +++ b/airbyte-oauth/src/test-integration/java/io/airbyte/oauth/flows/google/GoogleAnalyticsOAuthFlowIntegrationTest.java @@ -42,7 +42,7 @@ public class GoogleAnalyticsOAuthFlowIntegrationTest { private static final Path CREDENTIALS_PATH = Path.of("secrets/google_analytics.json"); private ConfigRepository configRepository; - private GoogleAnalyticsOAuthFlow googleAnalyticsOAuthFlow; + private GoogleAnalyticsViewIdOAuthFlow googleAnalyticsViewIdOAuthFlow; private HttpServer server; private ServerHandler serverHandler; private HttpClient httpClient; @@ -55,7 +55,7 @@ public void setup() throws IOException { } configRepository = mock(ConfigRepository.class); httpClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build(); - googleAnalyticsOAuthFlow = new GoogleAnalyticsOAuthFlow(configRepository, httpClient); + googleAnalyticsViewIdOAuthFlow = new GoogleAnalyticsViewIdOAuthFlow(configRepository, httpClient); server = HttpServer.create(new InetSocketAddress(80), 0); server.setExecutor(null); // creates a default executor @@ -84,7 +84,7 @@ public void testFullGoogleOAuthFlow() throws InterruptedException, ConfigNotFoun .put("client_id", credentialsJson.get("credentials").get("client_id").asText()) .put("client_secret", credentialsJson.get("credentials").get("client_secret").asText()) .build()))))); - final String url = googleAnalyticsOAuthFlow.getSourceConsentUrl(workspaceId, definitionId, REDIRECT_URL, Jsons.emptyObject(), null); + final String url = googleAnalyticsViewIdOAuthFlow.getSourceConsentUrl(workspaceId, definitionId, REDIRECT_URL, Jsons.emptyObject(), null); LOGGER.info("Waiting for user consent at: {}", url); // TODO: To automate, start a selenium job to navigate to the Consent URL and click on allowing // access... @@ -93,7 +93,7 @@ public void testFullGoogleOAuthFlow() throws InterruptedException, ConfigNotFoun limit -= 1; } assertTrue(serverHandler.isSucceeded(), "Failed to get User consent on time"); - final Map params = googleAnalyticsOAuthFlow.completeSourceOAuth(workspaceId, definitionId, + final Map params = googleAnalyticsViewIdOAuthFlow.completeSourceOAuth(workspaceId, definitionId, Map.of("code", serverHandler.getParamValue()), REDIRECT_URL); LOGGER.info("Response from completing OAuth Flow is: {}", params.toString()); assertTrue(params.containsKey("credentials")); diff --git a/airbyte-oauth/src/test/java/io/airbyte/oauth/flows/google/GoogleAnalyticsOAuthFlowTest.java b/airbyte-oauth/src/test/java/io/airbyte/oauth/flows/google/GoogleAnalyticsOAuthFlowTest.java index 42d5f75411e06..2e0690a3f2cb8 100644 --- a/airbyte-oauth/src/test/java/io/airbyte/oauth/flows/google/GoogleAnalyticsOAuthFlowTest.java +++ b/airbyte-oauth/src/test/java/io/airbyte/oauth/flows/google/GoogleAnalyticsOAuthFlowTest.java @@ -11,7 +11,7 @@ public class GoogleAnalyticsOAuthFlowTest extends BaseOAuthFlowTest { @Override protected BaseOAuthFlow getOAuthFlow() { - return new GoogleAnalyticsOAuthFlow(getConfigRepository(), getHttpClient(), this::getConstantState); + return new GoogleAnalyticsViewIdOAuthFlow(getConfigRepository(), getHttpClient(), this::getConstantState); } @Override diff --git a/docs/integrations/sources/google-analytics-v4.md b/docs/integrations/sources/google-analytics-v4.md index 1649bfa150db3..b017428d80e0d 100644 --- a/docs/integrations/sources/google-analytics-v4.md +++ b/docs/integrations/sources/google-analytics-v4.md @@ -7,12 +7,11 @@ This connector supports [Google Analytics v4](https://developers.google.com/anal ## Prerequisites * JSON credentials for the service account that has access to Google Analytics. For more details check [instructions](https://support.google.com/analytics/answer/1009702#zippy=%2Cin-this-article) +* OAuth 2.0 credentials for the service account that has access to Google Analytics * Property ID -* Report name -* List of report dimensions comma separated -* List of report metrics comma separated -* Report start date -* Report end date +* Custom reports in format `{"name": "", "dimensions": ["", ...], "metrics": ["metric-name", ...]}` +* Date Range Start Date +* Data request time increment in days (Optional) ## Step 1: Set up Source @@ -41,7 +40,7 @@ Specify the Property ID as set [here](https://analytics.google.com/analytics/web ## Step 2: Set up the source connector in Airbyte Set the required fields in the Google Analytics Data API connector page such as the JSON credentials, property ID, -report name, dimensions, metrics and start and end dates. +custom reports, date ranges start date, data request time increment in days. ## Supported sync modes @@ -57,13 +56,13 @@ The Google Analytics source connector supports the following [sync modes](https: # Reports -The reports are custom by setting the dimensions and metrics required. To support Incremental sync, the `date` dimension is -added by default to any report and no need to add it as a dimension. There is only 1 connector per report. To add more reports, you need to create -a new connection. +The reports are custom by setting the dimensions and metrics required. To support Incremental sync, the `uuid` field is +added by default to any report. There are 8 default reports. To add more reports, you need to specify the `custom reports` field. ## Changelog -| Version | Date | Pull Request | Subject | -|:--------|:-----------|:---------------------------------------------------------|:-------------------------------------------| -| 0.0.2 | 2022-07-27 | [15087](https://github.com/airbytehq/airbyte/pull/15087) | fix documentationUrl | -| 0.0.1 | 2022-05-09 | [12701](https://github.com/airbytehq/airbyte/pull/12701) | Introduce Google Analytics Data API source | +| Version | Date | Pull Request | Subject | +|:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------| +| 0.0.3 | 2022-08-15 | [15229](https://github.com/airbytehq/airbyte/pull/15229) | Source Google Analytics Data Api: code refactoring | +| 0.0.2 | 2022-07-27 | [15087](https://github.com/airbytehq/airbyte/pull/15087) | fix documentationUrl | +| 0.0.1 | 2022-05-09 | [12701](https://github.com/airbytehq/airbyte/pull/12701) | Introduce Google Analytics Data API source |