Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 New destination: databend #19815

Closed
wants to merge 30 commits into from
Closed

🎉 New destination: databend #19815

wants to merge 30 commits into from

Conversation

hantmac
Copy link
Contributor

@hantmac hantmac commented Nov 27, 2022

What

  • Add new databend destination

How

Recommended reading order

  1. docs/integrations/destinations/databend.md
  2. spec.json
  3. destination_databend/destination.py
  4. destination_databend/client.py
  5. destination_databend/writer.py

Pre-merge Checklist

Expand the relevant checklist and delete the others.

New Connector

Community member or Airbyter

  • Community member? Grant edit access to maintainers (instructions)
  • Secrets in the connector's spec are annotated with airbyte_secret
  • Unit & integration tests added and passing. Community members, please provide proof of success locally e.g: screenshot or copy-paste unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow instructions in the README. For java connectors run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
  • Code reviews completed
  • Documentation updated
    • Connector's README.md
    • Connector's bootstrap.md. See description and examples
    • docs/integrations/<source or destination>/<name>.md including changelog. See changelog example
    • docs/integrations/README.md
    • airbyte-integrations/builds.md
  • PR name follows PR naming conventions

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • Create a non-forked branch based on this PR and test the below items on it
  • Build is successful
  • If new credentials are required for use in CI, add them to GSM. Instructions.
  • /test connector=connectors/<name> command is passing
  • New Connector version released on Dockerhub by running the /publish command described here
  • After the connector is published, connector added to connector index as described here
  • Seed specs have been re-generated by building the platform and committing the changes to the seed spec files, as described here
Updating a connector

Community member or Airbyter

  • Grant edit access to maintainers (instructions)
  • Secrets in the connector's spec are annotated with airbyte_secret
  • Unit & integration tests added and passing. Community members, please provide proof of success locally e.g: screenshot or copy-paste unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow instructions in the README. For java connectors run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
  • Code reviews completed
  • Documentation updated
    • Connector's README.md
    • Connector's bootstrap.md. See description and examples
    • Changelog updated in docs/integrations/<source or destination>/<name>.md including changelog. See changelog example
  • PR name follows PR naming conventions

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • Create a non-forked branch based on this PR and test the below items on it
  • Build is successful
  • If new credentials are required for use in CI, add them to GSM. Instructions.
  • /test connector=connectors/<name> command is passing
  • New Connector version released on Dockerhub and connector version bumped by running the /publish command described here
Connector Generator
  • Issue acceptance criteria met
  • PR name follows PR naming conventions
  • If adding a new generator, add it to the list of scaffold modules being tested
  • The generator test modules (all connectors with -scaffold in their name) have been updated with the latest scaffold by running ./gradlew :airbyte-integrations:connector-templates:generator:testScaffoldTemplates then checking in your changes
  • Documentation which references the generator is updated as needed

Tests

Unit

image

image

Integration

image

@CLAassistant
Copy link

CLAassistant commented Nov 27, 2022

CLA assistant check
All committers have signed the CLA.

@hantmac hantmac changed the title Feat: Add databend destination 🎉 New destination: databend Nov 28, 2022
@hantmac hantmac requested a review from a team as a code owner December 2, 2022 07:29
@sajarin sajarin added bounty-XL Maintainer program: claimable extra large bounty PR and removed normalization labels Dec 5, 2022
@sajarin sajarin removed request for a team December 5, 2022 16:59
@itaseskii itaseskii self-assigned this Dec 5, 2022
Copy link
Contributor

@grishick grishick left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hantmac we recently removed NormalizationRunnerFactory and moved specification of normalization into destination_definitions.yaml. In order to add normalization support, you would need to add a section to destination_definitions.yaml for the new connector. It could look like following (note: generate a random UUID v4 for destinationDefinitionId):

name: Databend
  destinationDefinitionId: aa0cf6d4-fa7a-4d4a-9a42-6ddb5687d28a
  dockerRepository: airbyte/destination-databend
  dockerImageTag: 0.1.0
  documentationUrl: https://docs.airbyte.com/integrations/destinations/databend
  normalizationConfig:
    normalizationRepository: airbyte/normalization-databend
    normalizationTag: 0.2.25
    normalizationIntegrationType: databend
  supportsDbt: true
  releaseStage: alpha

@hantmac
Copy link
Contributor Author

hantmac commented Dec 20, 2022

