From 769a1062b531e33ce9824393d09b7c5d3f1b57af Mon Sep 17 00:00:00 2001 From: Yiyang Li Date: Mon, 11 Jul 2022 21:53:33 -0700 Subject: [PATCH 01/12] Source Okta: add resource-sets (inremental supported) - stop using the deprecated method get_updated_state, use state property and state setter instead - the payload resource-sets is enveloped, _links.next.href contains the cursor --- .../tests/test_incremental.py | 2 +- .../integration_tests/abnormal_state.json | 3 +- .../integration_tests/configured_catalog.json | 11 +++ .../source_okta/schemas/resource_sets.json | 63 +++++++++++++++++ .../source-okta/source_okta/source.py | 68 ++++++++++++++----- .../source-okta/unit_tests/conftest.py | 20 ++++++ .../source-okta/unit_tests/test_streams.py | 55 ++++++++++++--- docs/integrations/sources/okta.md | 2 + 8 files changed, 195 insertions(+), 29 deletions(-) create mode 100644 airbyte-integrations/connectors/source-okta/source_okta/schemas/resource_sets.json diff --git a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_incremental.py b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_incremental.py index 3745f1ce4db1a..8c6dfb38dd46e 100644 --- a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_incremental.py +++ b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_incremental.py @@ -210,5 +210,5 @@ def test_state_with_abnormally_large_values(self, connector_config, configured_c records = filter_output(output, type_=Type.RECORD) states = filter_output(output, type_=Type.STATE) - assert not records, "The sync should produce no records when run with the state with abnormally large values" + assert not records, f"The sync should produce no records when run with the state with abnormally large values {records[0].record.stream}" assert states, "The sync should produce at least one STATE message" diff --git 
a/airbyte-integrations/connectors/source-okta/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-okta/integration_tests/abnormal_state.json index 18615c6654276..99a265b8c8b32 100644 --- a/airbyte-integrations/connectors/source-okta/integration_tests/abnormal_state.json +++ b/airbyte-integrations/connectors/source-okta/integration_tests/abnormal_state.json @@ -2,5 +2,6 @@ "users": { "lastUpdated": "3021-09-08T07:04:28.000Z" }, "groups": { "lastUpdated": "3021-09-08T07:04:28.000Z" }, "group_members": { "id": "00uzzzzzzzzzzzzzzzzz" }, - "logs": { "published": "3021-09-08T07:04:28.000Z" } + "logs": { "published": "3021-09-08T07:04:28.000Z" }, + "resource_sets": { "id": "iamzzzzzzzzzzzzzzzzz" } } diff --git a/airbyte-integrations/connectors/source-okta/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-okta/integration_tests/configured_catalog.json index 14c4218507f87..8783568f91e4d 100644 --- a/airbyte-integrations/connectors/source-okta/integration_tests/configured_catalog.json +++ b/airbyte-integrations/connectors/source-okta/integration_tests/configured_catalog.json @@ -44,6 +44,17 @@ "cursor_field": ["id"], "primary_key": [["id"]] }, + { + "stream": { + "name": "resource_sets", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "overwrite", + "cursor_field": ["id"], + "primary_key": [["id"]] + }, { "stream": { "name": "custom_roles", diff --git a/airbyte-integrations/connectors/source-okta/source_okta/schemas/resource_sets.json b/airbyte-integrations/connectors/source-okta/source_okta/schemas/resource_sets.json new file mode 100644 index 0000000000000..0741df0fedf93 --- /dev/null +++ b/airbyte-integrations/connectors/source-okta/source_okta/schemas/resource_sets.json @@ -0,0 +1,63 @@ +{ + "properties": { + "id": { + "type": "string" + }, + "label": { + "type": "string" + }, + "description": { + "type": "string" + }, + 
"_links": { + "properties": { + "assignee": { + "properties": { + "self": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "href": { + "type": ["null", "string"] + } + }, + "description": "gets this Resource Set" + }, + "resources": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "href": { + "type": ["null", "string"] + } + }, + "description": "gets a paginable list of resources included in this set" + }, + "bindings": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "href": { + "type": ["null", "string"] + } + }, + "description": "gets a paginable list of admin Role Bindings assigned to this set" + }, + "next": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "href": { + "type": ["null", "string"] + } + }, + "description": "the link for the next page, 'after' is the query string, the cursor field is id" + } + } + } + }, + "type": ["object", "null"] + } + }, + "type": "object" +} diff --git a/airbyte-integrations/connectors/source-okta/source_okta/source.py b/airbyte-integrations/connectors/source-okta/source_okta/source.py index 5f441f3888bbf..d94f5e70525d5 100644 --- a/airbyte-integrations/connectors/source-okta/source_okta/source.py +++ b/airbyte-integrations/connectors/source-okta/source_okta/source.py @@ -43,7 +43,6 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, if "self" in links: if links["self"]["url"] == next_url: return None - return query_params return None @@ -77,19 +76,22 @@ def backoff_time(self, response: requests.Response) -> Optional[float]: class IncrementalOktaStream(OktaStream, ABC): + def __init__(self, url_base: str, *args, **kwargs): + super().__init__(url_base, *args, **kwargs) + self._state = {} + @property @abstractmethod def cursor_field(self) -> str: pass - def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, 
Any]) -> Mapping[str, Any]: - lowest_date = str(pendulum.datetime.min) - return { - self.cursor_field: max( - latest_record.get(self.cursor_field, lowest_date), - current_stream_state.get(self.cursor_field, lowest_date), - ) - } + @property + def state(self) -> Mapping[str, Any]: + return self._state + + @state.setter + def state(self, value: Mapping[str, Any]): + self._state[self.cursor_field] = value[self.cursor_field] def request_params( self, @@ -116,7 +118,6 @@ def path(self, **kwargs) -> str: class GroupMembers(IncrementalOktaStream): cursor_field = "id" primary_key = "id" - min_user_id = "00u00000000000000000" use_cache = True def stream_slices(self, **kwargs): @@ -140,14 +141,6 @@ def request_params( params["after"] = latest_entry return params - def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: - return { - self.cursor_field: max( - latest_record.get(self.cursor_field, self.min_user_id), - current_stream_state.get(self.cursor_field, self.min_user_id), - ) - } - class GroupRoleAssignments(OktaStream): primary_key = "id" @@ -222,6 +215,44 @@ def request_params( return params +class ResourceSets(IncrementalOktaStream): + cursor_field = "id" + primary_key = "id" + + def path(self, **kwargs) -> str: + return "iam/resource-sets" + + def parse_response( + self, + response: requests.Response, + **kwargs, + ) -> Iterable[Mapping]: + yield from response.json()["resource-sets"] + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + # We can't follow the default pagination that takes query from header.links + # Instead, the payload contains _links that offers the next link + body = response.json() + if "_links" in body and "next" in body["_links"] and "href" in body["_links"]["next"]: + next_url = body["_links"]["next"]["href"] + parsed_link = parse.urlparse(next_url) + return dict(parse.parse_qsl(parsed_link.query)) + + return None + + def 
request_params( + self, + stream_state: Mapping[str, Any], + stream_slice: Mapping[str, any] = None, + next_page_token: Mapping[str, Any] = None, + ) -> MutableMapping[str, Any]: + params = super().request_params(stream_state, stream_slice, next_page_token) + latest_entry = stream_state.get(self.cursor_field) + if latest_entry: + params["after"] = latest_entry + return params + + class CustomRoles(OktaStream): # https://developer.okta.com/docs/reference/api/roles/#list-roles primary_key = "id" @@ -366,4 +397,5 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: UserRoleAssignments(**initialization_params), GroupRoleAssignments(**initialization_params), Permissions(**initialization_params), + ResourceSets(**initialization_params), ] diff --git a/airbyte-integrations/connectors/source-okta/unit_tests/conftest.py b/airbyte-integrations/connectors/source-okta/unit_tests/conftest.py index de7b6e592876c..825b53d2b4237 100644 --- a/airbyte-integrations/connectors/source-okta/unit_tests/conftest.py +++ b/airbyte-integrations/connectors/source-okta/unit_tests/conftest.py @@ -377,6 +377,26 @@ def logs_instance(): } +@pytest.fixture() +def resource_set_instance(api_url): + """ + Resource set object instance + """ + _id = "iam5xyzmibarA6Afoo7" + return { + "id": _id, + "label": "all users", + "description": "all users", + "created": "2022-07-09T20:58:41.000Z", + "lastUpdated": "2022-07-09T20:58:41.000Z", + "_links": { + "bindings": {"href": f"{url_base}/iam/resource-sets/{_id}/bindings"}, + "self": {"href": f"{url_base}/iam/resource-sets/{_id}"}, + "resources": {"href": f"{url_base}/iam/resource-sets/{_id}/resources"}, + }, + } + + @pytest.fixture() def latest_record_instance(url_base, api_url): """ diff --git a/airbyte-integrations/connectors/source-okta/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-okta/unit_tests/test_streams.py index bf744d3146012..4caf524cd3dea 100644 --- 
a/airbyte-integrations/connectors/source-okta/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-okta/unit_tests/test_streams.py @@ -20,6 +20,7 @@ Logs, OktaStream, Permissions, + ResourceSets, UserRoleAssignments, Users, ) @@ -126,11 +127,10 @@ def cursor_field(self) -> str: stream = TestIncrementalOktaStream(url_base=url_base) stream._cursor_field = "lastUpdated" + assert stream.state == {} - current_stream_state = {"lastUpdated": "2021-04-21T21:03:55.000Z"} - update_state = stream.get_updated_state(current_stream_state=current_stream_state, latest_record=latest_record_instance) - expected_result = {"lastUpdated": "2022-07-18T07:58:11.000Z"} - assert update_state == expected_result + stream.state = {"lastUpdated": "2021-04-21T21:03:55.000Z"} + assert stream.state == {"lastUpdated": "2021-04-21T21:03:55.000Z"} def test_okta_stream_http_method(self, patch_base_class, url_base): stream = OktaStream(url_base=url_base) @@ -272,12 +272,12 @@ def test_group_members_slice_stream(self, requests_mock, patch_base_class, group def test_group_member_request_get_update_state(self, latest_record_instance, url_base): stream = GroupMembers(url_base=url_base) stream._cursor_field = "id" - current_stream_state = {"id": "test_user_group_id"} - update_state = stream.get_updated_state(current_stream_state=current_stream_state, latest_record=latest_record_instance) - assert update_state == {"id": "test_user_group_id"} + assert stream.state == {} + stream.state = {"id": "test_user_group_id"} + assert stream.state == {"id": "test_user_group_id"} -class TestStreamGroupRoleAssignment: +class TestStreamGroupRoleAssignments: def test_group_role_assignments(self, requests_mock, patch_base_class, group_role_assignments_instance, url_base, api_url): stream = GroupRoleAssignments(url_base=url_base) group_id = "test_group_id" @@ -325,7 +325,7 @@ def test_logs_request_params_for_until(self, patch_base_class, logs_instance, ur assert stream.request_params(**inputs) == {"limit": 
200, "since": testing_date.isoformat(), "until": testing_date.isoformat()} -class TestStreamUserRoleAssignment: +class TestStreamUserRoleAssignments: def test_user_role_assignments(self, requests_mock, patch_base_class, user_role_assignments_instance, url_base, api_url): stream = UserRoleAssignments(url_base=url_base) user_id = "test_user_id" @@ -345,3 +345,40 @@ def test_user_role_assignments_slice_stream( stream = UserRoleAssignments(url_base=url_base) requests_mock.get(f"{api_url}/users?limit=200", json=[users_instance]) assert list(stream.stream_slices()) == [{"user_id": "test_user_id"}] + + +class TestStreamResourceSets: + def test_resource_sets(self, requests_mock, patch_base_class, resource_set_instance, url_base, api_url): + stream = ResourceSets(url_base=url_base) + record = {"resource-sets": [resource_set_instance]} + requests_mock.get(f"{api_url}/iam/resource-sets", json=record) + inputs = {"sync_mode": SyncMode.incremental} + assert list(stream.read_records(**inputs)) == record["resource-sets"] + + def test_resource_sets_parse_response(self, requests_mock, patch_base_class, resource_set_instance, url_base, api_url): + stream = ResourceSets(url_base=url_base) + record = {"resource-sets": [resource_set_instance]} + requests_mock.get(f"{api_url}", json=record) + assert list(stream.parse_response(response=requests.get(f"{api_url}"))) == [resource_set_instance] + + def test_resource_sets_next_page_token(self, requests_mock, patch_base_class, resource_set_instance, url_base, api_url): + stream = ResourceSets(url_base=url_base) + cursor = "iam5cursorFybecursor" + response = MagicMock(requests.Response) + next_link = f"{url_base}/iam/resource-sets?after={cursor}" + response.json = MagicMock(return_value={"_links": {"next": {"href": next_link}}, "resource-sets": [resource_set_instance]}) + inputs = {"response": response} + result = stream.next_page_token(**inputs) + assert result == {"after": cursor} + + response.json = MagicMock(return_value={"resource-sets": 
[resource_set_instance]}) + inputs = {"response": response} + result = stream.next_page_token(**inputs) + assert result == None + + def test_resource_sets_request_params(self, requests_mock, patch_base_class, resource_set_instance, url_base, api_url): + stream = ResourceSets(url_base=url_base) + cursor = "iam5cursorFybecursor" + inputs = {"stream_slice": None, "stream_state": {"id": cursor}, "next_page_token": None} + expected_params = {"limit": 200, "after": "iam5cursorFybecursor", "filter": 'id gt "iam5cursorFybecursor"'} + assert stream.request_params(**inputs) == expected_params diff --git a/docs/integrations/sources/okta.md b/docs/integrations/sources/okta.md index 4149720f3c1ac..848c5ef42c213 100644 --- a/docs/integrations/sources/okta.md +++ b/docs/integrations/sources/okta.md @@ -16,6 +16,7 @@ This Source is capable of syncing the following core Streams: - [System Log](https://developer.okta.com/docs/reference/api/system-log/#get-started) - [Custom Roles](https://developer.okta.com/docs/reference/api/roles/#list-roles) - [Permissions](https://developer.okta.com/docs/reference/api/roles/#list-permissions) +- [Resource Sets](https://developer.okta.com/docs/reference/api/roles/#list-resource-sets) ### Data type mapping @@ -62,6 +63,7 @@ Different Okta APIs require different admin privilege levels. 
API tokens inherit | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------| +| 0.1.12 | 2022-08-03 | [14700](https://github.com/airbytehq/airbyte/pull/14700) | add resource sets | | 0.1.11 | 2022-08-03 | [14739](https://github.com/airbytehq/airbyte/pull/14739) | add permissions for custom roles | | 0.1.10 | 2022-08-01 | [15179](https://github.com/airbytehq/airbyte/pull/15179) | Fixed broken schemas for all streams | 0.1.9 | 2022-07-25 | [15001](https://github.com/airbytehq/airbyte/pull/15001) | Return deprovisioned users | From f636cb8a882a3801931db421d529a0316b651d2f Mon Sep 17 00:00:00 2001 From: sajarin Date: Tue, 9 Aug 2022 13:20:26 -0400 Subject: [PATCH 02/12] fix: change assert statement with flake formatting --- .../connectors/source-okta/unit_tests/test_streams.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-okta/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-okta/unit_tests/test_streams.py index fc8791d47f3b0..cd791984a39e0 100644 --- a/airbyte-integrations/connectors/source-okta/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-okta/unit_tests/test_streams.py @@ -287,7 +287,6 @@ def test_group_member_request_get_update_state(self, latest_record_instance, url assert stream.state == {"id": "test_user_group_id"} - class TestStreamGroupRoleAssignment: def test_group_role_assignments(self, requests_mock, patch_base_class, group_role_assignments_instance, url_base, api_url, start_date): stream = GroupRoleAssignments(url_base=url_base, start_date=start_date) @@ -390,7 +389,7 @@ def test_resource_sets_next_page_token(self, requests_mock, patch_base_class, re response.json = MagicMock(return_value={"resource-sets": [resource_set_instance]}) inputs = {"response": response} result = 
stream.next_page_token(**inputs) - assert result == None + assert result is None def test_resource_sets_request_params(self, requests_mock, patch_base_class, resource_set_instance, url_base, api_url): stream = ResourceSets(url_base=url_base) From 300809222864a25b70a8852e3f2ec4e1b364eae0 Mon Sep 17 00:00:00 2001 From: Yiyang Li Date: Tue, 9 Aug 2022 15:32:49 -0700 Subject: [PATCH 03/12] fix unit tests --- .../source-okta/source_okta/source.py | 42 ++++++++++++------- .../source-okta/unit_tests/test_streams.py | 30 ++++++------- docs/integrations/sources/okta.md | 2 +- 3 files changed, 44 insertions(+), 30 deletions(-) diff --git a/airbyte-integrations/connectors/source-okta/source_okta/source.py b/airbyte-integrations/connectors/source-okta/source_okta/source.py index 50c0c7561af77..5815b4e48871e 100644 --- a/airbyte-integrations/connectors/source-okta/source_okta/source.py +++ b/airbyte-integrations/connectors/source-okta/source_okta/source.py @@ -78,22 +78,21 @@ def backoff_time(self, response: requests.Response) -> Optional[float]: class IncrementalOktaStream(OktaStream, ABC): - def __init__(self, url_base: str, *args, **kwargs): - super().__init__(url_base, *args, **kwargs) - self._state = {} + min_id = '' @property @abstractmethod def cursor_field(self) -> str: pass - @property - def state(self) -> Mapping[str, Any]: - return self._state - - @state.setter - def state(self, value: Mapping[str, Any]): - self._state[self.cursor_field] = value[self.cursor_field] + def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: + lowest_date = self.min_id if self.min_id else str(pendulum.datetime.min) + return { + self.cursor_field: max( + latest_record.get(self.cursor_field, lowest_date), + current_stream_state.get(self.cursor_field, lowest_date), + ) + } def request_params( self, @@ -120,12 +119,21 @@ class GroupMembers(IncrementalOktaStream): cursor_field = "id" primary_key = "id" use_cache = 
True + min_id = "00u00000000000000000" def stream_slices(self, **kwargs): group_stream = Groups(authenticator=self.authenticator, url_base=self.url_base, start_date=self.start_date) for group in group_stream.read_records(sync_mode=SyncMode.full_refresh): yield {"group_id": group["id"]} + def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: + return { + self.cursor_field: max( + latest_record.get(self.cursor_field, self.min_id), + current_stream_state.get(self.cursor_field, self.min_id), + ) + } + def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: group_id = stream_slice["group_id"] return f"groups/{group_id}/users" @@ -136,11 +144,8 @@ def request_params( stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None, ) -> MutableMapping[str, Any]: - # Filter param should be ignored SCIM filter expressions can't use the published - # attribute since it may conflict with the logic of the since, after, and until query params. 
- # Docs: https://developer.okta.com/docs/reference/api/system-log/#expression-filter params = super(IncrementalOktaStream, self).request_params(stream_state, stream_slice, next_page_token) - latest_entry = stream_state.get(self.cursor_field) if stream_state else self.min_user_id + latest_entry = stream_state.get(self.cursor_field) if stream_state else self.min_id params["after"] = latest_entry return params @@ -246,10 +251,19 @@ def request_params( class ResourceSets(IncrementalOktaStream): cursor_field = "id" primary_key = "id" + min_id = 'iam00000000000000000' def path(self, **kwargs) -> str: return "iam/resource-sets" + def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: + return { + self.cursor_field: max( + latest_record.get(self.cursor_field, self.min_id), + current_stream_state.get(self.cursor_field, self.min_id), + ) + } + def parse_response( self, response: requests.Response, diff --git a/airbyte-integrations/connectors/source-okta/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-okta/unit_tests/test_streams.py index cd791984a39e0..ed5b2bb6948a0 100644 --- a/airbyte-integrations/connectors/source-okta/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-okta/unit_tests/test_streams.py @@ -127,10 +127,10 @@ def cursor_field(self) -> str: stream = TestIncrementalOktaStream(url_base=url_base, start_date=start_date) stream._cursor_field = "lastUpdated" - assert stream.state == {} - - stream.state = {"lastUpdated": "2021-04-21T21:03:55.000Z"} - assert stream.state == {"lastUpdated": "2021-04-21T21:03:55.000Z"} + current_stream_state = {"lastUpdated": "2021-04-21T21:03:55.000Z"} + update_state = stream.get_updated_state(current_stream_state=current_stream_state, latest_record=latest_record_instance) + expected_result = {"lastUpdated": "2022-07-18T07:58:11.000Z"} + assert update_state == expected_result def test_okta_stream_http_method(self, 
patch_base_class, url_base, start_date): stream = OktaStream(url_base=url_base, start_date=start_date) @@ -282,9 +282,9 @@ def test_group_members_slice_stream( def test_group_member_request_get_update_state(self, latest_record_instance, url_base, start_date): stream = GroupMembers(url_base=url_base, start_date=start_date) stream._cursor_field = "id" - assert stream.state == {} - stream.state = {"id": "test_user_group_id"} - assert stream.state == {"id": "test_user_group_id"} + current_stream_state = {"id": "test_user_group_id"} + update_state = stream.get_updated_state(current_stream_state=current_stream_state, latest_record=latest_record_instance) + assert update_state == {"id": "test_user_group_id"} class TestStreamGroupRoleAssignment: @@ -363,21 +363,21 @@ def test_user_role_assignments_slice_stream( class TestStreamResourceSets: - def test_resource_sets(self, requests_mock, patch_base_class, resource_set_instance, url_base, api_url): - stream = ResourceSets(url_base=url_base) + def test_resource_sets(self, requests_mock, patch_base_class, resource_set_instance, url_base, api_url, start_date): + stream = ResourceSets(url_base=url_base, start_date=start_date) record = {"resource-sets": [resource_set_instance]} requests_mock.get(f"{api_url}/iam/resource-sets", json=record) inputs = {"sync_mode": SyncMode.incremental} assert list(stream.read_records(**inputs)) == record["resource-sets"] - def test_resource_sets_parse_response(self, requests_mock, patch_base_class, resource_set_instance, url_base, api_url): - stream = ResourceSets(url_base=url_base) + def test_resource_sets_parse_response(self, requests_mock, patch_base_class, resource_set_instance, url_base, api_url, start_date): + stream = ResourceSets(url_base=url_base, start_date=start_date) record = {"resource-sets": [resource_set_instance]} requests_mock.get(f"{api_url}", json=record) assert list(stream.parse_response(response=requests.get(f"{api_url}"))) == [resource_set_instance] - def 
test_resource_sets_next_page_token(self, requests_mock, patch_base_class, resource_set_instance, url_base, api_url): - stream = ResourceSets(url_base=url_base) + def test_resource_sets_next_page_token(self, requests_mock, patch_base_class, resource_set_instance, url_base, api_url, start_date): + stream = ResourceSets(url_base=url_base, start_date=start_date) cursor = "iam5cursorFybecursor" response = MagicMock(requests.Response) next_link = f"{url_base}/iam/resource-sets?after={cursor}" @@ -391,8 +391,8 @@ def test_resource_sets_next_page_token(self, requests_mock, patch_base_class, re result = stream.next_page_token(**inputs) assert result is None - def test_resource_sets_request_params(self, requests_mock, patch_base_class, resource_set_instance, url_base, api_url): - stream = ResourceSets(url_base=url_base) + def test_resource_sets_request_params(self, requests_mock, patch_base_class, resource_set_instance, url_base, api_url, start_date): + stream = ResourceSets(url_base=url_base, start_date=start_date) cursor = "iam5cursorFybecursor" inputs = {"stream_slice": None, "stream_state": {"id": cursor}, "next_page_token": None} expected_params = {"limit": 200, "after": "iam5cursorFybecursor", "filter": 'id gt "iam5cursorFybecursor"'} diff --git a/docs/integrations/sources/okta.md b/docs/integrations/sources/okta.md index 44e0799c144af..6bbb34ba37b01 100644 --- a/docs/integrations/sources/okta.md +++ b/docs/integrations/sources/okta.md @@ -79,7 +79,7 @@ The connector is restricted by normal Okta [requests limitation](https://develop | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------| -| 0.1.13 | 2022-08-09 | [14700](https://github.com/airbytehq/airbyte/pull/14700) | add resource sets | +| 0.1.13 | 2022-08-09 | [14700](https://github.com/airbytehq/airbyte/pull/14700) | Add resource sets | | 0.1.12 | 2022-08-05 | 
[15050](https://github.com/airbytehq/airbyte/pull/15050) | Add parameter `start_date` for Logs stream | | 0.1.11 | 2022-08-03 | [14739](https://github.com/airbytehq/airbyte/pull/14739) | Add permissions for custom roles | | 0.1.10 | 2022-08-01 | [15179](https://github.com/airbytehq/airbyte/pull/15179) | Fix broken schemas for all streams | From 3349ad9a64dc001ab0e61ce127ba10cd69840f95 Mon Sep 17 00:00:00 2001 From: Yiyang Li Date: Tue, 9 Aug 2022 20:22:22 -0700 Subject: [PATCH 04/12] clean up get_updated_state overriding --- .../source-okta/source_okta/source.py | 22 +++---------------- 1 file changed, 3 insertions(+), 19 deletions(-) diff --git a/airbyte-integrations/connectors/source-okta/source_okta/source.py b/airbyte-integrations/connectors/source-okta/source_okta/source.py index 5815b4e48871e..7499e688b7a10 100644 --- a/airbyte-integrations/connectors/source-okta/source_okta/source.py +++ b/airbyte-integrations/connectors/source-okta/source_okta/source.py @@ -86,11 +86,11 @@ def cursor_field(self) -> str: pass def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: - lowest_date = self.min_id if self.min_id else str(pendulum.datetime.min) + lowest_cursor_value = self.min_id if self.min_id else str(pendulum.datetime.min) return { self.cursor_field: max( - latest_record.get(self.cursor_field, lowest_date), - current_stream_state.get(self.cursor_field, lowest_date), + latest_record.get(self.cursor_field, lowest_cursor_value), + current_stream_state.get(self.cursor_field, lowest_cursor_value), ) } @@ -126,14 +126,6 @@ def stream_slices(self, **kwargs): for group in group_stream.read_records(sync_mode=SyncMode.full_refresh): yield {"group_id": group["id"]} - def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: - return { - self.cursor_field: max( - latest_record.get(self.cursor_field, self.min_id), - 
current_stream_state.get(self.cursor_field, self.min_id), - ) - } - def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: group_id = stream_slice["group_id"] return f"groups/{group_id}/users" @@ -256,14 +248,6 @@ class ResourceSets(IncrementalOktaStream): def path(self, **kwargs) -> str: return "iam/resource-sets" - def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: - return { - self.cursor_field: max( - latest_record.get(self.cursor_field, self.min_id), - current_stream_state.get(self.cursor_field, self.min_id), - ) - } - def parse_response( self, response: requests.Response, From 28535212a56930e0cc93e262de7d31b05a1fde8c Mon Sep 17 00:00:00 2001 From: Yiyang Li Date: Tue, 9 Aug 2022 20:47:40 -0700 Subject: [PATCH 05/12] rename and correct sample config --- .../connectors/source-okta/sample_files/config.json | 8 ++++++-- .../connectors/source-okta/source_okta/source.py | 6 +++--- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/airbyte-integrations/connectors/source-okta/sample_files/config.json b/airbyte-integrations/connectors/source-okta/sample_files/config.json index 454bc28225e8a..4d0b4d1270ce9 100644 --- a/airbyte-integrations/connectors/source-okta/sample_files/config.json +++ b/airbyte-integrations/connectors/source-okta/sample_files/config.json @@ -1,4 +1,8 @@ { - "base_url": "https://myorg.okta.com", - "token": "xyz123foo325a.fbar" + "domain": "https://myorg.okta.com", + "start_date": "2022-07-22T00:00:00Z", + "credentials": { + "auth_type": "api_token", + "api_token": "00utdD4y7kFooBarHEqDJS2WVGciaiwVlaEoxoRw_2" + } } diff --git a/airbyte-integrations/connectors/source-okta/source_okta/source.py b/airbyte-integrations/connectors/source-okta/source_okta/source.py index 7499e688b7a10..1b6714148956a 100644 --- a/airbyte-integrations/connectors/source-okta/source_okta/source.py +++ 
b/airbyte-integrations/connectors/source-okta/source_okta/source.py @@ -86,11 +86,11 @@ def cursor_field(self) -> str: pass def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: - lowest_cursor_value = self.min_id if self.min_id else str(pendulum.datetime.min) + min_cursor_value = self.min_id if self.min_id else str(pendulum.datetime.min) return { self.cursor_field: max( - latest_record.get(self.cursor_field, lowest_cursor_value), - current_stream_state.get(self.cursor_field, lowest_cursor_value), + latest_record.get(self.cursor_field, min_cursor_value), + current_stream_state.get(self.cursor_field, min_cursor_value), ) } From bd502d9feeaf19843691f4eb35cae052acc3f5bf Mon Sep 17 00:00:00 2001 From: Yiyang Li Date: Tue, 9 Aug 2022 21:17:07 -0700 Subject: [PATCH 06/12] correct sample valid and invalid config --- .../source-okta/integration_tests/invalid_config.json | 5 +++-- .../connectors/source-okta/sample_files/config.json | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/source-okta/integration_tests/invalid_config.json b/airbyte-integrations/connectors/source-okta/integration_tests/invalid_config.json index 6322da62450f4..51d818d228e9b 100644 --- a/airbyte-integrations/connectors/source-okta/integration_tests/invalid_config.json +++ b/airbyte-integrations/connectors/source-okta/integration_tests/invalid_config.json @@ -1,4 +1,5 @@ { - "base_url": "invalid url", - "token": "invalid token" + "domain": "myorg", + "start_date": "2022-07-22T00:00:00Z", + "credentials": {} } diff --git a/airbyte-integrations/connectors/source-okta/sample_files/config.json b/airbyte-integrations/connectors/source-okta/sample_files/config.json index 4d0b4d1270ce9..71bcd71df633d 100644 --- a/airbyte-integrations/connectors/source-okta/sample_files/config.json +++ b/airbyte-integrations/connectors/source-okta/sample_files/config.json @@ -1,5 +1,5 @@ { - "domain": 
"https://myorg.okta.com", + "domain": "myorg", "start_date": "2022-07-22T00:00:00Z", "credentials": { "auth_type": "api_token", From c4c9f8d0dca5e608c0cb07b3ad03a087673491f3 Mon Sep 17 00:00:00 2001 From: Yiyang Li Date: Tue, 9 Aug 2022 21:30:19 -0700 Subject: [PATCH 07/12] fake the token more --- .../connectors/source-okta/sample_files/config.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-okta/sample_files/config.json b/airbyte-integrations/connectors/source-okta/sample_files/config.json index 71bcd71df633d..2602e8825d853 100644 --- a/airbyte-integrations/connectors/source-okta/sample_files/config.json +++ b/airbyte-integrations/connectors/source-okta/sample_files/config.json @@ -3,6 +3,6 @@ "start_date": "2022-07-22T00:00:00Z", "credentials": { "auth_type": "api_token", - "api_token": "00utdD4y7kFooBarHEqDJS2WVGciaiwVlaEoxoRw_2" + "api_token": "00uItIsFake_DoNotUseTheTokenEoxoRw_2" } } From 98b6274b4a5ebe3b8bbdb01d34a1fca054bc40ab Mon Sep 17 00:00:00 2001 From: Yiyang Li Date: Wed, 10 Aug 2022 11:01:24 -0700 Subject: [PATCH 08/12] fix the invalid_config.json --- .../source-okta/integration_tests/invalid_config.json | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-okta/integration_tests/invalid_config.json b/airbyte-integrations/connectors/source-okta/integration_tests/invalid_config.json index 51d818d228e9b..2602e8825d853 100644 --- a/airbyte-integrations/connectors/source-okta/integration_tests/invalid_config.json +++ b/airbyte-integrations/connectors/source-okta/integration_tests/invalid_config.json @@ -1,5 +1,8 @@ { "domain": "myorg", "start_date": "2022-07-22T00:00:00Z", - "credentials": {} + "credentials": { + "auth_type": "api_token", + "api_token": "00uItIsFake_DoNotUseTheTokenEoxoRw_2" + } } From bcb6c8d11bd3bd8895c39a5cedf3ec8ac6e0543f Mon Sep 17 00:00:00 2001 From: Yiyang Li Date: Wed, 10 Aug 2022 11:02:29 -0700 Subject: [PATCH 
09/12] change the order of streams so that logs hopefully runs first --- .../integration_tests/configured_catalog.json | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/airbyte-integrations/connectors/source-okta/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-okta/integration_tests/configured_catalog.json index 8783568f91e4d..8e667b9e4c8dd 100644 --- a/airbyte-integrations/connectors/source-okta/integration_tests/configured_catalog.json +++ b/airbyte-integrations/connectors/source-okta/integration_tests/configured_catalog.json @@ -2,18 +2,18 @@ "streams": [ { "stream": { - "name": "users", + "name": "logs", "json_schema": {}, "supported_sync_modes": ["full_refresh", "incremental"] }, "sync_mode": "incremental", "destination_sync_mode": "overwrite", - "cursor_field": ["lastUpdated"], - "primary_key": [["id"]] + "cursor_field": ["published"], + "primary_key": [["uuid"]] }, { "stream": { - "name": "groups", + "name": "users", "json_schema": {}, "supported_sync_modes": ["full_refresh", "incremental"] }, @@ -24,14 +24,14 @@ }, { "stream": { - "name": "logs", + "name": "groups", "json_schema": {}, "supported_sync_modes": ["full_refresh", "incremental"] }, "sync_mode": "incremental", "destination_sync_mode": "overwrite", - "cursor_field": ["published"], - "primary_key": [["uuid"]] + "cursor_field": ["lastUpdated"], + "primary_key": [["id"]] }, { "stream": { From 13cf1da31c5a91c7e79d94cb403206f1e5a57bec Mon Sep 17 00:00:00 2001 From: sajarin Date: Thu, 11 Aug 2022 15:56:05 -0400 Subject: [PATCH 10/12] fix: remove log stream from configured catalog --- .../integration_tests/configured_catalog.json | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/airbyte-integrations/connectors/source-okta/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-okta/integration_tests/configured_catalog.json index 8e667b9e4c8dd..286cb39956b41 100644 ---
a/airbyte-integrations/connectors/source-okta/integration_tests/configured_catalog.json +++ b/airbyte-integrations/connectors/source-okta/integration_tests/configured_catalog.json @@ -1,16 +1,5 @@ { "streams": [ - { - "stream": { - "name": "logs", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"] - }, - "sync_mode": "incremental", - "destination_sync_mode": "overwrite", - "cursor_field": ["published"], - "primary_key": [["uuid"]] - }, { "stream": { "name": "users", From a88c4cb8462f67af13ba61d1a58f52732fb1ef96 Mon Sep 17 00:00:00 2001 From: sajarin Date: Fri, 12 Aug 2022 12:25:36 -0400 Subject: [PATCH 11/12] fix: bump connector version on okta.md and Dockerfile --- airbyte-integrations/connectors/source-okta/Dockerfile | 2 +- docs/integrations/sources/okta.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-okta/Dockerfile b/airbyte-integrations/connectors/source-okta/Dockerfile index 6d93118fa396b..d1fb62853e03c 100644 --- a/airbyte-integrations/connectors/source-okta/Dockerfile +++ b/airbyte-integrations/connectors/source-okta/Dockerfile @@ -12,5 +12,5 @@ RUN pip install . 
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.12 +LABEL io.airbyte.version=0.1.13 LABEL io.airbyte.name=airbyte/source-okta diff --git a/docs/integrations/sources/okta.md b/docs/integrations/sources/okta.md index 6bbb34ba37b01..4edeb60892b39 100644 --- a/docs/integrations/sources/okta.md +++ b/docs/integrations/sources/okta.md @@ -79,7 +79,7 @@ The connector is restricted by normal Okta [requests limitation](https://develop | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------| -| 0.1.13 | 2022-08-09 | [14700](https://github.com/airbytehq/airbyte/pull/14700) | Add resource sets | +| 0.1.13 | 2022-08-12 | [14700](https://github.com/airbytehq/airbyte/pull/14700) | Add resource sets | | 0.1.12 | 2022-08-05 | [15050](https://github.com/airbytehq/airbyte/pull/15050) | Add parameter `start_date` for Logs stream | | 0.1.11 | 2022-08-03 | [14739](https://github.com/airbytehq/airbyte/pull/14739) | Add permissions for custom roles | | 0.1.10 | 2022-08-01 | [15179](https://github.com/airbytehq/airbyte/pull/15179) | Fix broken schemas for all streams | From 4fb92cff368e98929359404b4985d1e7688e9f6d Mon Sep 17 00:00:00 2001 From: Octavia Squidington III Date: Fri, 12 Aug 2022 16:41:54 +0000 Subject: [PATCH 12/12] auto-bump connector version [ci skip] --- .../init/src/main/resources/seed/source_definitions.yaml | 2 +- airbyte-config/init/src/main/resources/seed/source_specs.yaml | 2 +- .../source_acceptance_test/tests/test_incremental.py | 4 +++- .../connectors/source-okta/source_okta/source.py | 4 ++-- 4 files changed, 7 insertions(+), 5 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 
f474624269c57..184ce8aa99275 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -628,7 +628,7 @@ - name: Okta sourceDefinitionId: 1d4fdb25-64fc-4569-92da-fcdca79a8372 dockerRepository: airbyte/source-okta - dockerImageTag: 0.1.12 + dockerImageTag: 0.1.13 documentationUrl: https://docs.airbyte.io/integrations/sources/okta icon: okta.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index efa51da1adb78..808f5bab1a790 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -6025,7 +6025,7 @@ - - "client_secret" oauthFlowOutputParameters: - - "access_token" -- dockerImage: "airbyte/source-okta:0.1.12" +- dockerImage: "airbyte/source-okta:0.1.13" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/okta" connectionSpecification: diff --git a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_incremental.py b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_incremental.py index 8c6dfb38dd46e..209c979c8a24c 100644 --- a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_incremental.py +++ b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_incremental.py @@ -210,5 +210,7 @@ def test_state_with_abnormally_large_values(self, connector_config, configured_c records = filter_output(output, type_=Type.RECORD) states = filter_output(output, type_=Type.STATE) - assert not records, f"The sync should produce no records when run with the state with abnormally large values {records[0].record.stream}" + assert ( + not records + ), f"The sync should produce no records when run with the state with abnormally large values 
{records[0].record.stream}" assert states, "The sync should produce at least one STATE message" diff --git a/airbyte-integrations/connectors/source-okta/source_okta/source.py b/airbyte-integrations/connectors/source-okta/source_okta/source.py index 1b6714148956a..380e55d10a3a8 100644 --- a/airbyte-integrations/connectors/source-okta/source_okta/source.py +++ b/airbyte-integrations/connectors/source-okta/source_okta/source.py @@ -78,7 +78,7 @@ def backoff_time(self, response: requests.Response) -> Optional[float]: class IncrementalOktaStream(OktaStream, ABC): - min_id = '' + min_id = "" @property @abstractmethod @@ -243,7 +243,7 @@ def request_params( class ResourceSets(IncrementalOktaStream): cursor_field = "id" primary_key = "id" - min_id = 'iam00000000000000000' + min_id = "iam00000000000000000" def path(self, **kwargs) -> str: return "iam/resource-sets"