Skip to content

Commit

Permalink
🎉 Source Orb: support enriching ledger entries with numeric properties (
Browse files Browse the repository at this point in the history
#10839)

* Modifications to Orb schema

* Docs updates

* docs updates

* Remove deprecated field from connector export

* Add to source_definitions and update Dockerfile

* Update docs + run airbyte-config:init:processResources
  • Loading branch information
kgrover authored Mar 10, 2022
1 parent c720120 commit 01ed0e7
Show file tree
Hide file tree
Showing 14 changed files with 214 additions and 40 deletions.
70 changes: 70 additions & 0 deletions airbyte-config/init/src/main/resources/icons/orb.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,13 @@
documentationUrl: https://docs.airbyte.io/integrations/sources/oracle
icon: oracle.svg
sourceType: database
- name: Orb
sourceDefinitionId: 7f0455fb-4518-4ec0-b7a3-d808bf8081cc
dockerRepository: airbyte/source-orb
dockerImageTag: 0.1.1
documentationUrl: https://docs.airbyte.io/integrations/sources/orb
icon: orb.svg
sourceType: api
- sourceDefinitionId: 3490c201-5d95-4783-b600-eaf07a4c7787
name: Outreach
dockerRepository: airbyte/source-outreach
Expand Down
54 changes: 54 additions & 0 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5832,6 +5832,60 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-orb:0.1.1"
spec:
documentationUrl: "https://docs.withorb.com/"
connectionSpecification:
$schema: "http://json-schema.org/draft-07/schema#"
title: "Orb Spec"
type: "object"
required:
- "api_key"
additionalProperties: false
properties:
api_key:
type: "string"
title: "Orb API Key"
description: "Orb API Key, issued from the Orb admin console."
airbyte_secret: true
order: 1
start_date:
type: "string"
title: "Start Date"
pattern: "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$"
description: "UTC date and time in the format 2022-03-01T00:00:00Z. Any\
\ data with created_at before this data will not be synced."
examples:
- "2022-03-01T00:00:00Z"
order: 2
lookback_window_days:
type: "integer"
title: "Lookback Window (in days)"
default: 0
minimum: 0
description: "When set to N, the connector will always refresh resources\
\ created within the past N days. By default, updated objects that are\
\ not newly created are not incrementally synced."
order: 3
string_event_properties_keys:
type: "array"
items:
type: "string"
title: "Event properties keys (string values)"
description: "Property key names to extract from all events, in order to\
\ enrich ledger entries corresponding to an event deduction."
order: 4
numeric_event_properties_keys:
type: "array"
items:
type: "string"
title: "Event properties keys (numeric values)"
description: "Property key names to extract from all events, in order to\
\ enrich ledger entries corresponding to an event deduction."
order: 5
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-outreach:0.1.1"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/outreach"
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-orb/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY source_orb ./source_orb
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.name=airbyte/source-orb
6 changes: 6 additions & 0 deletions airbyte-integrations/connectors/source-orb/bootstrap.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ Since the Orb API does not allow querying objects based on `updated_at`, these i

Orb's API uses cursor-based pagination, which is documented [here](https://docs.withorb.com/reference/pagination).

## Enriching Credit Ledger entries

The connector configuration includes two properties: `numeric_event_properties_keys` and `string_event_properties_keys`.

When a ledger entry has an `event_id` attached to it (e.g. an automated decrement), the connector will make a follow-up request to enrich those entries with event properties corresponding to the keys provided. The connector assumes (and generates schema) that property values corresponding to the keys listed in `numeric_event_properties_keys` are numeric, and the property values corresponding to the keys listed in `string_event_properties_keys` are string typed.

## Authentication

This connector authenticates against the Orb API with an API key that can be issued via the Orb Admin Console.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,21 @@
"entry_type": {
"type": "string"
},
"expiry_date": {
"type": ["null", "string"],
"format": "date-time"
},
"target_expiry_date": {
"new_block_expiry_date": {
"type": ["null", "string"],
"format": "date-time"
},
"customer_id": {
"type": "string"
}
}
},
"required": [
"id",
"starting_balance",
"ending_balance",
"amount",
"created_at",
"customer_id",
"entry_type"
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,6 @@
}
}
}
}
},
"required": ["id", "created_at"]
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,7 @@
"type": "string"
}
}
},
"start_date": {
"type": ["null", "string"],
"format": "date-time"
},
"end_date": {
"type": ["null", "string"],
"format": "date-time"
}
}
},
"required": ["id", "created_at"]
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@
"status": {
"type": "string"
}
}
},
"required": ["id", "created_at", "status"]
}
41 changes: 31 additions & 10 deletions airbyte-integrations/connectors/source-orb/source_orb/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,12 @@ class CreditsLedgerEntries(IncrementalOrbStream):
API Docs: https://docs.withorb.com/reference/view-credits-ledger
"""

def __init__(self, event_properties_keys: Optional[List[str]] = None, **kwargs):
def __init__(
self, string_event_properties_keys: Optional[List[str]] = None, numeric_event_properties_keys: Optional[List[str]] = None, **kwargs
):
super().__init__(**kwargs)
self.event_properties_keys = event_properties_keys
self.string_event_properties_keys = string_event_properties_keys
self.numeric_event_properties_keys = numeric_event_properties_keys

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
"""
Expand Down Expand Up @@ -306,7 +309,8 @@ def modify_ledger_entry_schema(ledger_entry):
modify_ledger_entry_schema(ledger_entry=ledger_entry)