@hantmac we recently removed NormalizationRunnerFactory and moved specification of normalization into destination_definitions.yaml. In order to add normalization support, you would need to add a section to destination_definitions.yaml for the new connector. It could look like following (note: generate a random UUID v4 for destinationDefinitionId):

name: Databend
  destinationDefinitionId: aa0cf6d4-fa7a-4d4a-9a42-6ddb5687d28a
  dockerRepository: airbyte/destination-databend
  dockerImageTag: 0.1.0
  documentationUrl: https://docs.airbyte.com/integrations/destinations/databend
  normalizationConfig:
    normalizationRepository: airbyte/normalization-databend
    normalizationTag: 0.2.25
    normalizationIntegrationType: databend
  supportsDbt: true
  releaseStage: alpha

Hi @grishick, thanks for your review. I have added a section to destination_definitions.yaml.

@hantmac hantmac requested a review from a team December 20, 2022 08:29
@hantmac hantmac requested a review from a team as a code owner December 20, 2022 08:29
@octavia-squidington-iv octavia-squidington-iv added area/platform issues related to the platform area/frontend Related to the Airbyte webapp labels Dec 20, 2022
@marcosmarxm
Copy link
Member

Hello 👋:skin-tone-2: and thank you for your contribution!

Airbyte has instituted a code freeze between 19 and 30 December, to make sure there are no disruptions during the holidays.
Because of this, reviewing and merging your contribution may take longer than usual.
We apologize for the delay, but we want everyone to have a quiet and happy holiday.

If you have any questions or need further clarification, please don't hesitate to ping via Slack.

@grishick
Copy link
Contributor

I will try to move this PR the last mile here: #20909

@grishick
Copy link
Contributor

grishick commented Dec 29, 2022

I got a Databend cloud account set up, I added the credentials to secrets/config.json and tried running integration tests as

./gradlew :airbyte-integrations:connectors:destination-databend:integrationTest

