Improved schema_generator tool #13518

Merged on Jul 13, 2022 (8 commits).
5 changes: 5 additions & 0 deletions docs/connector-development/cdk-python/schemas.md
@@ -16,6 +16,11 @@ Important note: any objects referenced via `$ref` should be placed in the `share

If you are implementing a connector to pull data from an API which publishes an [OpenAPI/Swagger spec](https://swagger.io/specification/), you can use a tool we've provided for generating JSON schemas from the OpenAPI definition file. Detailed information can be found [here](https://github.com/airbytehq/airbyte/tree/master/tools/openapi2jsonschema/).

### Generating schemas using the output of your connector's read command

We also provide a tool for generating schemas using a connector's `read` command output. Detailed information can be found [here](https://github.com/airbytehq/airbyte/tree/master/tools/schema_generator/).


## Dynamic schemas

If you'd rather define your schema in code, override `Stream.get_json_schema` in your stream class to return a `dict` describing the schema using [JSONSchema](https://json-schema.org).
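
As a rough illustration of the dynamic approach, the sketch below returns a schema `dict` from `get_json_schema`. `StreamStandIn` is a hypothetical placeholder so the snippet runs without the CDK installed; in a real connector the class would subclass the CDK's `Stream`. The `Users` stream and its fields are invented for the example.

```python
from typing import Any, Dict, Mapping


class StreamStandIn:
    """Hypothetical placeholder for the CDK's Stream base class (assumption)."""

    def get_json_schema(self) -> Mapping[str, Any]:
        raise NotImplementedError


class Users(StreamStandIn):
    """Example stream whose schema is built in code; field names are made up."""

    def get_json_schema(self) -> Mapping[str, Any]:
        properties: Dict[str, Any] = {
            "id": {"type": "integer"},
            "email": {"type": ["null", "string"]},
        }
        return {
            "$schema": "http://json-schema.org/draft-07/schema#",
            "type": "object",
            "properties": properties,
        }
```

Building the `dict` in code is mostly useful when the set of fields is only known at runtime, for example when it depends on the connector's configuration.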
38 changes: 38 additions & 0 deletions tools/schema_generator/README.md
@@ -0,0 +1,38 @@
# Schema Generator
Utility for generating catalog schemas from the output of a connector's `read` command.

## Prerequisites

To use this tool you first need to:

- Define all your streams.
- Create a schema file for each stream, containing only `{}` (so that each is a valid JSON file). See [this doc section](https://docs.airbyte.com/connector-development/cdk-python/schemas#static-schemas) for instructions on how to name these files.
- Build your connector's Docker image.

Once these steps are done, you should be able to run your connector's `read` command from the Docker image; its output is the input for this tool.
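
The placeholder schema files from the second prerequisite can be created by hand, or with a small helper like the sketch below. The function name, the stream names, and the target directory are all hypothetical, and the `<stream_name>.json` naming is an assumption; check it against the linked doc section.

```python
import json
from pathlib import Path
from typing import Iterable, List


def create_empty_schemas(stream_names: Iterable[str], schemas_dir: Path) -> List[Path]:
    """Write a `{}` JSON file per stream, skipping files that already exist."""
    schemas_dir.mkdir(parents=True, exist_ok=True)
    created = []
    for name in stream_names:
        path = schemas_dir / f"{name}.json"  # assumed naming convention
        if not path.exists():  # never overwrite a schema that is already there
            path.write_text(json.dumps({}) + "\n")
            created.append(path)
    return created
```

For example, `create_empty_schemas(["customers", "orders"], Path("schemas"))` would leave two placeholder files in a `schemas` folder under the current directory.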

## Usage

First, install the tool in its own virtual environment:

```bash
$ cd tools/schema_generator # assumes you are starting from the root of the Airbyte project.
$ python -m venv .venv # Create a virtual environment in the .venv directory
$ source .venv/bin/activate # enable the venv
$ pip install -r requirements.txt
```

To run a connector's `read` command, we first need a `ConfiguredAirbyteCatalog`:

```bash
$ cd ../../connectors/<your-connector> # you need to use the tool at the root folder of a connector
$ docker run --rm -v $(pwd)/secrets:/secrets airbyte/<your-connector-image-name>:dev discover --config /secrets/config.json | schema_generator --configure-catalog
```
This will create the file **configured_catalog.json** in the **integration_tests** folder in the current working directory.
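
To make the shape of that file concrete, here is a minimal sketch of the same transformation using plain dictionaries instead of the CDK models. It mirrors what `configure_catalog.py` in this PR does (blank schema, first supported sync mode, `append` destination mode); the function name and the sample `users` stream are hypothetical.

```python
import json
from typing import Any, Dict


def configure_catalog_from_message(raw_line: str) -> Dict[str, Any]:
    """Turn one CATALOG message (a JSON string) into a configured-catalog dict."""
    message = json.loads(raw_line)
    configured_streams = []
    for stream in message["catalog"]["streams"]:
        stream = dict(stream, json_schema={})  # blank out the schema, as the tool does
        configured_streams.append(
            {
                "stream": stream,
                "sync_mode": stream["supported_sync_modes"][0],
                "destination_sync_mode": "append",
            }
        )
    return {"streams": configured_streams}


if __name__ == "__main__":
    # Hypothetical discover output with a single stream.
    line = json.dumps(
        {
            "type": "CATALOG",
            "catalog": {
                "streams": [
                    {"name": "users", "json_schema": {"type": "object"}, "supported_sync_modes": ["full_refresh"]}
                ]
            },
        }
    )
    print(json.dumps(configure_catalog_from_message(line), indent=2, sort_keys=True))
```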

Now you're all set to run the following command and generate your schemas:

```bash
$ docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/<your-connector-image-name>:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json | schema_generator --infer-schemas
```
This creates schema files for all streams and places them in the **schemas** folder in the current working directory.
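
The sketch below gives a feel for what inference from `read` output involves: keep only `RECORD` messages, group them by stream, and note which JSON types each top-level field takes. It is a deliberately simplified stand-in written with the standard library only; the tool itself relies on `genson`, which also handles nested objects and arrays. All function names here are hypothetical.

```python
import json
from collections import defaultdict
from typing import Any, Dict, Iterable, Set

# bool must come before int because bool is a subclass of int in Python.
JSON_TYPES = [(bool, "boolean"), (int, "integer"), (float, "number"), (str, "string"), (list, "array"), (dict, "object")]


def json_type(value: Any) -> str:
    """Map a decoded JSON value to its JSON Schema type name."""
    if value is None:
        return "null"
    for python_type, name in JSON_TYPES:
        if isinstance(value, python_type):
            return name
    raise TypeError(f"Not a JSON value: {value!r}")


def infer_flat_schemas(lines: Iterable[str]) -> Dict[str, Dict[str, Any]]:
    """Collect the types seen per top-level field, grouped by stream name."""
    seen: Dict[str, Dict[str, Set[str]]] = defaultdict(lambda: defaultdict(set))
    for line in lines:
        try:
            message = json.loads(line)
        except json.JSONDecodeError:
            continue  # ignore anything that is not a JSON message
        if not isinstance(message, dict) or message.get("type") != "RECORD":
            continue  # skip LOG, STATE, and other message types
        record = message["record"]
        for field, value in record["data"].items():
            seen[record["stream"]][field].add(json_type(value))

    schemas = {}
    for stream, fields in seen.items():
        properties = {}
        for field, types in sorted(fields.items()):
            ordered = sorted(types)
            properties[field] = {"type": ordered[0] if len(ordered) == 1 else ordered}
        schemas[stream] = {"type": "object", "properties": properties}
    return schemas
```

Calling `infer_flat_schemas(sys.stdin)` would consume piped output in the same way the commands above do.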
Empty file.
1 change: 1 addition & 0 deletions tools/schema_generator/requirements.txt
@@ -0,0 +1 @@
-e .
Empty file.
22 changes: 22 additions & 0 deletions tools/schema_generator/schema_generator/configure_catalog.py
@@ -0,0 +1,22 @@
import json
import os

from airbyte_cdk.models import ConfiguredAirbyteCatalog, AirbyteMessage, ConfiguredAirbyteStream, DestinationSyncMode


def configure_catalog():
    record = AirbyteMessage.parse_raw(input())
    for stream in record.catalog.streams:
        stream.json_schema = {}
    streams = [
        ConfiguredAirbyteStream(stream=stream, sync_mode=stream.supported_sync_modes[0], destination_sync_mode=DestinationSyncMode.append)
        for stream in record.catalog.streams
    ]
    configured_catalog = ConfiguredAirbyteCatalog(streams=streams)

    default_folder = os.path.join(os.getcwd(), "integration_tests")
    if not os.path.exists(default_folder):
        os.mkdir(default_folder)
    output_file_name = os.path.join(default_folder, "configured_catalog.json")
    with open(output_file_name, "w") as outfile:
        json.dump(json.loads(configured_catalog.json()), outfile, indent=2, sort_keys=True)
Original file line number Diff line number Diff line change
@@ -36,6 +36,7 @@ class NoRequiredObj(Object):
     This class has Object behaviour, but it does not generate "required[]" fields
     every time it parses object. So we dont add unnecessary extra field.
     """
+
     def to_schema(self):
         schema = super(Object, self).to_schema()
         schema["type"] = "object"
@@ -50,7 +51,7 @@ class NoRequiredSchemaBuilder(SchemaBuilder):
     EXTRA_STRATEGIES = (NoRequiredObj,)


-def main():
+def infer_schemas():
     default_folder = os.path.join(os.getcwd(), "schemas")
     if not os.path.exists(default_folder):
         os.mkdir(default_folder)
@@ -71,7 +72,3 @@ def main():
         output_file_name = os.path.join(default_folder, stream_name + ".json")
         with open(output_file_name, "w") as outfile:
             json.dump(schema, outfile, indent=2, sort_keys=True)
-
-
-if __name__ == "__main__":
-    main()
34 changes: 34 additions & 0 deletions tools/schema_generator/schema_generator/main.py
@@ -0,0 +1,34 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import argparse
import sys

from .infer_schemas import infer_schemas
from .configure_catalog import configure_catalog


def main():
Review comment (Contributor): Any unit tests for the tool?

    parser = argparse.ArgumentParser(description="Airbyte Schema Generator")

    parser.add_argument("--configure-catalog", action="store_true", help="Generate a Configured Catalog")
    parser.add_argument("--infer-schemas", action="store_true", help="Infer Stream Schemas")

    # Print help only after the arguments are registered, so that the flags are listed.
    if len(sys.argv) == 1:
        parser.print_help()

    args, unknown_args = parser.parse_known_args()

    if unknown_args:
        print(f"Invalid arguments: {unknown_args}.")
        parser.print_help()
    elif args.configure_catalog:
        configure_catalog()
    elif args.infer_schemas:
        infer_schemas()


if __name__ == "__main__":
    sys.exit(main())
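
One possible variation on this entry point, not part of the PR, is to let `argparse` enforce that exactly one of the two flags is given by using a required mutually exclusive group. The sketch below shows only the parsing side; `build_parser` and `selected_action` are hypothetical names, and the returned strings stand in for calling `configure_catalog()` or `infer_schemas()`.

```python
import argparse
from typing import List, Optional


def build_parser() -> argparse.ArgumentParser:
    """Build a parser that requires exactly one of the two mode flags."""
    parser = argparse.ArgumentParser(description="Airbyte Schema Generator")
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--configure-catalog", action="store_true", help="Generate a Configured Catalog")
    group.add_argument("--infer-schemas", action="store_true", help="Infer Stream Schemas")
    return parser


def selected_action(argv: Optional[List[str]] = None) -> str:
    """Return the name of the action chosen on the command line."""
    args = build_parser().parse_args(argv)  # exits with a usage error on bad input
    return "configure_catalog" if args.configure_catalog else "infer_schemas"
```

With this layout, running the tool with no flag, with both flags, or with an unknown flag ends in a usage error from `argparse` itself, so the manual checks on `sys.argv` and unknown arguments would no longer be needed.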
28 changes: 28 additions & 0 deletions tools/schema_generator/setup.py
@@ -0,0 +1,28 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#


from setuptools import find_packages, setup

MAIN_REQUIREMENTS = ["airbyte_cdk==0.1.60", "genson==1.2.2"]

TEST_REQUIREMENTS = ["pytest"]


setup(
    version="0.1.0",
    name="schema_generator",
    description="Util to create catalog schemas for an Airbyte Connector.",
    author="Airbyte",
    author_email="contact@airbyte.io",
    packages=find_packages(),
    install_requires=MAIN_REQUIREMENTS,
    extras_require={
        "tests": TEST_REQUIREMENTS,
    },
    python_requires=">=3.9",
    entry_points={
        "console_scripts": ["schema_generator = schema_generator.main:main"],
    },
)
64 changes: 64 additions & 0 deletions tools/schema_generator/tests/test_schema_generator.py
@@ -0,0 +1,64 @@
import io
import sys
import tempfile
import os
import json

import pytest

from schema_generator.configure_catalog import configure_catalog
from schema_generator.infer_schemas import infer_schemas

from airbyte_cdk.models import (
    AirbyteMessage,
    AirbyteCatalog,
    AirbyteRecordMessage,
    AirbyteStream,
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
    DestinationSyncMode,
    SyncMode,
    Type,
)


def test_configure_catalog():
    stream = AirbyteStream(name="stream", supported_sync_modes=[SyncMode.full_refresh], json_schema={})
    catalog = AirbyteCatalog(streams=[stream])
    catalog_message = AirbyteMessage(type=Type.CATALOG, catalog=catalog)
    sys.stdin = io.StringIO(catalog_message.json())

    expected_configured_catalog = ConfiguredAirbyteCatalog(
        streams=[ConfiguredAirbyteStream(stream=stream, sync_mode=SyncMode.full_refresh, destination_sync_mode=DestinationSyncMode.append)]
    )

    expected_configured_catalog_json = json.loads(expected_configured_catalog.json())

    with tempfile.TemporaryDirectory() as temp_dir:
        os.chdir(temp_dir)
        configure_catalog()
        assert os.path.exists("integration_tests/configured_catalog.json")

        with open("integration_tests/configured_catalog.json") as f:
            configured_catalog_json = json.loads(f.read())
            assert configured_catalog_json == expected_configured_catalog_json


def test_infer_schemas():
    expected_schema = {
        "$schema": "http://json-schema.org/schema#",
        "properties": {"a": {"type": "integer"}, "b": {"type": "string"}},
        "type": "object",
    }

    with tempfile.TemporaryDirectory() as temp_dir:
        os.chdir(temp_dir)
        record = {"a": 1, "b": "test"}
        record_message = AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="stream", data=record, emitted_at=111)).json()
        sys.stdin = io.StringIO(record_message)
        infer_schemas()
        assert os.path.exists("schemas/stream.json")

        with open("schemas/stream.json") as f:
            schema = json.loads(f.read())
            assert schema == expected_schema
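
Both tests replace `sys.stdin` and change the working directory without restoring either afterwards. A possible way to keep that state from leaking between tests, sketched here with the standard library only, is a small context manager; `isolated_run` is a hypothetical helper and is not part of the PR.

```python
import contextlib
import io
import os
import sys
import tempfile
from typing import Iterator
from unittest import mock


@contextlib.contextmanager
def isolated_run(stdin_text: str) -> Iterator[str]:
    """Run a block with fake stdin inside a temp working directory, then restore both."""
    previous_cwd = os.getcwd()
    with tempfile.TemporaryDirectory() as temp_dir:
        os.chdir(temp_dir)
        try:
            # mock.patch.object puts the real sys.stdin back when the block exits.
            with mock.patch.object(sys, "stdin", io.StringIO(stdin_text)):
                yield temp_dir
        finally:
            os.chdir(previous_cwd)  # leave the temp dir before it is deleted
```

A test body would then sit inside `with isolated_run(catalog_message.json()):`, calling `configure_catalog()` and asserting on the generated files before the block ends. In a pytest suite, the built-in `monkeypatch` and `tmp_path` fixtures can serve the same purpose.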
27 changes: 27 additions & 0 deletions tools/schema_generator/tests/tests.py
@@ -0,0 +1,27 @@
from airbyte_cdk.models import (
    AirbyteCatalog,
    AirbyteConnectionStatus,
    AirbyteMessage,
    AirbyteRecordMessage,
    AirbyteStateMessage,
    AirbyteStream,
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
    ConnectorSpecification,
    DestinationSyncMode,
    Status,
    SyncMode,
    Type,
)

dummy_catalog = ConfiguredAirbyteCatalog(
    streams=[
        ConfiguredAirbyteStream(
            stream=AirbyteStream(name="mystream", json_schema={"type": "object"}),
            sync_mode=SyncMode.full_refresh,
            destination_sync_mode=DestinationSyncMode.overwrite,
        )
    ]
)

print(dummy_catalog.json())