diff --git a/airbyte-config/init/src/main/resources/icons/orb.svg b/airbyte-config/init/src/main/resources/icons/orb.svg new file mode 100644 index 0000000000000..4a516fc122282 --- /dev/null +++ b/airbyte-config/init/src/main/resources/icons/orb.svg @@ -0,0 +1,70 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 07ea8f70b25c7..2df672b68d173 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -526,6 +526,13 @@ documentationUrl: https://docs.airbyte.io/integrations/sources/oracle icon: oracle.svg sourceType: database +- name: Orb + sourceDefinitionId: 7f0455fb-4518-4ec0-b7a3-d808bf8081cc + dockerRepository: airbyte/source-orb + dockerImageTag: 0.1.1 + documentationUrl: https://docs.airbyte.io/integrations/sources/orb + icon: orb.svg + sourceType: api - sourceDefinitionId: 3490c201-5d95-4783-b600-eaf07a4c7787 name: Outreach dockerRepository: airbyte/source-outreach diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 8376bcc4e3702..2fc15ad1307e3 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -5832,6 +5832,60 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] +- dockerImage: "airbyte/source-orb:0.1.1" + spec: + documentationUrl: "https://docs.withorb.com/" + connectionSpecification: + $schema: "http://json-schema.org/draft-07/schema#" + title: "Orb Spec" + type: "object" + required: + - "api_key" + additionalProperties: false + properties: + api_key: + type: "string" + title: "Orb API Key" + description: "Orb API Key, issued from the Orb admin console." + airbyte_secret: true + order: 1 + start_date: + type: "string" + title: "Start Date" + pattern: "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$" + description: "UTC date and time in the format 2022-03-01T00:00:00Z. Any\ + \ data with created_at before this data will not be synced." + examples: + - "2022-03-01T00:00:00Z" + order: 2 + lookback_window_days: + type: "integer" + title: "Lookback Window (in days)" + default: 0 + minimum: 0 + description: "When set to N, the connector will always refresh resources\ + \ created within the past N days. By default, updated objects that are\ + \ not newly created are not incrementally synced." + order: 3 + string_event_properties_keys: + type: "array" + items: + type: "string" + title: "Event properties keys (string values)" + description: "Property key names to extract from all events, in order to\ + \ enrich ledger entries corresponding to an event deduction." + order: 4 + numeric_event_properties_keys: + type: "array" + items: + type: "string" + title: "Event properties keys (numeric values)" + description: "Property key names to extract from all events, in order to\ + \ enrich ledger entries corresponding to an event deduction." + order: 5 + supportsNormalization: false + supportsDBT: false + supported_destination_sync_modes: [] - dockerImage: "airbyte/source-outreach:0.1.1" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/outreach" diff --git a/airbyte-integrations/connectors/source-orb/Dockerfile b/airbyte-integrations/connectors/source-orb/Dockerfile index 8fb10ad511d6e..d673aafcd4784 100644 --- a/airbyte-integrations/connectors/source-orb/Dockerfile +++ b/airbyte-integrations/connectors/source-orb/Dockerfile @@ -34,5 +34,5 @@ COPY source_orb ./source_orb ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.version=0.1.1 LABEL io.airbyte.name=airbyte/source-orb diff --git a/airbyte-integrations/connectors/source-orb/bootstrap.md b/airbyte-integrations/connectors/source-orb/bootstrap.md index 2844f55500cba..ea3ad7fe42339 100644 --- a/airbyte-integrations/connectors/source-orb/bootstrap.md +++ b/airbyte-integrations/connectors/source-orb/bootstrap.md @@ -15,6 +15,12 @@ Since the Orb API does not allow querying objects based on `updated_at`, these i Orb's API uses cursor-based pagination, which is documented [here](https://docs.withorb.com/reference/pagination). +## Enriching Credit Ledger entries + +The connector configuration includes two properties: `numeric_event_properties_keys` and `string_event_properties_keys`. + +When a ledger entry has an `event_id` attached to it (e.g. an automated decrement), the connector will make a follow-up request to enrich those entries with event properties corresponding to the keys provided. The connector assumes (and generates schema) that property values corresponding to the keys listed in `numeric_event_properties_keys` are numeric, and the property values corresponding to the keys listed in `string_event_properties_keys` are string typed. + ## Authentication This connector authenticates against the Orb API with an API key that can be issued via the Orb Admin Console. diff --git a/airbyte-integrations/connectors/source-orb/source_orb/schemas/credits_ledger_entries.json b/airbyte-integrations/connectors/source-orb/source_orb/schemas/credits_ledger_entries.json index dcc142813b554..d101a7fae9baf 100644 --- a/airbyte-integrations/connectors/source-orb/source_orb/schemas/credits_ledger_entries.json +++ b/airbyte-integrations/connectors/source-orb/source_orb/schemas/credits_ledger_entries.json @@ -25,16 +25,21 @@ "entry_type": { "type": "string" }, - "expiry_date": { - "type": ["null", "string"], - "format": "date-time" - }, - "target_expiry_date": { + "new_block_expiry_date": { "type": ["null", "string"], "format": "date-time" }, "customer_id": { "type": "string" } - } + }, + "required": [ + "id", + "starting_balance", + "ending_balance", + "amount", + "created_at", + "customer_id", + "entry_type" + ] } diff --git a/airbyte-integrations/connectors/source-orb/source_orb/schemas/customers.json b/airbyte-integrations/connectors/source-orb/source_orb/schemas/customers.json index a90e880f1ae84..0718219859550 100644 --- a/airbyte-integrations/connectors/source-orb/source_orb/schemas/customers.json +++ b/airbyte-integrations/connectors/source-orb/source_orb/schemas/customers.json @@ -73,5 +73,6 @@ } } } - } + }, + "required": ["id", "created_at"] } diff --git a/airbyte-integrations/connectors/source-orb/source_orb/schemas/plans.json b/airbyte-integrations/connectors/source-orb/source_orb/schemas/plans.json index c1d4da1eb648c..0fcc46f3742b6 100644 --- a/airbyte-integrations/connectors/source-orb/source_orb/schemas/plans.json +++ b/airbyte-integrations/connectors/source-orb/source_orb/schemas/plans.json @@ -33,14 +33,7 @@ "type": "string" } } - }, - "start_date": { - "type": ["null", "string"], - "format": "date-time" - }, - "end_date": { - "type": ["null", "string"], - "format": "date-time" } - } + }, + "required": ["id", "created_at"] } diff --git a/airbyte-integrations/connectors/source-orb/source_orb/schemas/subscriptions.json b/airbyte-integrations/connectors/source-orb/source_orb/schemas/subscriptions.json index 03b241812196f..41283384e7cd1 100644 --- a/airbyte-integrations/connectors/source-orb/source_orb/schemas/subscriptions.json +++ b/airbyte-integrations/connectors/source-orb/source_orb/schemas/subscriptions.json @@ -26,5 +26,6 @@ "status": { "type": "string" } - } + }, + "required": ["id", "created_at", "status"] } diff --git a/airbyte-integrations/connectors/source-orb/source_orb/source.py b/airbyte-integrations/connectors/source-orb/source_orb/source.py index 1eae889423cff..09f71fbdf3eb7 100644 --- a/airbyte-integrations/connectors/source-orb/source_orb/source.py +++ b/airbyte-integrations/connectors/source-orb/source_orb/source.py @@ -192,9 +192,12 @@ class CreditsLedgerEntries(IncrementalOrbStream): API Docs: https://docs.withorb.com/reference/view-credits-ledger """ - def __init__(self, event_properties_keys: Optional[List[str]] = None, **kwargs): + def __init__( + self, string_event_properties_keys: Optional[List[str]] = None, numeric_event_properties_keys: Optional[List[str]] = None, **kwargs + ): super().__init__(**kwargs) - self.event_properties_keys = event_properties_keys + self.string_event_properties_keys = string_event_properties_keys + self.numeric_event_properties_keys = numeric_event_properties_keys def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: """ @@ -306,7 +309,8 @@ def modify_ledger_entry_schema(ledger_entry): modify_ledger_entry_schema(ledger_entry=ledger_entry) # Nothing to extract for each ledger entry - if not self.event_properties_keys: + merged_properties_keys = (self.string_event_properties_keys or []) + (self.numeric_event_properties_keys or []) + if not merged_properties_keys: return ledger_entries # The events endpoint is a `POST` endpoint which expects a list of @@ -329,7 +333,7 @@ def modify_ledger_entry_schema(ledger_entry): for serialized_event in paginated_events_response_body["data"]: event_id = serialized_event["id"] desired_properties_subset = { - key: value for key, value in serialized_event["properties"].items() if key in self.event_properties_keys + key: value for key, value in serialized_event["properties"].items() if key in merged_properties_keys } # This would imply that the endpoint returned an event that wasn't part of the filter @@ -359,17 +363,20 @@ def get_json_schema(self) -> Mapping[str, Any]: """ schema = super().get_json_schema() dynamic_event_properties_schema = {} - if self.event_properties_keys: - for property_key in self.event_properties_keys: - # Property values are assumed to have string type. + if self.string_event_properties_keys: + for property_key in self.string_event_properties_keys: dynamic_event_properties_schema[property_key] = {"type": "string"} + if self.numeric_event_properties_keys: + for property_key in self.numeric_event_properties_keys: + dynamic_event_properties_schema[property_key] = {"type": "number"} schema["properties"]["event"] = { "type": ["null", "object"], "properties": { - "event_id": {"type": "string"}, + "id": {"type": "string"}, "properties": {"type": ["null", "object"], "properties": dynamic_event_properties_schema}, }, + "required": ["id"], } return schema @@ -391,10 +398,23 @@ def check_connection(self, logger, config) -> Tuple[bool, any]: except Exception as e: return False, e + def input_keys_mutually_exclusive( + self, string_event_properties_keys: Optional[List[str]] = None, numeric_event_properties_keys: Optional[List[str]] = None + ): + if string_event_properties_keys is None or numeric_event_properties_keys is None: + return True + else: + return len(set(string_event_properties_keys) & set(numeric_event_properties_keys)) == 0 + def streams(self, config: Mapping[str, Any]) -> List[Stream]: authenticator = TokenAuthenticator(token=config["api_key"]) lookback_window = config.get("lookback_window_days") - event_properties_keys = config.get("event_properties_keys") + string_event_properties_keys = config.get("string_event_properties_keys") + numeric_event_properties_keys = config.get("numeric_event_properties_keys") + + if not self.input_keys_mutually_exclusive(string_event_properties_keys, numeric_event_properties_keys): + raise ValueError("Supplied property keys for string and numeric valued property values must be mutually exclusive.") + start_date_str = config.get("start_date") start_date = pendulum.parse(start_date_str) if start_date_str else None return [ @@ -405,6 +425,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: authenticator=authenticator, lookback_window_days=lookback_window, start_date=start_date, - event_properties_keys=event_properties_keys, + string_event_properties_keys=string_event_properties_keys, + numeric_event_properties_keys=numeric_event_properties_keys, ), ] diff --git a/airbyte-integrations/connectors/source-orb/source_orb/spec.json b/airbyte-integrations/connectors/source-orb/source_orb/spec.json index bb492f5d52f20..49deae0236461 100644 --- a/airbyte-integrations/connectors/source-orb/source_orb/spec.json +++ b/airbyte-integrations/connectors/source-orb/source_orb/spec.json @@ -30,14 +30,23 @@ "description": "When set to N, the connector will always refresh resources created within the past N days. By default, updated objects that are not newly created are not incrementally synced.", "order": 3 }, - "event_properties_keys": { + "string_event_properties_keys": { "type": "array", "items": { "type": "string" }, - "title": "Event properties keys", - "description": "Property key names to extract from all events, in order to enrich ledger entries corresponding to an event deduction", + "title": "Event properties keys (string values)", + "description": "Property key names to extract from all events, in order to enrich ledger entries corresponding to an event deduction.", "order": 4 + }, + "numeric_event_properties_keys": { + "type": "array", + "items": { + "type": "string" + }, + "title": "Event properties keys (numeric values)", + "description": "Property key names to extract from all events, in order to enrich ledger entries corresponding to an event deduction.", + "order": 5 } } } diff --git a/airbyte-integrations/connectors/source-orb/unit_tests/test_incremental_streams.py b/airbyte-integrations/connectors/source-orb/unit_tests/test_incremental_streams.py index 6566acc78235e..fa0a10e3c594d 100644 --- a/airbyte-integrations/connectors/source-orb/unit_tests/test_incremental_streams.py +++ b/airbyte-integrations/connectors/source-orb/unit_tests/test_incremental_streams.py @@ -172,7 +172,7 @@ def test_credits_ledger_entries_request_params(mocker, current_stream_state, cur @responses.activate def test_credits_ledger_entries_no_matching_events(mocker): - stream = CreditsLedgerEntries(event_properties_keys=["ping"]) + stream = CreditsLedgerEntries(string_event_properties_keys=["ping"]) ledger_entries = [{"event_id": "foo-event-id", "entry_type": "decrement"}, {"event_id": "bar-event-id", "entry_type": "decrement"}] mock_response = { "data": [ @@ -199,17 +199,23 @@ def test_credits_ledger_entries_no_matching_events(mocker): @pytest.mark.parametrize( - ("event_properties", "selected_property_keys", "resulting_properties"), + ("event_properties", "selected_string_property_keys", "selected_numeric_property_keys", "resulting_properties"), [ - ({}, ["event-property-foo"], {}), - ({"ping": "pong"}, ["ping"], {"ping": "pong"}), - ({"ping": "pong", "unnamed_property": "foo"}, ["ping"], {"ping": "pong"}), - ({"unnamed_property": "foo"}, ["ping"], {}), + ({}, ["event-property-foo"], [], {}), + ({"ping": "pong"}, ["ping"], [], {"ping": "pong"}), + ({"ping": "pong", "unnamed_property": "foo"}, ["ping"], [], {"ping": "pong"}), + ({"unnamed_property": "foo"}, ["ping"], [], {}), + ({"numeric_property": 1}, ["ping"], ["numeric_property"], {"numeric_property": 1}), + ({"ping": "pong", "numeric_property": 1}, ["ping"], ["numeric_property"], {"ping": "pong", "numeric_property": 1}), ], ) @responses.activate -def test_credits_ledger_entries_enriches_selected_property_keys(mocker, event_properties, selected_property_keys, resulting_properties): - stream = CreditsLedgerEntries(event_properties_keys=selected_property_keys) +def test_credits_ledger_entries_enriches_selected_property_keys( + mocker, event_properties, selected_string_property_keys, selected_numeric_property_keys, resulting_properties +): + stream = CreditsLedgerEntries( + string_event_properties_keys=selected_string_property_keys, numeric_event_properties_keys=selected_numeric_property_keys + ) original_entry_1 = {"entry_type": "increment"} ledger_entries = [{"event_id": "foo-event-id", "entry_type": "decrement"}, original_entry_1] mock_response = { diff --git a/airbyte-integrations/connectors/source-orb/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-orb/unit_tests/test_streams.py index c85ce76839d46..fb2196adbd614 100644 --- a/airbyte-integrations/connectors/source-orb/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-orb/unit_tests/test_streams.py @@ -61,7 +61,7 @@ def test_http_method(patch_base_class): @pytest.mark.parametrize("event_properties_keys", [["foo-property"], ["foo-property", "bar-property"], None]) def test_credit_ledger_entries_schema(patch_base_class, mocker, event_properties_keys): - stream = CreditsLedgerEntries(event_properties_keys=event_properties_keys) + stream = CreditsLedgerEntries(string_event_properties_keys=event_properties_keys) json_schema = stream.get_json_schema() assert "event" in json_schema["properties"] diff --git a/docs/integrations/sources/orb.md b/docs/integrations/sources/orb.md index 3e11be31ff7fd..79a1890024a13 100644 --- a/docs/integrations/sources/orb.md +++ b/docs/integrations/sources/orb.md @@ -51,6 +51,7 @@ an Orb Account and API Key. ## Changelog | Version | Date | Pull Request | Subject | +| 0.1.1 | 2022-03-03 | [10839](https://github.com/airbytehq/airbyte/pull/10839) | Support ledger entries with numeric properties + schema fixes | 0.1.0 | 2022-02-01 | | New Source: Orb | :--- | :--- | :--- | :--- |