I got the following error:

         ============================= test session starts ==============================
         platform darwin -- Python 3.9.11, pytest-6.2.5, py-1.11.0, pluggy-1.0.0 -- /Users/gregsolovyev/git/airbyte/airbyte-integrations/connectors/destination-databend/.venv/bin/python
         cachedir: .pytest_cache
         rootdir: /Users/gregsolovyev/git/airbyte/airbyte-integrations/connectors/destination-databend/integration_tests, configfile: ../../../../pyproject.toml
         collecting ... collected 3 items
         
         integration_tests/integration_test.py::test_check_valid_config {"type": "DEBUG", "message": "http sql: DROP TABLE IF EXISTS test", "data": {}}
         {"type": "DEBUG", "message": "http headers {'Content-Type': 'application/json', 'Accept': 'application/json', 'X-DATABEND-ROUTE': 'warehouse', 'Authorization': 'Basic Y2xvdWRhcHA6Y2pndmR4NWVlNXhl'}", "data": {}}
         {"type": "DEBUG", "message": "Starting new HTTPS connection (1): tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443", "data": {}}
         {"type": "DEBUG", "message": "https://tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443 \"POST /v1/query/ HTTP/1.1\" 200 616", "data": {}}
         {"type": "DEBUG", "message": "Starting new HTTPS connection (1): tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443", "data": {}}
         {"type": "DEBUG", "message": "https://tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443 \"GET /v1/query/8bd905bd-8eb8-4732-9ff2-f5d7aeb40225/final HTTP/1.1\" 200 568", "data": {}}
         {"type": "DEBUG", "message": "http sql: CREATE TABLE if not exists test (x Int32,y VARCHAR)", "data": {}}
         {"type": "DEBUG", "message": "http headers {'Content-Type': 'application/json', 'Accept': 'application/json', 'X-DATABEND-ROUTE': 'warehouse', 'Authorization': 'Basic Y2xvdWRhcHA6Y2pndmR4NWVlNXhl'}", "data": {}}
         {"type": "DEBUG", "message": "Starting new HTTPS connection (1): tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443", "data": {}}
         {"type": "DEBUG", "message": "https://tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443 \"POST /v1/query/ HTTP/1.1\" 200 616", "data": {}}
         {"type": "DEBUG", "message": "Starting new HTTPS connection (1): tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443", "data": {}}
         {"type": "DEBUG", "message": "https://tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443 \"GET /v1/query/7b7cec26-7d75-42a3-82b0-87457e98610a/final HTTP/1.1\" 200 568", "data": {}}
         FAILED
         integration_tests/integration_test.py::test_check_invalid_config PASSED
         integration_tests/integration_test.py::test_write {"type": "LOG", "log": {"level": "INFO", "message": "Using the SQL writing strategy"}}
         {"type": "DEBUG", "message": "http sql: \n        CREATE TABLE IF NOT EXISTS _airbyte_raw_append_stream (\n            _airbyte_ab_id TEXT,\n            _airbyte_emitted_at TIMESTAMP,\n            _airbyte_data TEXT\n        )\n        ", "data": {}}
         {"type": "DEBUG", "message": "http headers {'Content-Type': 'application/json', 'Accept': 'application/json', 'X-DATABEND-ROUTE': 'warehouse', 'Authorization': 'Basic Y2xvdWRhcHA6Y2pndmR4NWVlNXhl'}", "data": {}}
         {"type": "DEBUG", "message": "Starting new HTTPS connection (1): tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443", "data": {}}
         {"type": "DEBUG", "message": "https://tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443 \"POST /v1/query/ HTTP/1.1\" 200 615", "data": {}}
         {"type": "DEBUG", "message": "Starting new HTTPS connection (1): tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443", "data": {}}
         {"type": "DEBUG", "message": "https://tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443 \"GET /v1/query/c88c416b-1cd8-4874-88d3-fe49871f28e9/final HTTP/1.1\" 200 567", "data": {}}
         {"type": "DEBUG", "message": "http sql: DROP TABLE IF EXISTS _airbyte_raw_overwrite_stream", "data": {}}
         {"type": "DEBUG", "message": "http headers {'Content-Type': 'application/json', 'Accept': 'application/json', 'X-DATABEND-ROUTE': 'warehouse', 'Authorization': 'Basic Y2xvdWRhcHA6Y2pndmR4NWVlNXhl'}", "data": {}}
         {"type": "DEBUG", "message": "Starting new HTTPS connection (1): tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443", "data": {}}
         {"type": "DEBUG", "message": "https://tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443 \"POST /v1/query/ HTTP/1.1\" 200 616", "data": {}}
         {"type": "DEBUG", "message": "Starting new HTTPS connection (1): tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443", "data": {}}
         {"type": "DEBUG", "message": "https://tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443 \"GET /v1/query/2f997911-fd0a-4a7d-bd22-9880153c2b63/final HTTP/1.1\" 200 568", "data": {}}
         {"type": "LOG", "log": {"level": "INFO", "message": "Stream overwrite_stream is wiped."}}
         {"type": "DEBUG", "message": "http sql: \n        CREATE TABLE IF NOT EXISTS _airbyte_raw_overwrite_stream (\n            _airbyte_ab_id TEXT,\n            _airbyte_emitted_at TIMESTAMP,\n            _airbyte_data TEXT\n        )\n        ", "data": {}}
         {"type": "DEBUG", "message": "http headers {'Content-Type': 'application/json', 'Accept': 'application/json', 'X-DATABEND-ROUTE': 'warehouse', 'Authorization': 'Basic Y2xvdWRhcHA6Y2pndmR4NWVlNXhl'}", "data": {}}
         {"type": "DEBUG", "message": "Starting new HTTPS connection (1): tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443", "data": {}}
         {"type": "DEBUG", "message": "https://tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443 \"POST /v1/query/ HTTP/1.1\" 200 624", "data": {}}
         {"type": "DEBUG", "message": "Starting new HTTPS connection (1): tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443", "data": {}}
         {"type": "DEBUG", "message": "https://tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443 \"GET /v1/query/368e3d1b-cfda-4a73-a3d1-cf0532aeedc9/final HTTP/1.1\" 200 576", "data": {}}
         FAILED
         
         =================================== FAILURES ===================================
         ___________________________ test_check_valid_config ____________________________
         
         config = {'database': 'default', 'host': 'tnibtyx08--default.ch.aws-us-east-2.default.databend.com', 'password': '******', 'port': 443, ...}
         
             def test_check_valid_config(config: Mapping):
                 outcome = DestinationDatabend().check(logging.getLogger('airbyte'), config)
         >       assert outcome.status == Status.SUCCEEDED
         E       AssertionError: assert <Status.FAILED: 'FAILED'> == <Status.SUCCEEDED: 'SUCCEEDED'>
         E         +<Status.FAILED: 'FAILED'>
         E         -<Status.SUCCEEDED: 'SUCCEEDED'>
         
         integration_tests/integration_test.py:66: AssertionError
         ------------------------------ Captured log call -------------------------------
         DEBUG    databend_py.log:connection.py:109 http sql: DROP TABLE IF EXISTS test
         DEBUG    databend_py.log:connection.py:113 http headers {'Content-Type': 'application/json', 'Accept': 'application/json', 'X-DATABEND-ROUTE': 'warehouse', 'Authorization': 'Basic Y2xvdWRhcHA6Y2pndmR4NWVlNXhl'}
         DEBUG    urllib3.connectionpool:connectionpool.py:1003 Starting new HTTPS connection (1): tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443
         DEBUG    urllib3.connectionpool:connectionpool.py:456 https://tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443 "POST /v1/query/ HTTP/1.1" 200 616
         DEBUG    urllib3.connectionpool:connectionpool.py:1003 Starting new HTTPS connection (1): tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443
         DEBUG    urllib3.connectionpool:connectionpool.py:456 https://tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443 "GET /v1/query/8bd905bd-8eb8-4732-9ff2-f5d7aeb40225/final HTTP/1.1" 200 568
         DEBUG    databend_py.log:connection.py:109 http sql: CREATE TABLE if not exists test (x Int32,y VARCHAR)
         DEBUG    databend_py.log:connection.py:113 http headers {'Content-Type': 'application/json', 'Accept': 'application/json', 'X-DATABEND-ROUTE': 'warehouse', 'Authorization': 'Basic Y2xvdWRhcHA6Y2pndmR4NWVlNXhl'}
         DEBUG    urllib3.connectionpool:connectionpool.py:1003 Starting new HTTPS connection (1): tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443
         DEBUG    urllib3.connectionpool:connectionpool.py:456 https://tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443 "POST /v1/query/ HTTP/1.1" 200 616
         DEBUG    urllib3.connectionpool:connectionpool.py:1003 Starting new HTTPS connection (1): tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443
         DEBUG    urllib3.connectionpool:connectionpool.py:456 https://tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443 "GET /v1/query/7b7cec26-7d75-42a3-82b0-87457e98610a/final HTTP/1.1" 200 568
         __________________________________ test_write __________________________________
         
         config = {'database': 'default', 'host': 'tnibtyx08--default.ch.aws-us-east-2.default.databend.com', 'password': '*****', 'port': 443, ...}
         configured_catalog = ConfiguredAirbyteCatalog(streams=[ConfiguredAirbyteStream(stream=AirbyteStream(name='append_stream', json_schema={'typ...cremental'>, cursor_field=None, destination_sync_mode=<DestinationSyncMode.overwrite: 'overwrite'>, primary_key=None)])
         client = <destination_databend.client.DatabendClient object at 0x104958d30>
         
             def test_write(config: Mapping, configured_catalog: ConfiguredAirbyteCatalog, client: DatabendClient):
                 """
                 This test verifies that:
                     1. writing a stream in "overwrite" mode overwrites any existing data for that stream
                     2. writing a stream in "append" mode appends new records without deleting the old ones
                     3. The correct state message is output by the connector at the end of the sync
                 """
                 append_stream, overwrite_stream = configured_catalog.streams[0].stream.name, configured_catalog.streams[1].stream.name
                 first_state_message = _state({"state": "1"})
                 first_record_chunk = [_record(append_stream, str(i), i) for i in range(5)] + [_record(overwrite_stream, str(i), i) for i in range(5)]
             
                 second_state_message = _state({"state": "2"})
                 second_record_chunk = [_record(append_stream, str(i), i) for i in range(5, 10)] + [
                     _record(overwrite_stream, str(i), i) for i in range(5, 10)
                 ]
             
                 destination = DestinationDatabend()
             
                 expected_states = [first_state_message, second_state_message]
         >       output_states = list(
                     destination.write(
                         config, configured_catalog, [*first_record_chunk, first_state_message, *second_record_chunk, second_state_message]
                     )
                 )
         
         integration_tests/integration_test.py:125: 
         _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
         destination_databend/destination.py:66: in write
             writer.flush()
         destination_databend/writer.py:128: in flush
             self._flush()
         destination_databend/writer.py:117: in _flush
             cursor.execute(
         .venv/lib/python3.9/site-packages/databend_sqlalchemy/connector.py:176: in execute
             sql = operation % _escaper.escape_args(parameters)
         .venv/lib/python3.9/site-packages/databend_sqlalchemy/connector.py:33: in escape_args
             return tuple(self.escape_item(x) for x in parameters)
         .venv/lib/python3.9/site-packages/databend_sqlalchemy/connector.py:33: in <genexpr>
             return tuple(self.escape_item(x) for x in parameters)
         _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
         
         self = <databend_sqlalchemy.connector.ParamEscaper object at 0x10459a9d0>
         item = 'db7a9b64-fa0e-4b50-b032-c682f30a66fd'
         
             def escape_item(self, item):
                 if item is None:
                     return 'NULL'
                 elif isinstance(item, (int, float)):
                     return self.escape_number(item)
                 elif isinstance(item, datetime):
                     return self.escape_string(item.strftime("%Y-%m-%d %H:%M:%S"))
                 else:
         >           raise Exception("Unsupported object {}".format(item))
         E           Exception: Unsupported object db7a9b64-fa0e-4b50-b032-c682f30a66fd
         
         .venv/lib/python3.9/site-packages/databend_sqlalchemy/connector.py:57: Exception
         ------------------------------ Captured log call -------------------------------
         INFO     airbyte:writer.py:132 Using the SQL writing strategy
         DEBUG    databend_py.log:connection.py:109 http sql: 
                 CREATE TABLE IF NOT EXISTS _airbyte_raw_append_stream (
                     _airbyte_ab_id TEXT,
                     _airbyte_emitted_at TIMESTAMP,
                     _airbyte_data TEXT
                 )
                 
         DEBUG    databend_py.log:connection.py:113 http headers {'Content-Type': 'application/json', 'Accept': 'application/json', 'X-DATABEND-ROUTE': 'warehouse', 'Authorization': 'Basic Y2xvdWRhcHA6Y2pndmR4NWVlNXhl'}
         DEBUG    urllib3.connectionpool:connectionpool.py:1003 Starting new HTTPS connection (1): tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443
         DEBUG    urllib3.connectionpool:connectionpool.py:456 https://tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443 "POST /v1/query/ HTTP/1.1" 200 615
         DEBUG    urllib3.connectionpool:connectionpool.py:1003 Starting new HTTPS connection (1): tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443
         DEBUG    urllib3.connectionpool:connectionpool.py:456 https://tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443 "GET /v1/query/c88c416b-1cd8-4874-88d3-fe49871f28e9/final HTTP/1.1" 200 567
         DEBUG    databend_py.log:connection.py:109 http sql: DROP TABLE IF EXISTS _airbyte_raw_overwrite_stream
         DEBUG    databend_py.log:connection.py:113 http headers {'Content-Type': 'application/json', 'Accept': 'application/json', 'X-DATABEND-ROUTE': 'warehouse', 'Authorization': 'Basic Y2xvdWRhcHA6Y2pndmR4NWVlNXhl'}
         DEBUG    urllib3.connectionpool:connectionpool.py:1003 Starting new HTTPS connection (1): tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443
         DEBUG    urllib3.connectionpool:connectionpool.py:456 https://tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443 "POST /v1/query/ HTTP/1.1" 200 616
         DEBUG    urllib3.connectionpool:connectionpool.py:1003 Starting new HTTPS connection (1): tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443
         DEBUG    urllib3.connectionpool:connectionpool.py:456 https://tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443 "GET /v1/query/2f997911-fd0a-4a7d-bd22-9880153c2b63/final HTTP/1.1" 200 568
         INFO     airbyte:destination.py:50 Stream overwrite_stream is wiped.
         DEBUG    databend_py.log:connection.py:109 http sql: 
                 CREATE TABLE IF NOT EXISTS _airbyte_raw_overwrite_stream (
                     _airbyte_ab_id TEXT,
                     _airbyte_emitted_at TIMESTAMP,
                     _airbyte_data TEXT
                 )
                 
         DEBUG    databend_py.log:connection.py:113 http headers {'Content-Type': 'application/json', 'Accept': 'application/json', 'X-DATABEND-ROUTE': 'warehouse', 'Authorization': 'Basic Y2xvdWRhcHA6Y2pndmR4NWVlNXhl'}
         DEBUG    urllib3.connectionpool:connectionpool.py:1003 Starting new HTTPS connection (1): tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443
         DEBUG    urllib3.connectionpool:connectionpool.py:456 https://tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443 "POST /v1/query/ HTTP/1.1" 200 624
         DEBUG    urllib3.connectionpool:connectionpool.py:1003 Starting new HTTPS connection (1): tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443
         DEBUG    urllib3.connectionpool:connectionpool.py:456 https://tnibtyx08--default.ch.aws-us-east-2.default.databend.com:443 "GET /v1/query/368e3d1b-cfda-4a73-a3d1-cf0532aeedc9/final HTTP/1.1" 200 576
         =========================== short test summary info ============================
         FAILED integration_tests/integration_test.py::test_check_valid_config - Asser...
         FAILED integration_tests/integration_test.py::test_write - Exception: Unsuppo...
         ========================= 2 failed, 1 passed in 5.46s ==========================

> Task :airbyte-integrations:connectors:destination-databend:_customIntegrationTestsCoverage FAILED

FAILURE: Build failed with an exception.

* What went wrong:
Execution failed for task ':airbyte-integrations:connectors:destination-databend:_customIntegrationTestsCoverage'.
> Python call failed: .venv/bin/python -m coverage run --data-file=integration_tests/.coverage.customIntegrationTests --rcfile=/Users/gregsolovyev/git/airbyte/pyproject.toml -m pytest -s integration_tests -c /Users/gregsolovyev/git/airbyte/pyproject.toml

@grishick
Copy link
Contributor

grishick commented Dec 29, 2022

It appears that ParamEscaper::escape_item is unable to process a string parameter that looks like UUID:

    def escape_item(self, item):
        if item is None:
            return 'NULL'
        elif isinstance(item, (int, float)):
            return self.escape_number(item)
        elif isinstance(item, datetime):
            return self.escape_string(item.strftime("%Y-%m-%d %H:%M:%S"))
        else:
            raise Exception("Unsupported object {}".format(item))

@hantmac
Copy link
Contributor Author

hantmac commented Dec 30, 2022

It appears that ParamEscaper::escape_item is unable to process a string parameter that looks like UUID:

    def escape_item(self, item):
        if item is None:
            return 'NULL'
        elif isinstance(item, (int, float)):
            return self.escape_number(item)
        elif isinstance(item, datetime):
            return self.escape_string(item.strftime("%Y-%m-%d %H:%M:%S"))
        else:
            raise Exception("Unsupported object {}".format(item))

Thanks for your help. The databend_sqlalchemy has bug and have been fixed. After that the integration tests passed.

collected 3 items                                                                                                                                                                                             

integration_tests/integration_test.py::test_check_valid_config 
PASSED
integration_tests/integration_test.py::test_check_invalid_config PASSED
integration_tests/integration_test.py::test_write {"type": "LOG", "log": {"level": "INFO", "message": "Using the SQL writing strategy"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Stream overwrite_stream is wiped."}}
{"type": "LOG", "log": {"level": "INFO", "message": "Using the SQL writing strategy"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Stream overwrite_stream is wiped."}}
PASSED

@grishick
Copy link
Contributor

grishick commented Jan 4, 2023

I added secrets to GSM and am running integration tests here: #20909

@grishick
Copy link
Contributor

grishick commented Jan 4, 2023

@hantmac I added secrets to secrets manager and ran integration tests via CI. Destination tests pass (🎉 ), but normalization tests fail: https://github.com/airbytehq/airbyte/actions/runs/3839797452/jobs/6538073823

grishick added a commit that referenced this pull request Jan 9, 2023
* feat: Add databend destination

Co-authored-by: hantmac <hantmac@outlook.com>
Co-authored-by: josephkmh <joseph@airbyte.io>
Co-authored-by: Sajarin <sajarindider@gmail.com>
@grishick
Copy link
Contributor

grishick commented Jan 9, 2023

This contribution has been merged in this PR: #20909 w/o normalization support
@hantmac feel free to followup with adding normalization support

@grishick grishick closed this Jan 9, 2023
jbfbell pushed a commit that referenced this pull request Jan 13, 2023
* feat: Add databend destination

Co-authored-by: hantmac <hantmac@outlook.com>
Co-authored-by: josephkmh <joseph@airbyte.io>
Co-authored-by: Sajarin <sajarindider@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/connectors Connector related issues area/documentation Improvements or additions to documentation area/frontend Related to the Airbyte webapp area/platform issues related to the platform bounty bounty-XL Maintainer program: claimable extra large bounty PR community connectors/destination/databend connectors/source/elasticsearch connectors/source/mysql normalization
Projects
No open projects
Development

Successfully merging this pull request may close these issues.

8 participants