Skip to content

Commit

Permalink
🎉 Linkedin Ads: support of oAuth2 (airbytehq#7839)
Browse files Browse the repository at this point in the history
* fix 404 responses for the ticket_comments stream

* add unit test

* add unit test

* add oauth2 access token

* Update airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py

Co-authored-by: George Claireaux <george@claireaux.co.uk>

* switching among auth methods

* update spec file

* update CI secrets logic

* remove debug data

* add a debug message

* fix json convertation

* support one secret by several connectors

* Update tools/bin/ci_credentials.sh

Co-authored-by: LiRen Tu <tuliren@gmail.com>

* update function names

* update docs

* reset failed changes

* update json set value

* update spec file

* add oauth2 logic

* update doc and version

* add tests

* correction of spec

* update tests

* update spec file

Co-authored-by: Maksym Pavlenok <maksym.pavlenok@globallogic.com>
Co-authored-by: George Claireaux <george@claireaux.co.uk>
Co-authored-by: Sherif A. Nada <snadalive@gmail.com>
Co-authored-by: LiRen Tu <tuliren@gmail.com>
  • Loading branch information
5 people authored and schlattk committed Jan 4, 2022
1 parent 96d40ba commit db0fd32
Show file tree
Hide file tree
Showing 12 changed files with 317 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3551,7 +3551,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mongodb-v2:0.1.3"
- dockerImage: "airbyte/source-mongodb-v2:0.1.4"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/mongodb-v2"
changelogUrl: "https://docs.airbyte.io/integrations/sources/mongodb-v2"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@ COPY source_linkedin_ads ./source_linkedin_ads
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.name=airbyte/source-linkedin-ads
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,21 @@ tests:
connection:
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "secrets/config_token.json"
status: "succeed"
- config_path: "secrets/config_oauth.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
discovery:
- config_path: "secrets/config.json"
- config_path: "secrets/config_oauth.json"
basic_read:
- config_path: "secrets/config.json"
- config_path: "secrets/config_oauth.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
incremental:
- config_path: "secrets/config.json"
- config_path: "secrets/config_oauth.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
future_state_path: "integration_tests/abnormal_state.json"
full_refresh:
- config_path: "secrets/config.json"
- config_path: "secrets/config_oauth.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env sh

# Build latest connector image
docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2)
docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2-)

# Pull latest acctest image
docker pull airbyte/source-acceptance-test:latest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@

import requests
from airbyte_cdk import AirbyteLogger
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
from airbyte_cdk.sources.streams.http.auth import Oauth2Authenticator, TokenAuthenticator