# Nothing to extract for each ledger entry
if not self.event_properties_keys:
merged_properties_keys = (self.string_event_properties_keys or []) + (self.numeric_event_properties_keys or [])
if not merged_properties_keys:
return ledger_entries

# The events endpoint is a `POST` endpoint which expects a list of
Expand All @@ -329,7 +333,7 @@ def modify_ledger_entry_schema(ledger_entry):
for serialized_event in paginated_events_response_body["data"]:
event_id = serialized_event["id"]
desired_properties_subset = {
key: value for key, value in serialized_event["properties"].items() if key in self.event_properties_keys
key: value for key, value in serialized_event["properties"].items() if key in merged_properties_keys
}

# This would imply that the endpoint returned an event that wasn't part of the filter
Expand Down Expand Up @@ -359,17 +363,20 @@ def get_json_schema(self) -> Mapping[str, Any]:
"""
schema = super().get_json_schema()
dynamic_event_properties_schema = {}
if self.event_properties_keys:
for property_key in self.event_properties_keys:
# Property values are assumed to have string type.
if self.string_event_properties_keys:
for property_key in self.string_event_properties_keys:
dynamic_event_properties_schema[property_key] = {"type": "string"}
if self.numeric_event_properties_keys:
for property_key in self.numeric_event_properties_keys:
dynamic_event_properties_schema[property_key] = {"type": "number"}

schema["properties"]["event"] = {
"type": ["null", "object"],
"properties": {
"event_id": {"type": "string"},
"id": {"type": "string"},
"properties": {"type": ["null", "object"], "properties": dynamic_event_properties_schema},
},
"required": ["id"],
}

return schema
Expand All @@ -391,10 +398,23 @@ def check_connection(self, logger, config) -> Tuple[bool, any]:
except Exception as e:
return False, e

def input_keys_mutually_exclusive(
self, string_event_properties_keys: Optional[List[str]] = None, numeric_event_properties_keys: Optional[List[str]] = None
):
if string_event_properties_keys is None or numeric_event_properties_keys is None:
return True
else:
return len(set(string_event_properties_keys) & set(numeric_event_properties_keys)) == 0

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
authenticator = TokenAuthenticator(token=config["api_key"])
lookback_window = config.get("lookback_window_days")
event_properties_keys = config.get("event_properties_keys")
string_event_properties_keys = config.get("string_event_properties_keys")
numeric_event_properties_keys = config.get("numeric_event_properties_keys")

if not self.input_keys_mutually_exclusive(string_event_properties_keys, numeric_event_properties_keys):
raise ValueError("Supplied property keys for string and numeric valued property values must be mutually exclusive.")

start_date_str = config.get("start_date")
start_date = pendulum.parse(start_date_str) if start_date_str else None
return [
Expand All @@ -405,6 +425,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
authenticator=authenticator,
lookback_window_days=lookback_window,
start_date=start_date,
event_properties_keys=event_properties_keys,
string_event_properties_keys=string_event_properties_keys,
numeric_event_properties_keys=numeric_event_properties_keys,
),
]
15 changes: 12 additions & 3 deletions airbyte-integrations/connectors/source-orb/source_orb/spec.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,23 @@
"description": "When set to N, the connector will always refresh resources created within the past N days. By default, updated objects that are not newly created are not incrementally synced.",
"order": 3
},
"event_properties_keys": {
"string_event_properties_keys": {
"type": "array",
"items": {
"type": "string"
},
"title": "Event properties keys",
"description": "Property key names to extract from all events, in order to enrich ledger entries corresponding to an event deduction",
"title": "Event properties keys (string values)",
"description": "Property key names to extract from all events, in order to enrich ledger entries corresponding to an event deduction.",
"order": 4
},
"numeric_event_properties_keys": {
"type": "array",
"items": {
"type": "string"
},
"title": "Event properties keys (numeric values)",
"description": "Property key names to extract from all events, in order to enrich ledger entries corresponding to an event deduction.",
"order": 5
}
}
}
Expand Down
Loading

0 comments on commit 01ed0e7

Please sign in to comment.