Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[low code connectors] replace file retrieval with pkgutil to fix getting schema files #15814

Merged
merged 5 commits into from
Aug 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.1.79
- Fix yaml schema parsing when running from docker container

## 0.1.78
- Fix yaml config parsing when running from docker container

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import importlib
import inspect
import typing
import warnings
from dataclasses import fields
from typing import Any, List, Literal, Mapping, Type, Union, get_args, get_origin, get_type_hints

Expand Down Expand Up @@ -153,7 +154,12 @@ def build(self, class_or_class_name: Union[str, Type], config, instantiate: bool
# concrete classes that implement the interface before generating the schema
class_copy = copy.deepcopy(class_)
DeclarativeComponentFactory._transform_interface_to_union(class_copy)
schema = class_copy.json_schema()

# dataclasses_jsonschema can throw warnings when a declarative component has fields that cannot be turned into a schema.
# Some builtin field types like Any or DateTime get flagged, but they are not as critical to schema generation and validation
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=UserWarning)
schema = class_copy.json_schema()

component_definition = {
**updated_kwargs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#

import json
import pkgutil
from dataclasses import InitVar, dataclass
from typing import Any, Mapping, Tuple, Union

Expand Down Expand Up @@ -32,9 +33,41 @@ def __post_init__(self, options: Mapping[str, Any]):
self.file_path = InterpolatedString.create(self.file_path, options=options)

def get_json_schema(self) -> Mapping[str, Any]:
    """Load and parse the stream's JSON schema from the connector's packaged data files.

    The configured file path is interpolated against the connector config, split into a
    package resource and an in-package path, and read via ``pkgutil.get_data`` so that the
    schema is found both locally and when the connector runs from a docker image.

    :return: The parsed JSON schema as a mapping
    :raises IOError: if no data could be read for the resolved path
    :raises RuntimeError: if the file contents are not valid JSON
    """
    # todo: It is worth revisiting if we can replace file_path with just file_name if every schema is in the /schemas directory
    # this would require that we find a creative solution to store or retrieve source_name in here since the files are mounted there
    json_schema_path = self._get_json_filepath()
    resource, schema_path = self.extract_resource_and_schema_path(json_schema_path)

    raw_bytes = pkgutil.get_data(resource, schema_path)
    if not raw_bytes:
        raise IOError(f"Cannot find file {json_schema_path}")

    try:
        return json.loads(raw_bytes)
    except ValueError as err:
        raise RuntimeError(f"Invalid JSON file format for file {json_schema_path}") from err

def _get_json_filepath(self):
    """Resolve the schema file path by evaluating the interpolated `file_path` against the connector config."""
    return self.file_path.eval(self.config)

@staticmethod
def extract_resource_and_schema_path(json_schema_path: str) -> (str, str):
"""
When the connector is running on a docker container, package_data is accessible from the resource (source_<name>), so we extract
the resource from the first part of the schema path and the remaining path is used to find the schema file. This is a slight
hack to identify the source name while we are in the airbyte_cdk module.
:param json_schema_path: The path to the schema JSON file
:return: Tuple of the resource name and the path to the schema file
"""
split_path = json_schema_path.split("/")

if split_path[0] == "" or split_path[0] == ".":
split_path = split_path[1:]

if len(split_path) == 0:
return "", ""

if len(split_path) == 1:
return "", split_path[0]

return split_path[0], "/".join(split_path[1:])
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.1.78",
version="0.1.79",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import pytest
from airbyte_cdk.sources.declarative.schema import JsonSchema


@pytest.mark.parametrize(
    "test_name, input_path, expected_resource, expected_path",
    [
        ("path_prefixed_with_dot", "./source_example/schemas/lists.json", "source_example", "schemas/lists.json"),
        ("path_prefixed_with_slash", "/source_example/schemas/lists.json", "source_example", "schemas/lists.json"),
        ("path_starting_with_source", "source_example/schemas/lists.json", "source_example", "schemas/lists.json"),
        ("path_starting_missing_source", "schemas/lists.json", "schemas", "lists.json"),
        ("path_with_file_only", "lists.json", "", "lists.json"),
        ("empty_path_does_not_crash", "", "", ""),
        ("empty_path_with_slash_does_not_crash", "/", "", ""),
    ],
)
def test_extract_resource_and_schema_path(test_name, input_path, expected_resource, expected_path):
    # Verify that a schema file path splits into (package resource, in-package path):
    # the first path segment (after stripping a leading "." or "/") is the resource,
    # the remainder is the schema path. Degenerate inputs yield empty strings.
    json_schema = JsonSchema(input_path, {}, {})
    actual_resource, actual_path = json_schema.extract_resource_and_schema_path(input_path)

    assert actual_resource == expected_resource
    assert actual_path == expected_path