from .analytics import make_analytics_slices, merge_chunks, update_analytics_params
from .utils import get_parent_stream_values, transform_data
Expand Down Expand Up @@ -303,31 +304,52 @@ class SourceLinkedinAds(AbstractSource):
- implementation to call each stream with it's input parameters.
"""

@classmethod
def get_authenticator(cls, config: Mapping[str, Any]) -> TokenAuthenticator:
"""
Validate input parameters and generate a necessary Authentication object
This connectors support 2 auth methods:
1) direct access token with TTL = 2 months
2) refresh token (TTL = 1 year) which can be converted to access tokens
Every new refresh revokes all previous access tokens q
"""
auth_method = config.get("credentials", {}).get("auth_method")
if not auth_method or auth_method == "access_token":
# support of backward compatibility with old exists configs
access_token = config["credentials"]["access_token"] if auth_method else config["access_token"]
return TokenAuthenticator(token=access_token)
elif auth_method == "oAuth2.0":
return Oauth2Authenticator(
token_refresh_endpoint="https://www.linkedin.com/oauth/v2/accessToken",
client_id=config["credentials"]["client_id"],
client_secret=config["credentials"]["client_secret"],
refresh_token=config["credentials"]["refresh_token"],
)
raise Exception("incorrect input parameters")

def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, any]:
"""
Testing connection availability for the connector.
:: for this check method the Customer must have the "r_liteprofile" scope enabled.
:: more info: https://docs.microsoft.com/linkedin/consumer/integrations/self-serve/sign-in-with-linkedin
"""

header = TokenAuthenticator(token=config["access_token"]).get_auth_header()
profile_url = "https://api.linkedin.com/v2/me"

config["authenticator"] = self.get_authenticator(config)
stream = Accounts(config)
# need to load the first item only
stream.records_limit = 1
try:
response = requests.get(url=profile_url, headers=header)
response.raise_for_status()
next(stream.read_records(sync_mode=SyncMode.full_refresh), None)
return True, None
except requests.exceptions.RequestException as e:
return False, f"{e}, {response.json().get('message')}"
except Exception as e:
return False, e

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
"""
Mapping a input config of the user input configuration as defined in the connector spec.
Passing config to the streams.
"""

config["authenticator"] = TokenAuthenticator(token=config["access_token"])

config["authenticator"] = self.get_authenticator(config)
return [
Accounts(config),
AccountUsers(config),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Linkedin Ads Spec",
"type": "object",
"required": ["start_date", "access_token"],
"additionalProperties": false,
"required": ["start_date"],
"additionalProperties": true,
"properties": {
"start_date": {
"type": "string",
Expand All @@ -14,12 +14,6 @@
"description": "Date in the format 2020-09-17. Any data before this date will not be replicated.",
"examples": ["2021-05-17"]
},
"access_token": {
"type": "string",
"title": "Access Token",
"description": "The token value ganerated using Auth Code",
"airbyte_secret": true
},
"account_ids": {
"title": "Account IDs",
"type": "array",
Expand All @@ -28,7 +22,64 @@
"type": "integer"
},
"default": []
},
"credentials": {
"title": "Authorization Method",
"type": "object",
"oneOf": [
{
"type": "object",
"title": "oAuth2.0",
"required": ["client_id", "client_secret", "refresh_token"],
"properties": {
"auth_method": {
"type": "string",
"const": "oAuth2.0"
},
"client_id": {
"type": "string",
"description": "The API ID of the Gitlab developer application.",
"airbyte_secret": true
},
"client_secret": {
"type": "string",
"description": "The API Secret the Gitlab developer application.",
"airbyte_secret": true
},
"refresh_token": {
"type": "string",
"description": "The key to refresh the expired access_token.",
"airbyte_secret": true
}
}
},
{
"title": "Access Token",
"type": "object",
"required": ["access_token"],
"properties": {
"auth_method": {
"type": "string",
"const": "access_token"
},
"access_token": {
"type": "string",
"title": "Access Token",
"description": "The token value ganerated using Auth Code",
"airbyte_secret": true
}
}
}
]
}
}
},
"authSpecification": {
"auth_type": "oauth2.0",
"oauth2Specification": {
"rootObject": ["credentials", "0"],
"oauthFlowInitParameters": [["client_id"], ["client_secret"]],
"oauthFlowOutputParameters": [["refresh_token"]]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.airbyte.oauth.flows.GithubOAuthFlow;
import io.airbyte.oauth.flows.HubspotOAuthFlow;
import io.airbyte.oauth.flows.IntercomOAuthFlow;
import io.airbyte.oauth.flows.LinkedinAdsOAuthFlow;
import io.airbyte.oauth.flows.PipeDriveOAuthFlow;
import io.airbyte.oauth.flows.QuickbooksOAuthFlow;
import io.airbyte.oauth.flows.SalesforceOAuthFlow;
Expand All @@ -35,7 +36,6 @@ public class OAuthImplementationFactory {

public OAuthImplementationFactory(final ConfigRepository configRepository, final HttpClient httpClient) {
OAUTH_FLOW_MAPPING = ImmutableMap.<String, OAuthFlowImplementation>builder()
// These are listed in alphabetical order below to facilitate manual look-up:
.put("airbyte/source-asana", new AsanaOAuthFlow(configRepository, httpClient))
.put("airbyte/source-facebook-marketing", new FacebookMarketingOAuthFlow(configRepository, httpClient))
.put("airbyte/source-facebook-pages", new FacebookPagesOAuthFlow(configRepository, httpClient))
Expand All @@ -49,6 +49,7 @@ public OAuthImplementationFactory(final ConfigRepository configRepository, final
.put("airbyte/source-instagram", new InstagramOAuthFlow(configRepository, httpClient))
.put("airbyte/source-pipedrive", new PipeDriveOAuthFlow(configRepository, httpClient))
.put("airbyte/source-quickbooks", new QuickbooksOAuthFlow(configRepository, httpClient))
.put("airbyte/source-linkedin-ads", new LinkedinAdsOAuthFlow(configRepository, httpClient))
.put("airbyte/source-salesforce", new SalesforceOAuthFlow(configRepository, httpClient))
.put("airbyte/source-slack", new SlackOAuthFlow(configRepository, httpClient))
.put("airbyte/source-snapchat-marketing", new SnapchatMarketingOAuthFlow(configRepository, httpClient))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.oauth.flows;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.oauth.BaseOAuth2Flow;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.http.HttpClient;
import java.util.Map;
import java.util.UUID;
import java.util.function.Supplier;
import org.apache.http.client.utils.URIBuilder;

public class LinkedinAdsOAuthFlow extends BaseOAuth2Flow {

private static final String AUTHORIZE_URL = "https://www.linkedin.com/oauth/v2/authorization";
private static final String ACCESS_TOKEN_URL = "https://www.linkedin.com/oauth/v2/accessToken";
private static final String SCOPES = "r_ads_reporting r_emailaddress r_liteprofile r_ads r_basicprofile r_organization_social";

public LinkedinAdsOAuthFlow(ConfigRepository configRepository, HttpClient httpClient) {
super(configRepository, httpClient);
}

@VisibleForTesting
public LinkedinAdsOAuthFlow(ConfigRepository configRepository, final HttpClient httpClient, Supplier<String> stateSupplier) {
super(configRepository, httpClient, stateSupplier);
}

@Override
protected String formatConsentUrl(UUID definitionId,
String clientId,
String redirectUrl,
final JsonNode inputOAuthConfiguration)
throws IOException {
try {
return new URIBuilder(AUTHORIZE_URL)
.addParameter("client_id", clientId)
.addParameter("redirect_uri", redirectUrl)
.addParameter("response_type", "code")
.addParameter("scope", SCOPES)
.addParameter("state", getState())
.build().toString();
} catch (URISyntaxException e) {
throw new IOException("Failed to format Consent URL for OAuth flow", e);
}
}

@Override
protected String getAccessTokenUrl() {
return ACCESS_TOKEN_URL;
}

@Override
protected Map<String, String> getAccessTokenQueryParameters(final String clientId,
final String clientSecret,
final String authCode,
final String redirectUrl) {
return ImmutableMap.<String, String>builder()
.putAll(super.getAccessTokenQueryParameters(clientId, clientSecret, authCode, redirectUrl))
.put("grant_type", "authorization_code")
.build();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.oauth.flows;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.SourceOAuthParameter;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.oauth.OAuthFlowImplementation;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.net.http.HttpClient;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.junit.jupiter.api.Test;

public class LinkedinAdsOAuthFlowIntegrationTest extends OAuthFlowIntegrationTest {

protected static final Path CREDENTIALS_PATH = Path.of("secrets/config_oauth.json");
protected static final String REDIRECT_URL = "http://localhost:3000/auth_flow";

@Override
protected int getServerListeningPort() {
return 3000;
}

@Override
protected Path getCredentialsPath() {
return CREDENTIALS_PATH;
}

@Override
protected OAuthFlowImplementation getFlowImplementation(final ConfigRepository configRepository, final HttpClient httpClient) {
return new LinkedinAdsOAuthFlow(configRepository, httpClient);
}

@SuppressWarnings({"BusyWait", "unchecked"})
@Test
public void testFullOAuthFlow() throws InterruptedException, ConfigNotFoundException, IOException, JsonValidationException {
int limit = 20;
final UUID workspaceId = UUID.randomUUID();
final UUID definitionId = UUID.randomUUID();
final String fullConfigAsString = new String(Files.readAllBytes(CREDENTIALS_PATH));
final JsonNode credentialsJson = Jsons.deserialize(fullConfigAsString);
when(configRepository.listSourceOAuthParam()).thenReturn(List.of(new SourceOAuthParameter()
.withOauthParameterId(UUID.randomUUID())
.withSourceDefinitionId(definitionId)
.withWorkspaceId(workspaceId)
.withConfiguration(Jsons.jsonNode(Map.of("credentials", ImmutableMap.builder()
.put("client_id", credentialsJson.get("client_id").asText())
.put("client_secret", credentialsJson.get("client_secret").asText())
.build())))));
final String url =
getFlowImplementation(configRepository, httpClient).getSourceConsentUrl(workspaceId, definitionId, REDIRECT_URL, Jsons.emptyObject(), null);
LOGGER.info("Waiting for user consent at: {}", url);
// TODO: To automate, start a selenium job to navigate to the Consent URL and click on allowing
// access...
while (!serverHandler.isSucceeded() && limit > 0) {
Thread.sleep(1000);
limit -= 1;
}
assertTrue(serverHandler.isSucceeded(), "Failed to get User consent on time");
final Map<String, Object> params = flow.completeSourceOAuth(workspaceId, definitionId,
Map.of("code", serverHandler.getParamValue()), REDIRECT_URL);

LOGGER.info("Response from completing OAuth Flow is: {}", params.toString());
assertTrue(params.containsKey("credentials"));
final Map<String, Object> credentials;
credentials = Collections.unmodifiableMap((Map<String, Object>) params.get("credentials"));
assertTrue(credentials.containsKey("refresh_token"));
assertTrue(credentials.get("refresh_token").toString().length() > 0);
}

}
Loading

0 comments on commit db0fd32

Please sign in to comment.