cdk: Add a schema_generator tool (#13518)
dougpm authored Jul 13, 2022
1 parent 32e767c commit bfc2646
Showing 11 changed files with 221 additions and 5 deletions.
5 changes: 5 additions & 0 deletions docs/connector-development/cdk-python/schemas.md
@@ -16,6 +16,11 @@ Important note: any objects referenced via `$ref` should be placed in the `share

If you are implementing a connector to pull data from an API which publishes an [OpenAPI/Swagger spec](https://swagger.io/specification/), you can use a tool we've provided for generating JSON schemas from the OpenAPI definition file. Detailed information can be found [here](https://github.com/airbytehq/airbyte/tree/master/tools/openapi2jsonschema/).

### Generating schemas using the output of your connector's `read` command

We also provide a tool for generating schemas using a connector's `read` command output. Detailed information can be found [here](https://github.com/airbytehq/airbyte/tree/master/tools/schema_generator/).
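
In short: pipe the connector's `discover` output into the tool to build a configured catalog, then pipe its `read` output back in to infer the schemas. A minimal sketch, assuming a locally built image (the image name and mounted paths below are placeholders):

```bash
$ docker run --rm -v $(pwd)/secrets:/secrets airbyte/<your-connector-image-name>:dev discover --config /secrets/config.json | schema_generator --configure-catalog
$ docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/<your-connector-image-name>:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json | schema_generator --infer-schemas
```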


## Dynamic schemas

If you'd rather define your schema in code, override `Stream.get_json_schema` in your stream class to return a `dict` describing the schema using [JSONSchema](https://json-schema.org).
38 changes: 38 additions & 0 deletions tools/schema_generator/README.md
@@ -0,0 +1,38 @@
# Schema Generator
A utility for generating catalog schemas from a connector's `read` command output.

## Prerequisites

To use this tool, you first need to:

- Define all your streams.
- Create schema files for each stream containing only `{}` (i.e. valid JSON files). See [this doc section](https://docs.airbyte.com/connector-development/cdk-python/schemas#static-schemas) for instructions on how to name these files.
- Build your connector's Docker image.

Completing the steps above lets you run your connector's `read` command against the Docker image; its output is the input for this tool.
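
For reference, the `read` output the tool consumes is a stream of newline-delimited Airbyte messages on stdout; a single record line looks roughly like this (stream name and values are illustrative):

```json
{"type": "RECORD", "record": {"stream": "my_stream", "data": {"a": 1, "b": "test"}, "emitted_at": 1657000000000}}
```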

## Usage

First, install the tool in its own virtual environment:

```bash
$ cd tools/schema_generator # assumes you are starting from the root of the Airbyte project.
$ python -m venv .venv # Create a virtual environment in the .venv directory
$ source .venv/bin/activate # activate the virtual environment
$ pip install -r requirements.txt
```

To use your connector's `read` command, we first need a `ConfiguredAirbyteCatalog`, which we generate from the `discover` output:

```bash
$ cd ../../airbyte-integrations/connectors/<your-connector> # the tool must be run from the connector's root folder
$ docker run --rm -v $(pwd)/secrets:/secrets airbyte/<your-connector-image-name>:dev discover --config /secrets/config.json | schema_generator --configure-catalog
```
This will create the file **configured_catalog.json** in the **integration_tests** folder in the current working directory.
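
The generated file follows the `ConfiguredAirbyteCatalog` shape: every stream's schema is emptied out, its first supported sync mode is selected, and the destination sync mode is set to `append`. Roughly (stream name illustrative):

```json
{
  "streams": [
    {
      "stream": {
        "name": "my_stream",
        "json_schema": {},
        "supported_sync_modes": ["full_refresh"]
      },
      "sync_mode": "full_refresh",
      "destination_sync_mode": "append"
    }
  ]
}
```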

Now you're all set to run the following command and generate your schemas:

```bash
$ docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/<your-connector-image-name>:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json | schema_generator --infer-schemas
```
This will create schema files for all streams and place them in the **schemas** folder in the current working directory.
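
Each file is a JSON Schema inferred from the records observed for that stream. For a stream whose records look like `{"a": 1, "b": "test"}`, the generated `stream.json` would be roughly:

```json
{
  "$schema": "http://json-schema.org/schema#",
  "type": "object",
  "properties": {
    "a": {"type": "integer"},
    "b": {"type": "string"}
  }
}
```
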
Empty file.
1 change: 1 addition & 0 deletions tools/schema_generator/requirements.txt
@@ -0,0 +1 @@
-e .
Empty file.
22 changes: 22 additions & 0 deletions tools/schema_generator/schema_generator/configure_catalog.py
@@ -0,0 +1,22 @@
import json
import os

from airbyte_cdk.models import ConfiguredAirbyteCatalog, AirbyteMessage, ConfiguredAirbyteStream, DestinationSyncMode


def configure_catalog():
    record = AirbyteMessage.parse_raw(input())
    for stream in record.catalog.streams:
        stream.json_schema = {}
    streams = [
        ConfiguredAirbyteStream(stream=stream, sync_mode=stream.supported_sync_modes[0], destination_sync_mode=DestinationSyncMode.append)
        for stream in record.catalog.streams
    ]
    configured_catalog = ConfiguredAirbyteCatalog(streams=streams)

    default_folder = os.path.join(os.getcwd(), "integration_tests")
    if not os.path.exists(default_folder):
        os.mkdir(default_folder)
    output_file_name = os.path.join(default_folder, "configured_catalog.json")
    with open(output_file_name, "w") as outfile:
        json.dump(json.loads(configured_catalog.json()), outfile, indent=2, sort_keys=True)
@@ -36,6 +36,7 @@ class NoRequiredObj(Object):
    This class has Object behaviour, but it does not generate "required[]" fields
    every time it parses object. So we dont add unnecessary extra field.
    """
+
    def to_schema(self):
        schema = super(Object, self).to_schema()
        schema["type"] = "object"
@@ -50,7 +51,7 @@ class NoRequiredSchemaBuilder(SchemaBuilder):
    EXTRA_STRATEGIES = (NoRequiredObj,)


-def main():
+def infer_schemas():
    default_folder = os.path.join(os.getcwd(), "schemas")
    if not os.path.exists(default_folder):
        os.mkdir(default_folder)
@@ -71,7 +72,3 @@ def main():
        output_file_name = os.path.join(default_folder, stream_name + ".json")
        with open(output_file_name, "w") as outfile:
            json.dump(schema, outfile, indent=2, sort_keys=True)
-
-
-if __name__ == "__main__":
-    main()
34 changes: 34 additions & 0 deletions tools/schema_generator/schema_generator/main.py
@@ -0,0 +1,34 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import argparse
import sys

from .infer_schemas import infer_schemas
from .configure_catalog import configure_catalog


def main():

    parser = argparse.ArgumentParser(description="Airbyte Schema Generator")

    if len(sys.argv) == 1:
        parser.print_help()

    parser.add_argument("--configure-catalog", action="store_true", help="Generate a Configured Catalog")
    parser.add_argument("--infer-schemas", action="store_true", help="Infer Stream Schemas")

    args, unknown_args = parser.parse_known_args()

    if unknown_args:
        print(f"Invalid arguments: {unknown_args}.")
        parser.print_help()
    elif args.configure_catalog:
        configure_catalog()
    elif args.infer_schemas:
        infer_schemas()


if __name__ == "__main__":
    sys.exit(main())
28 changes: 28 additions & 0 deletions tools/schema_generator/setup.py
@@ -0,0 +1,28 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#


from setuptools import find_packages, setup

MAIN_REQUIREMENTS = ["airbyte_cdk==0.1.60", "genson==1.2.2"]

TEST_REQUIREMENTS = ["pytest"]


setup(
    version="0.1.0",
    name="schema_generator",
    description="Util to create catalog schemas for an Airbyte Connector.",
    author="Airbyte",
    author_email="contact@airbyte.io",
    packages=find_packages(),
    install_requires=MAIN_REQUIREMENTS,
    extras_require={
        "tests": TEST_REQUIREMENTS,
    },
    python_requires=">=3.9",
    entry_points={
        "console_scripts": ["schema_generator = schema_generator.main:main"],
    },
)
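
The `console_scripts` entry point above is what exposes the bare `schema_generator` command used in the README; after the editable install (`pip install -r requirements.txt`, which resolves to `pip install -e .`), a quick smoke test might be:

```bash
$ schema_generator --help  # argparse lists the --configure-catalog and --infer-schemas flags
```
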
64 changes: 64 additions & 0 deletions tools/schema_generator/tests/test_schema_generator.py
@@ -0,0 +1,64 @@
import io
import sys
import tempfile
import os
import json

import pytest

from schema_generator.configure_catalog import configure_catalog
from schema_generator.infer_schemas import infer_schemas

from airbyte_cdk.models import (
    AirbyteMessage,
    AirbyteCatalog,
    AirbyteRecordMessage,
    AirbyteStream,
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
    DestinationSyncMode,
    SyncMode,
    Type,
)


def test_configure_catalog():
    stream = AirbyteStream(name="stream", supported_sync_modes=[SyncMode.full_refresh], json_schema={})
    catalog = AirbyteCatalog(streams=[stream])
    catalog_message = AirbyteMessage(type=Type.CATALOG, catalog=catalog)
    sys.stdin = io.StringIO(catalog_message.json())

    expected_configured_catalog = ConfiguredAirbyteCatalog(
        streams=[ConfiguredAirbyteStream(stream=stream, sync_mode=SyncMode.full_refresh, destination_sync_mode=DestinationSyncMode.append)]
    )

    expected_configured_catalog_json = json.loads(expected_configured_catalog.json())

    with tempfile.TemporaryDirectory() as temp_dir:
        os.chdir(temp_dir)
        configure_catalog()
        assert os.path.exists("integration_tests/configured_catalog.json")

        with open("integration_tests/configured_catalog.json") as f:
            configured_catalog_json = json.loads(f.read())
            assert configured_catalog_json == expected_configured_catalog_json


def test_infer_schemas():
    expected_schema = {
        "$schema": "http://json-schema.org/schema#",
        "properties": {"a": {"type": "integer"}, "b": {"type": "string"}},
        "type": "object",
    }

    with tempfile.TemporaryDirectory() as temp_dir:
        os.chdir(temp_dir)
        record = {"a": 1, "b": "test"}
        record_message = AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="stream", data=record, emitted_at=111)).json()
        sys.stdin = io.StringIO(record_message)
        infer_schemas()
        assert os.path.exists("schemas/stream.json")

        with open("schemas/stream.json") as f:
            schema = json.loads(f.read())
            assert schema == expected_schema
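
To run these tests locally, one approach (assuming the virtual environment from the README is active) is to install the `tests` extra declared in `setup.py` and invoke pytest from the tool's directory:

```bash
$ pip install -e ".[tests]"
$ python -m pytest tests
```
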
27 changes: 27 additions & 0 deletions tools/schema_generator/tests/tests.py
@@ -0,0 +1,27 @@
from airbyte_cdk.models import (
    AirbyteCatalog,
    AirbyteConnectionStatus,
    AirbyteMessage,
    AirbyteRecordMessage,
    AirbyteStateMessage,
    AirbyteStream,
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
    ConnectorSpecification,
    DestinationSyncMode,
    Status,
    SyncMode,
    Type,
)

dummy_catalog = ConfiguredAirbyteCatalog(
    streams=[
        ConfiguredAirbyteStream(
            stream=AirbyteStream(name="mystream", json_schema={"type": "object"}),
            sync_mode=SyncMode.full_refresh,
            destination_sync_mode=DestinationSyncMode.overwrite,
        )
    ]
)

print(dummy_catalog.json())
