From 8497d786130a75733e9b93a12a7b8d62bb2cdb9a Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Thu, 11 Aug 2022 19:34:10 -0700 Subject: [PATCH] Migrate sendgrid to config-based (#15257) * fix spec * read records from lists stream * campaigns * contacts * stats_automations * segments * single_sends * templates * suppressions_global * suppression groups * suppression group memebers * blocks * bounces * invalid emails and spam reports * bump cdk version * fix paths * bump cdk version * only define cursor field in one place * move to definitions * move bounces inside the streams array * move all streams within the streams array * update sendgrid config * fix * derp * rename field * fix parse * Revert "fix parse" This reverts commit 3c76c5a782ebf8b252793df1c93d50d5617edaf6. * fix parse timestamp * extract datetime parser * remove print * use parser * top level docstring * rename variable * Revert "Merge branch 'alex/datetimeFormatTimestamp' into alex/configbasedsendgrid" This reverts commit 99caa5884eba85bbf7caaa8006ccdf4bc3552e5b, reversing changes made to 028bdfbb5e5dfe0a88e43a239d6b27de99046bd0. * Revert "Revert "Merge branch 'alex/datetimeFormatTimestamp' into alex/configbasedsendgrid"" This reverts commit 8d55afa5aaeb707e8e9d0772109f6e940aec4799. * Revert "Revert "Revert "Merge branch 'alex/datetimeFormatTimestamp' into alex/configbasedsendgrid""" This reverts commit 9b70a3b67c055b6a678cec4aa504fb9da17745e0. * do not use timestamp() * Revert "do not use timestamp()" This reverts commit 016cb69193535ef67536fcf7909b59e68d423c78. * Handle extracting no records from root * bump cdk version * handle empty record * update unit test * messages stream needs a different slicer * handle missing keys * Update unit test * record extractor interface * dpath extractor * docstring * use dpath * Revert "Merge branch 'alex/selectNoRecords' into alex/configbasedsendgrid" This reverts commit ac9237495279c9a9dec4c44df66358a6200be7fd, reversing changes made to e10d6b9f4afc681d340c586852cab221932f16ee. * bump cdk version * use dpath * missing cursor field * start DRYing the config * delete more cruff * DRY * get start time from config * delete custom streams * step=30days * bump version * auto-bump connector version [ci skip] Co-authored-by: Octavia Squidington III --- .../resources/seed/source_definitions.yaml | 2 +- .../src/main/resources/seed/source_specs.yaml | 4 +- .../connectors/source-sendgrid/Dockerfile | 2 +- .../connectors/source-sendgrid/setup.py | 2 +- .../source_sendgrid/sendgrid.yaml | 256 ++++++++++++++++ .../source-sendgrid/source_sendgrid/source.py | 66 +---- .../source-sendgrid/source_sendgrid/spec.json | 2 +- .../source_sendgrid/streams.py | 273 ------------------ .../source-sendgrid/unit_tests/unit_test.py | 46 +-- docs/integrations/sources/sendgrid.md | 15 +- 10 files changed, 306 insertions(+), 362 deletions(-) create mode 100644 airbyte-integrations/connectors/source-sendgrid/source_sendgrid/sendgrid.yaml delete mode 100644 airbyte-integrations/connectors/source-sendgrid/source_sendgrid/streams.py diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 6abe381b2c64f..639bfcf994a4c 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -892,7 +892,7 @@ - name: Sendgrid sourceDefinitionId: fbb5fbe2-16ad-4cf4-af7d-ff9d9c316c87 dockerRepository: airbyte/source-sendgrid - dockerImageTag: 0.2.8 + dockerImageTag: 0.2.9 documentationUrl: https://docs.airbyte.io/integrations/sources/sendgrid icon: sendgrid.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index fa266eefc2a0d..2af21788cf48f 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -8644,7 +8644,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-sendgrid:0.2.8" +- dockerImage: "airbyte/source-sendgrid:0.2.9" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/sendgrid" connectionSpecification: @@ -8653,7 +8653,7 @@ type: "object" required: - "apikey" - additionalProperties: false + additionalProperties: true properties: apikey: title: "Sendgrid API key" diff --git a/airbyte-integrations/connectors/source-sendgrid/Dockerfile b/airbyte-integrations/connectors/source-sendgrid/Dockerfile index 80cb0bc4e57f5..b1459933c7f12 100644 --- a/airbyte-integrations/connectors/source-sendgrid/Dockerfile +++ b/airbyte-integrations/connectors/source-sendgrid/Dockerfile @@ -12,5 +12,5 @@ RUN pip install . ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.2.8 +LABEL io.airbyte.version=0.2.9 LABEL io.airbyte.name=airbyte/source-sendgrid diff --git a/airbyte-integrations/connectors/source-sendgrid/setup.py b/airbyte-integrations/connectors/source-sendgrid/setup.py index c5826e3411d85..1b84fed88cd98 100644 --- a/airbyte-integrations/connectors/source-sendgrid/setup.py +++ b/airbyte-integrations/connectors/source-sendgrid/setup.py @@ -11,6 +11,6 @@ author="Airbyte", author_email="contact@airbyte.io", packages=find_packages(), - install_requires=["airbyte-cdk~=0.1", "backoff", "requests", "pytest==6.1.2", "pytest-mock"], + install_requires=["airbyte-cdk>=0.1.74", "backoff", "requests", "pytest==6.1.2", "pytest-mock"], package_data={"": ["*.json", "schemas/*.json"]}, ) diff --git a/airbyte-integrations/connectors/source-sendgrid/source_sendgrid/sendgrid.yaml b/airbyte-integrations/connectors/source-sendgrid/source_sendgrid/sendgrid.yaml new file mode 100644 index 0000000000000..d03e3d9877018 --- /dev/null +++ b/airbyte-integrations/connectors/source-sendgrid/source_sendgrid/sendgrid.yaml @@ -0,0 +1,256 @@ +definitions: + page_size: 50 + step: "30d" + + schema_loader: + type: JsonSchema + file_path: "./source_sendgrid/schemas/{{ options.name }}.json" + + requester: + type: HttpRequester + name: "{{ options['name'] }}" + url_base: "https://api.sendgrid.com" + http_method: "GET" + authenticator: + type: "BearerAuthenticator" + api_token: "{{ config.apikey }}" + cursor_paginator: + type: LimitPaginator + url_base: "*ref(definitions.requester.url_base)" + page_size: "*ref(definitions.page_size)" + limit_option: + inject_into: "request_parameter" + field_name: "page_size" + page_token_option: + inject_into: "path" + pagination_strategy: + type: "CursorPagination" + cursor_value: "{{ response._metadata.next }}" + offset_paginator: + type: LimitPaginator + $options: + url_base: "*ref(definitions.requester.url_base)" + page_size: "*ref(definitions.page_size)" + limit_option: + inject_into: "request_parameter" + field_name: "limit" + page_token_option: + inject_into: "request_parameter" + field_name: "offset" + pagination_strategy: + type: "OffsetIncrement" + retriever: + type: SimpleRetriever + name: "{{ options['name'] }}" + primary_key: "{{ options['primary_key'] }}" + stream_slicer: + type: "DatetimeStreamSlicer" + start_datetime: + datetime: "{{ config['start_time'] }}" + datetime_format: "%s" + end_datetime: + datetime: "{{ now_utc() }}" + datetime_format: "%Y-%m-%d %H:%M:%S.%f%z" + step: "*ref(definitions.step)" + cursor_field: "{{ options.stream_cursor_field }}" + start_time_option: + field_name: "start_time" + inject_into: "request_parameter" + end_time_option: + field_name: "end_time" + inject_into: "request_parameter" + datetime_format: "%s" + messages_stream_slicer: + type: "DatetimeStreamSlicer" + start_datetime: + datetime: "{{ config['start_time'] }}" + datetime_format: "%s" + end_datetime: + datetime: "{{ now_utc() }}}" + datetime_format: "%Y-%m-%d %H:%M:%S.%f%z" + step: "*ref(definitions.step)" + cursor_field: "{{ options.stream_cursor_field }}" + datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z" + + base_stream: + type: DeclarativeStream + schema_loader: + $ref: "*ref(definitions.schema_loader)" + retriever: + $ref: "*ref(definitions.retriever)" + record_selector: + extractor: + field_pointer: [] + requester: + $ref: "*ref(definitions.requester)" + paginator: + type: NoPagination +streams: + - $ref: "*ref(definitions.base_stream)" + $options: + name: "lists" + primary_key: "id" + path: "/v3/marketing/lists" + field_pointer: ["result"] + retriever: + $ref: "*ref(definitions.base_stream.retriever)" + paginator: + $ref: "*ref(definitions.cursor_paginator)" + - $ref: "*ref(definitions.base_stream)" + $options: + name: "campaigns" + primary_key: "id" + path: "/v3/marketing/campaigns" + field_pointer: ["result"] + retriever: + $ref: "*ref(definitions.base_stream.retriever)" + paginator: + $ref: "*ref(definitions.cursor_paginator)" + - $ref: "*ref(definitions.base_stream)" + $options: + name: "contacts" + primary_key: "id" + path: "/v3/marketing/contacts" + field_pointer: ["result"] + - $ref: "*ref(definitions.base_stream)" + $options: + name: "stats_automations" + primary_key: "id" + path: "/v3/marketing/stats/automations" + field_pointer: ["results"] + retriever: + $ref: "*ref(definitions.base_stream.retriever)" + paginator: + $ref: "*ref(definitions.cursor_paginator)" + - $ref: "*ref(definitions.base_stream)" + $options: + name: "segments" + primary_key: "id" + path: "/v3/marketing/segments" + field_pointer: ["results"] + - $ref: "*ref(definitions.base_stream)" + $options: + name: "single_sends" + primary_key: "id" + path: "/v3/marketing/stats/singlesends" + field_pointer: ["results"] + retriever: + $ref: "*ref(definitions.base_stream.retriever)" + paginator: + $ref: "*ref(definitions.cursor_paginator)" + - $ref: "*ref(definitions.base_stream)" + $options: + name: "templates" + primary_key: "id" + path: "/v3/templates" + field_pointer: ["result"] + retriever: + $ref: "*ref(definitions.base_stream.retriever)" + requester: + $ref: "*ref(definitions.base_stream.retriever.requester)" + request_options_provider: + request_parameters: + generations: "legacy,dynamic" + paginator: + $ref: "*ref(definitions.cursor_paginator)" + - $ref: "*ref(definitions.base_stream)" + $options: + name: "bounces" + primary_key: "email" + stream_cursor_field: "created" + path: "/v3/suppression/bounces" + field_pointer: [] + retriever: + $ref: "*ref(definitions.base_stream.retriever)" + paginator: + $ref: "*ref(definitions.offset_paginator)" + stream_slicer: + $ref: "*ref(definitions.stream_slicer)" + - $ref: "*ref(definitions.base_stream)" + $options: + name: "global_suppressions" + primary_key: "email" + stream_cursor_field: "created" + path: "/v3/suppression/unsubscribes" + field_pointer: [] + retriever: + $ref: "*ref(definitions.base_stream.retriever)" + paginator: + $ref: "*ref(definitions.offset_paginator)" + stream_slicer: + $ref: "*ref(definitions.stream_slicer)" + - $ref: "*ref(definitions.base_stream)" + $options: + name: "blocks" + primary_key: "email" + stream_cursor_field: "created" + path: "/v3/suppression/blocks" + field_pointer: [] + retriever: + $ref: "*ref(definitions.base_stream.retriever)" + paginator: + $ref: "*ref(definitions.offset_paginator)" + stream_slicer: + $ref: "*ref(definitions.stream_slicer)" + - $ref: "*ref(definitions.base_stream)" + $options: + name: "suppression_groups" + primary_key: "id" + path: "/v3/asm/groups" + field_pointer: [] + - $ref: "*ref(definitions.base_stream)" + $options: + name: "suppression_group_members" + primary_key: "group_id" + path: "/v3/asm/suppressions" + field_pointer: [] + retriever: + $ref: "*ref(definitions.base_stream.retriever)" + paginator: + $ref: "*ref(definitions.offset_paginator)" + - $ref: "*ref(definitions.base_stream)" + $options: + name: "invalid_emails" + primary_key: "email" + stream_cursor_field: "created" + path: "/v3/suppression/invalid_emails" + field_pointer: [] + retriever: + $ref: "*ref(definitions.base_stream.retriever)" + paginator: + $ref: "*ref(definitions.offset_paginator)" + stream_slicer: + $ref: "*ref(definitions.stream_slicer)" + - $ref: "*ref(definitions.base_stream)" + $options: + name: "spam_reports" + primary_key: "email" + stream_cursor_field: "created" + path: "/v3/suppression/spam_reports" + field_pointer: [] + retriever: + $ref: "*ref(definitions.base_stream.retriever)" + paginator: + $ref: "*ref(definitions.offset_paginator)" + stream_slicer: + $ref: "*ref(definitions.stream_slicer)" + - $ref: "*ref(definitions.base_stream)" + $options: + name: "messages" + primary_key: "msg_id" + stream_cursor_field: "last_event_time" + path: "/v3/messages" + field_pointer: [] + retriever: + $ref: "*ref(definitions.base_stream.retriever)" + requester: + $ref: "*ref(definitions.requester)" + request_options_provider: + request_parameters: + limit: 1000 + query: 'last_event_time BETWEEN TIMESTAMP "{{stream_slice.start_time}}" AND TIMESTAMP "{{stream_slice.end_time}}"' + stream_slicer: + $ref: "*ref(definitions.messages_stream_slicer)" +check: + type: CheckStream + stream_names: ["lists"] diff --git a/airbyte-integrations/connectors/source-sendgrid/source_sendgrid/source.py b/airbyte-integrations/connectors/source-sendgrid/source_sendgrid/source.py index 973c7d45ad0b7..925955ee4e187 100644 --- a/airbyte-integrations/connectors/source-sendgrid/source_sendgrid/source.py +++ b/airbyte-integrations/connectors/source-sendgrid/source_sendgrid/source.py @@ -2,63 +2,17 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # +from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource -from typing import Any, List, Mapping, Tuple +""" +This file provides the necessary constructs to interpret a provided declarative YAML configuration file into +source connector. -from airbyte_cdk.models import SyncMode -from airbyte_cdk.sources import AbstractSource -from airbyte_cdk.sources.streams import Stream -from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator +WARNING: Do not modify this file. +""" -from .streams import ( - Blocks, - Bounces, - Campaigns, - Contacts, - GlobalSuppressions, - InvalidEmails, - Lists, - Messages, - Scopes, - Segments, - SingleSends, - SpamReports, - StatsAutomations, - SuppressionGroupMembers, - SuppressionGroups, - Templates, -) - -class SourceSendgrid(AbstractSource): - def check_connection(self, logger, config) -> Tuple[bool, any]: - try: - authenticator = TokenAuthenticator(config["apikey"]) - scopes_gen = Scopes(authenticator=authenticator).read_records(sync_mode=SyncMode.full_refresh) - next(scopes_gen) - return True, None - except Exception as error: - return False, f"Unable to connect to Sendgrid API with the provided credentials - {error}" - - def streams(self, config: Mapping[str, Any]) -> List[Stream]: - authenticator = TokenAuthenticator(config["apikey"]) - - streams = [ - Lists(authenticator=authenticator), - Campaigns(authenticator=authenticator), - Contacts(authenticator=authenticator), - StatsAutomations(authenticator=authenticator), - Segments(authenticator=authenticator), - SingleSends(authenticator=authenticator), - Templates(authenticator=authenticator), - Messages(authenticator=authenticator, start_time=config["start_time"]), - GlobalSuppressions(authenticator=authenticator, start_time=config["start_time"]), - SuppressionGroups(authenticator=authenticator), - SuppressionGroupMembers(authenticator=authenticator), - Blocks(authenticator=authenticator, start_time=config["start_time"]), - Bounces(authenticator=authenticator, start_time=config["start_time"]), - InvalidEmails(authenticator=authenticator, start_time=config["start_time"]), - SpamReports(authenticator=authenticator, start_time=config["start_time"]), - ] - - return streams +# Declarative Source +class SourceSendgrid(YamlDeclarativeSource): + def __init__(self): + super().__init__(**{"path_to_yaml": "./source_sendgrid/sendgrid.yaml"}) diff --git a/airbyte-integrations/connectors/source-sendgrid/source_sendgrid/spec.json b/airbyte-integrations/connectors/source-sendgrid/source_sendgrid/spec.json index dfd5c3634e7ee..acaceb6b30bba 100644 --- a/airbyte-integrations/connectors/source-sendgrid/source_sendgrid/spec.json +++ b/airbyte-integrations/connectors/source-sendgrid/source_sendgrid/spec.json @@ -5,7 +5,7 @@ "title": "Sendgrid Spec", "type": "object", "required": ["apikey"], - "additionalProperties": false, + "additionalProperties": true, "properties": { "apikey": { "title": "Sendgrid API key", diff --git a/airbyte-integrations/connectors/source-sendgrid/source_sendgrid/streams.py b/airbyte-integrations/connectors/source-sendgrid/source_sendgrid/streams.py deleted file mode 100644 index 542c0dafd21c7..0000000000000 --- a/airbyte-integrations/connectors/source-sendgrid/source_sendgrid/streams.py +++ /dev/null @@ -1,273 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - -import datetime -import urllib -from abc import ABC, abstractmethod -from typing import Any, Iterable, Mapping, MutableMapping, Optional - -import pendulum -import requests -from airbyte_cdk.sources.streams.http import HttpStream - - -class SendgridStream(HttpStream, ABC): - url_base = "https://api.sendgrid.com/v3/" - primary_key = "id" - limit = 50 - data_field = None - - def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - pass - - def parse_response( - self, - response: requests.Response, - stream_state: Mapping[str, Any] = None, - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, - ) -> Iterable[Mapping]: - json_response = response.json() - records = json_response.get(self.data_field, []) if self.data_field is not None else json_response - - if records is not None: - for record in records: - yield record - else: - # TODO sendgrid's API is sending null responses at times. This seems like a bug on the API side, so we're adding - # log statements to help reproduce and prevent the connector from failing. - err_msg = ( - f"Response contained no valid JSON data. Response body: {response.text}\n" - f"Response status: {response.status_code}\n" - f"Response body: {response.text}\n" - f"Response headers: {response.headers}\n" - f"Request URL: {response.request.url}\n" - f"Request body: {response.request.body}\n" - ) - # do NOT print request headers as it contains auth token - self.logger.info(err_msg) - - -class SendgridStreamOffsetPagination(SendgridStream): - offset = 0 - - def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]: - params = super().request_params(next_page_token=next_page_token, **kwargs) - params["limit"] = self.limit - if next_page_token: - params.update(**next_page_token) - return params - - def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - stream_data = response.json() - if self.data_field: - stream_data = stream_data[self.data_field] - if len(stream_data) < self.limit: - return - self.offset += self.limit - return {"offset": self.offset} - - -class SendgridStreamIncrementalMixin(HttpStream, ABC): - cursor_field = "created" - - def __init__(self, start_time: int, **kwargs): - super().__init__(**kwargs) - self._start_time = start_time - - def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: - """ - Return the latest state by comparing the cursor value in the latest record with the stream's most recent state object - and returning an updated state object. - """ - latest_benchmark = latest_record[self.cursor_field] - if current_stream_state.get(self.cursor_field): - return {self.cursor_field: max(latest_benchmark, current_stream_state[self.cursor_field])} - return {self.cursor_field: latest_benchmark} - - def request_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]: - params = super().request_params(stream_state=stream_state) - start_time = self._start_time - if stream_state.get(self.cursor_field): - start_time = stream_state[self.cursor_field] - params.update({"start_time": start_time, "end_time": pendulum.now().int_timestamp}) - return params - - -class SendgridStreamMetadataPagination(SendgridStream): - def request_params( - self, - stream_state: Mapping[str, Any], - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, - ) -> MutableMapping[str, Any]: - params = {} - if not next_page_token: - params = {"page_size": self.limit} - return params - - def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - next_page_url = response.json()["_metadata"].get("next", False) - if next_page_url: - return {"next_page_url": next_page_url.replace(self.url_base, "")} - - @staticmethod - @abstractmethod - def initial_path() -> str: - """ - :return: initial path for the API endpoint if no next metadata url found - """ - - def path( - self, - stream_state: Mapping[str, Any] = None, - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, - ) -> str: - if next_page_token: - return next_page_token["next_page_url"] - return self.initial_path() - - -class Scopes(SendgridStream): - def path(self, **kwargs) -> str: - return "scopes" - - -class Lists(SendgridStreamMetadataPagination): - data_field = "result" - - @staticmethod - def initial_path() -> str: - return "marketing/lists" - - -class Campaigns(SendgridStreamMetadataPagination): - data_field = "result" - - @staticmethod - def initial_path() -> str: - return "marketing/campaigns" - - -class Contacts(SendgridStream): - data_field = "result" - - def path(self, **kwargs) -> str: - return "marketing/contacts" - - -class StatsAutomations(SendgridStreamMetadataPagination): - data_field = "results" - - @staticmethod - def initial_path() -> str: - return "marketing/stats/automations" - - -class Segments(SendgridStream): - data_field = "results" - - def path(self, **kwargs) -> str: - return "marketing/segments" - - -class SingleSends(SendgridStreamMetadataPagination): - """ - https://docs.sendgrid.com/api-reference/marketing-campaign-stats/get-all-single-sends-stats - """ - - data_field = "results" - - @staticmethod - def initial_path() -> str: - return "marketing/stats/singlesends" - - -class Templates(SendgridStreamMetadataPagination): - data_field = "result" - - def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]: - params = super().request_params(next_page_token=next_page_token, **kwargs) - params["generations"] = "legacy,dynamic" - return params - - @staticmethod - def initial_path() -> str: - return "templates" - - -class Messages(SendgridStream, SendgridStreamIncrementalMixin): - """ - https://docs.sendgrid.com/api-reference/e-mail-activity/filter-all-messages - """ - - data_field = "messages" - cursor_field = "last_event_time" - primary_key = "msg_id" - limit = 1000 - - def request_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]: - time_filter_template = "%Y-%m-%dT%H:%M:%SZ" - params = super().request_params(stream_state=stream_state, **kwargs) - if isinstance(params["start_time"], int): - date_start = datetime.datetime.fromtimestamp(params["start_time"]).strftime(time_filter_template) - else: - date_start = params["start_time"] - date_end = datetime.datetime.fromtimestamp(int(params["end_time"])).strftime(time_filter_template) - queryapi = f'last_event_time BETWEEN TIMESTAMP "{date_start}" AND TIMESTAMP "{date_end}"' - params["query"] = urllib.parse.quote(queryapi) - params["limit"] = self.limit - payload_str = "&".join("%s=%s" % (k, v) for k, v in params.items() if k not in ["start_time", "end_time"]) - return payload_str - - def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: - return "messages" - - -class GlobalSuppressions(SendgridStreamOffsetPagination, SendgridStreamIncrementalMixin): - primary_key = "email" - - def path(self, **kwargs) -> str: - return "suppression/unsubscribes" - - -class SuppressionGroups(SendgridStream): - def path(self, **kwargs) -> str: - return "asm/groups" - - -class SuppressionGroupMembers(SendgridStreamOffsetPagination): - primary_key = "group_id" - - def path(self, **kwargs) -> str: - return "asm/suppressions" - - -class Blocks(SendgridStreamOffsetPagination, SendgridStreamIncrementalMixin): - primary_key = "email" - - def path(self, **kwargs) -> str: - return "suppression/blocks" - - -class Bounces(SendgridStream, SendgridStreamIncrementalMixin): - primary_key = "email" - - def path(self, **kwargs) -> str: - return "suppression/bounces" - - -class InvalidEmails(SendgridStreamOffsetPagination, SendgridStreamIncrementalMixin): - primary_key = "email" - - def path(self, **kwargs) -> str: - return "suppression/invalid_emails" - - -class SpamReports(SendgridStreamOffsetPagination, SendgridStreamIncrementalMixin): - primary_key = "email" - - def path(self, **kwargs) -> str: - return "suppression/spam_reports" diff --git a/airbyte-integrations/connectors/source-sendgrid/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-sendgrid/unit_tests/unit_test.py index c5f8e71ff340b..3173b3aea5e8f 100644 --- a/airbyte-integrations/connectors/source-sendgrid/unit_tests/unit_test.py +++ b/airbyte-integrations/connectors/source-sendgrid/unit_tests/unit_test.py @@ -2,27 +2,18 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # +import json import unittest -from unittest.mock import MagicMock import pendulum import pytest import requests from airbyte_cdk.logger import AirbyteLogger from source_sendgrid.source import SourceSendgrid -from source_sendgrid.streams import Messages, SendgridStream FAKE_NOW = pendulum.DateTime(2022, 1, 1, tzinfo=pendulum.timezone("utc")) -@pytest.fixture(name="sendgrid_stream") -def sendgrid_stream_fixture(mocker) -> SendgridStream: - # Wipe the internal list of abstract methods to allow instantiating the abstract class without implementing its abstract methods - mocker.patch("source_sendgrid.streams.SendgridStream.__abstractmethods__", set()) - # Mypy yells at us because we're init'ing an abstract class - return SendgridStream() # type: ignore - - @pytest.fixture() def mock_pendulum_now(monkeypatch): pendulum_mock = unittest.mock.MagicMock(wraps=pendulum.now) @@ -30,11 +21,24 @@ def mock_pendulum_now(monkeypatch): monkeypatch.setattr(pendulum, "now", pendulum_mock) -def test_parse_response_gracefully_handles_nulls(mocker, sendgrid_stream: SendgridStream): +def get_stream(stream_name): + source = SourceSendgrid() + streams = source.streams({}) + + return [s for s in streams if s.name == stream_name][0] + + +def create_response(): response = requests.Response() - mocker.patch.object(response, "json", return_value=None) - mocker.patch.object(response, "request", return_value=MagicMock()) - assert [] == list(sendgrid_stream.parse_response(response)) + response_body = {} + response.status_code = 200 + response._content = json.dumps(response_body).encode("utf-8") + return response + + +def test_parse_response_gracefully_handles_nulls(): + response = create_response() + assert [] == list(get_stream("contacts").retriever.parse_response(response, stream_slice={}, stream_state={})) def test_source_wrong_credentials(): @@ -44,11 +48,13 @@ def test_source_wrong_credentials(): def test_messages_stream_request_params(mock_pendulum_now): - start_time = 1558359837 - stream = Messages(start_time) + stream = get_stream("messages") state = {"last_event_time": 1558359000} - request_params = stream.request_params(state) - assert ( - request_params - == "query=last_event_time%20BETWEEN%20TIMESTAMP%20%222019-05-20T06%3A30%3A00Z%22%20AND%20TIMESTAMP%20%222021-12-31T16%3A00%3A00Z%22&limit=1000" + expected_params = { + "query": 'last_event_time BETWEEN TIMESTAMP "2019-05-20T06:30:00Z" AND TIMESTAMP "2021-12-31T16:00:00Z"', + "limit": 1000, + } + request_params = stream.retriever.request_params( + stream_state=state, stream_slice={"start_time": "2019-05-20T06:30:00Z", "end_time": "2021-12-31T16:00:00Z"} ) + assert request_params == expected_params diff --git a/docs/integrations/sources/sendgrid.md b/docs/integrations/sources/sendgrid.md index 77a17ac1e4871..4fffdfaedc0e5 100644 --- a/docs/integrations/sources/sendgrid.md +++ b/docs/integrations/sources/sendgrid.md @@ -34,8 +34,8 @@ The Sendgrid connector should not run into Sendgrid API limitations under normal * Sendgrid Account * Sendgrid API Key with the following permissions: - * Read-only access to all resources - * Full access to marketing resources + * Read-only access to all resources + * Full access to marketing resources ### Setup guide @@ -45,9 +45,10 @@ We recommend creating a key specifically for Airbyte access. This will allow you To consume Messages resources requires to purchase an extra on Sendgrid. You can read more about this [here](https://docs.sendgrid.com/api-reference/e-mail-activity) -| Version | Date | Pull Request | Subject | -| :--- | :--- | :--- | :--- | -| 0.2.8 | 2022-06-07 | [13571](https://github.com/airbytehq/airbyte/pull/13571) | Add Message stream | -| 0.2.7 | 2021-09-08 | [5910](https://github.com/airbytehq/airbyte/pull/5910) | Add Single Sends Stats stream | -| 0.2.6 | 2021-07-19 | [4839](https://github.com/airbytehq/airbyte/pull/4839) | Gracefully handle malformed responses from the API | +| Version | Date | Pull Request | Subject | +|:--------| :--- |:---------------------------------------------------------|:---------------------------------------------------| +| 0.2.9 | 2022-06-07 | [15257](https://github.com/airbytehq/airbyte/pull/15257) | Migrate to config-based framework | +| 0.2.8 | 2022-06-07 | [13571](https://github.com/airbytehq/airbyte/pull/13571) | Add Message stream | +| 0.2.7 | 2021-09-08 | [5910](https://github.com/airbytehq/airbyte/pull/5910) | Add Single Sends Stats stream | +| 0.2.6 | 2021-07-19 | [4839](https://github.com/airbytehq/airbyte/pull/4839) | Gracefully handle malformed responses from the API |