Skip to content

Commit

Permalink
🎉 Source Orb: enrich credits ledger entries with cost basis data, des…
Browse files Browse the repository at this point in the history
…cription, and update expiration date fields (#11528)

* Add cost basis data to connector and remap block_expiry_date field

* Update dockerfile and orb.md

* Update orb.md and comments

* Add entry_status=committed as filter for getting CreditLedgerEntries

* Update version information

* Move entry_status filter to the correct endpoint

* PR feedback: rename field and add docstring for committed entries

* Fix unit tests to include entry_status param

* Add a unit test to validate transform behavior

* Format using gradlew format

* Bump connector veresion in spec and definitions
  • Loading branch information
anushree-agrawal authored Apr 26, 2022
1 parent 2a8df33 commit 03a6b6b
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@
- name: Orb
sourceDefinitionId: 7f0455fb-4518-4ec0-b7a3-d808bf8081cc
dockerRepository: airbyte/source-orb
dockerImageTag: 0.1.1
dockerImageTag: 0.1.2
documentationUrl: https://docs.airbyte.io/integrations/sources/orb
icon: orb.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5971,7 +5971,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-orb:0.1.1"
- dockerImage: "airbyte/source-orb:0.1.2"
spec:
documentationUrl: "https://docs.withorb.com/"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-orb/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY source_orb ./source_orb
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.name=airbyte/source-orb
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,6 @@
"starting_balance": { "type": "number" },
"ending_balance": { "type": "number" },
"amount": { "type": ["null", "number"] },
"block_expiry_date": {
"type": ["null", "string"],
"format": "date-time"
},
"created_at": { "type": ["null", "string"], "format": "date-time" },
"entry_type": { "type": "string" },
"expiry_date": {
Expand All @@ -131,6 +127,14 @@
"id": { "type": "string" },
"external_customer_id": { "type": ["null", "string"] }
}
},
"credit_block": {
"type": "object",
"properties": {
"id": { "type": "string" },
"expiry_date": { "type": ["null", "string"] },
"per_unit_cost_basis": { "type": ["null", "string"] }
}
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@
},
"customer_id": {
"type": "string"
},
"credit_block_per_unit_cost_basis": {
"type": ["null", "string"]
},
"description": {
"type": ["null", "string"]
}
},
"required": [
Expand All @@ -40,6 +46,8 @@
"amount",
"created_at",
"customer_id",
"entry_type"
"entry_type",
"credit_block_per_unit_cost_basis",
"description"
]
}
16 changes: 15 additions & 1 deletion airbyte-integrations/connectors/source-orb/source_orb/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,11 +235,18 @@ def request_params(self, stream_state: Mapping[str, Any], stream_slice: Mapping[
Request params are based on the specific slice (i.e. customer_id) we are requesting for,
and so we need to pull out relevant slice state from the stream state.
Ledger entries can either be `pending` or `committed`.
We're filtering to only return `committed` ledger entries, which are entries that are older than the
reporting grace period (12 hours) and are considered finalized.
`pending` entries can change during the reporting grace period, so we don't want to export those entries.
Note that the user of super() here implies that the state for a specific slice of this stream
is of the same format as the stream_state of a regular incremental stream.
"""
current_customer_state = stream_state.get(stream_slice["customer_id"], {})
return super().request_params(current_customer_state, **kwargs)
params = super().request_params(current_customer_state, **kwargs)
params["entry_status"] = "committed"
return params

def path(self, stream_slice: Mapping[str, Any] = None, **kwargs):
"""
Expand Down Expand Up @@ -268,6 +275,13 @@ def transform_record(self, ledger_entry_record):
del ledger_entry_record["customer"]
ledger_entry_record["customer_id"] = nested_customer_id

# Un-nest credit_block -> expiry_date into block_expiry_date and per_unit_cost_basis
nested_expiry_date = ledger_entry_record["credit_block"]["expiry_date"]
nested_per_unit_cost_basis = ledger_entry_record["credit_block"]["per_unit_cost_basis"]
del ledger_entry_record["credit_block"]
ledger_entry_record["block_expiry_date"] = nested_expiry_date
ledger_entry_record["credit_block_per_unit_cost_basis"] = nested_per_unit_cost_basis

return ledger_entry_record

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ def test_credits_ledger_entries_stream_slices(mocker):
def test_credits_ledger_entries_request_params(mocker, current_stream_state, current_stream_slice, next_page_token):
stream = CreditsLedgerEntries()
inputs = {"stream_state": current_stream_state, "stream_slice": current_stream_slice, "next_page_token": next_page_token}
expected_params = dict(limit=CreditsLedgerEntries.page_size)
expected_params = dict(limit=CreditsLedgerEntries.page_size, entry_status="committed")
current_slice_state = current_stream_state.get(current_stream_slice["customer_id"], {})
if current_slice_state.get("created_at"):
expected_params["created_at[gte]"] = current_slice_state["created_at"]
Expand All @@ -170,6 +170,27 @@ def test_credits_ledger_entries_request_params(mocker, current_stream_state, cur
assert stream.request_params(**inputs) == expected_params


def test_credits_ledger_entries_transform_record(mocker):
stream = CreditsLedgerEntries()
ledger_entry_record = {
"event_id": "foo-event-id",
"entry_type": "decrement",
"customer": {
"id": "foo-customer-id",
},
"credit_block": {"expiry_date": "2023-01-25T12:00:00+00:00", "per_unit_cost_basis": "2.50"},
}

# Validate that calling transform record unwraps nested customer and credit block fields.
assert stream.transform_record(ledger_entry_record) == {
"event_id": "foo-event-id",
"entry_type": "decrement",
"customer_id": "foo-customer-id",
"block_expiry_date": "2023-01-25T12:00:00+00:00",
"credit_block_per_unit_cost_basis": "2.50",
}


@responses.activate
def test_credits_ledger_entries_no_matching_events(mocker):
stream = CreditsLedgerEntries(string_event_properties_keys=["ping"])
Expand Down
2 changes: 2 additions & 0 deletions docs/integrations/sources/orb.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ an Orb Account and API Key.
## Changelog

| Version | Date | Pull Request | Subject |
| --- | --- | --- | --- |
| 0.1.2 | 2022-04-20 | [11528](https://github.com/airbytehq/airbyte/pull/11528) | Add cost basis to ledger entries, update expiration date, sync only committed entries
| 0.1.1 | 2022-03-03 | [10839](https://github.com/airbytehq/airbyte/pull/10839) | Support ledger entries with numeric properties + schema fixes
| 0.1.0 | 2022-02-01 | | New Source: Orb
| :--- | :--- | :--- | :--- |
Expand Down

0 comments on commit 03a6b6b

Please sign in to comment.