diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/read_exception.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/exceptions.py similarity index 58% rename from airbyte-cdk/python/airbyte_cdk/sources/declarative/read_exception.py rename to airbyte-cdk/python/airbyte_cdk/sources/declarative/exceptions.py index 160cdcb43f0cb..185a833b0defe 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/read_exception.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/exceptions.py @@ -7,3 +7,9 @@ class ReadException(Exception): """ Raise when there is an error reading data from an API Source """ + + +class InvalidConnectorDefinitionException(Exception): + """ + Raise when the connector definition is invalid + """ diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index 8443026161d51..d019c87b95c54 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -7,8 +7,8 @@ import requests from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources.declarative.exceptions import ReadException from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector -from airbyte_cdk.sources.declarative.read_exception import ReadException from airbyte_cdk.sources.declarative.requesters.error_handlers.response_action import ResponseAction from airbyte_cdk.sources.declarative.requesters.paginators.no_pagination import NoPagination from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/yaml_declarative_source.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/yaml_declarative_source.py index bebecdfa2e2a3..f07537d2ed325 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/yaml_declarative_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/yaml_declarative_source.py @@ -8,6 +8,7 @@ from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource +from airbyte_cdk.sources.declarative.exceptions import InvalidConnectorDefinitionException from airbyte_cdk.sources.declarative.parsers.factory import DeclarativeComponentFactory from airbyte_cdk.sources.declarative.parsers.yaml_parser import YamlParser from airbyte_cdk.sources.streams import Stream @@ -16,6 +17,8 @@ class YamlDeclarativeSource(DeclarativeSource): """Declarative source defined by a yaml file""" + VALID_TOP_LEVEL_FIELDS = {"definitions", "streams", "check", "version"} + def __init__(self, path_to_yaml): """ :param path_to_yaml: Path to the yaml file describing the source @@ -25,6 +28,11 @@ def __init__(self, path_to_yaml): self._path_to_yaml = path_to_yaml self._source_config = self._read_and_parse_yaml_file(path_to_yaml) + # Stopgap to protect the top-level namespace until it's validated through the schema + unknown_fields = [key for key in self._source_config.keys() if key not in self.VALID_TOP_LEVEL_FIELDS] + if unknown_fields: + raise InvalidConnectorDefinitionException(f"Found unknown top-level fields: {unknown_fields}") + @property def connection_checker(self) -> ConnectionChecker: check = self._source_config["check"] diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index 6639aa6ec8073..064bef0f8786b 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -8,7 +8,7 @@ import pytest import requests from airbyte_cdk.models import SyncMode -from airbyte_cdk.sources.declarative.read_exception import ReadException +from airbyte_cdk.sources.declarative.exceptions import ReadException from airbyte_cdk.sources.declarative.requesters.error_handlers.response_action import ResponseAction from airbyte_cdk.sources.declarative.requesters.error_handlers.response_status import ResponseStatus from airbyte_cdk.sources.declarative.requesters.request_option import RequestOptionType diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/test_yaml_declarative_source.py b/airbyte-cdk/python/unit_tests/sources/declarative/test_yaml_declarative_source.py new file mode 100644 index 0000000000000..91e3f710cb098 --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/test_yaml_declarative_source.py @@ -0,0 +1,50 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import os +import tempfile +import unittest + +from airbyte_cdk.sources.declarative.exceptions import InvalidConnectorDefinitionException +from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource + + +class TestYamlDeclarativeSource(unittest.TestCase): + def test_source_is_created_if_toplevel_fields_are_known(self): + content = """ + version: "version" + streams: "streams" + check: "check" + """ + temporary_file = TestFileContent(content) + YamlDeclarativeSource(temporary_file.filename) + + def test_source_is_not_created_if_toplevel_fields_are_unknown(self): + content = """ + version: "version" + streams: "streams" + check: "check" + not_a_valid_field: "error" + """ + temporary_file = TestFileContent(content) + with self.assertRaises(InvalidConnectorDefinitionException): + YamlDeclarativeSource(temporary_file.filename) + + +class TestFileContent: + def __init__(self, content): + self.file = tempfile.NamedTemporaryFile(mode="w", delete=False) + + with self.file as f: + f.write(content) + + @property + def filename(self): + return self.file.name + + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + os.unlink(self.filename) diff --git a/airbyte-integrations/connector-templates/source-configuration-based/source_{{snakeCase name}}/{{snakeCase name}}.yaml.hbs b/airbyte-integrations/connector-templates/source-configuration-based/source_{{snakeCase name}}/{{snakeCase name}}.yaml.hbs index 6f8433d720263..6df2d2b76ffba 100644 --- a/airbyte-integrations/connector-templates/source-configuration-based/source_{{snakeCase name}}/{{snakeCase name}}.yaml.hbs +++ b/airbyte-integrations/connector-templates/source-configuration-based/source_{{snakeCase name}}/{{snakeCase name}}.yaml.hbs @@ -1,42 +1,44 @@ -schema_loader: - type: JsonSchema - file_path: "./source_{{snakeCase name}}/schemas/\{{ options['name'] }}.json" -selector: - type: RecordSelector - extractor: - type: JelloExtractor - transform: "_" -requester: - type: HttpRequester - name: "\{{ options['name'] }}" - http_method: "GET" - authenticator: - type: BearerAuthenticator - api_token: "\{{ config['api_key'] }}" -retriever: - type: SimpleRetriever - $options: - url_base: TODO "your_api_base_url" - name: "\{{ options['name'] }}" - primary_key: "\{{ options['primary_key'] }}" - record_selector: - $ref: "*ref(selector)" - paginator: - type: NoPagination -customers_stream: - type: DeclarativeStream - $options: - name: "customers" - primary_key: "id" +version: "0.1.0" + +definitions: schema_loader: - $ref: "*ref(schema_loader)" + type: JsonSchema + file_path: "./source_{{snakeCase name}}/schemas/\{{ options['name'] }}.json" + selector: + type: RecordSelector + extractor: + type: JelloExtractor + transform: "_" + requester: + type: HttpRequester + name: "\{{ options['name'] }}" + http_method: "GET" + authenticator: + type: BearerAuthenticator + api_token: "\{{ config['api_key'] }}" retriever: - $ref: "*ref(retriever)" - requester: - $ref: "*ref(requester)" - path: TODO "your_endpoint_path" + type: SimpleRetriever + $options: + url_base: TODO "your_api_base_url" + name: "\{{ options['name'] }}" + primary_key: "\{{ options['primary_key'] }}" + record_selector: + $ref: "*ref(definitions.selector)" + paginator: + type: NoPagination + streams: - - "*ref(customers_stream)" + - type: DeclarativeStream + $options: + name: "customers" + primary_key: "id" + schema_loader: + $ref: "*ref(definitions.schema_loader)" + retriever: + $ref: "*ref(definitions.retriever)" + requester: + $ref: "*ref(definitions.requester)" + path: TODO "your_endpoint_path" check: type: CheckStream stream_names: ["customers"]