Skip to content

Commit

Permalink
Source Okta: add resource-sets (incremental supported) (#14700)
Browse files Browse the repository at this point in the history
* Source Okta: add resource-sets (inremental supported)

- stop using the deprecated method get_updated_state, use state property and state setter instead
- the payload resource-sets is enveloped, _links.next.href contains the cursor

* fix: change assert statement with flake formatting

* fix unit tests

* clean up get_updated_state overriding

* rename and correct sample config

* correct sample valid and invalid config

* fake the token more

* fix the invalid_config.json

* change the order of stream, hopefully logs is run firstly

* fix: remove log stream from configured catalog

* fix: bump connector version on okta.md and Dockerfile

* auto-bump connector version [ci skip]

Co-authored-by: Sajarin <sajarindider@gmail.com>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
3 people authored Aug 12, 2022
1 parent 6d9ae93 commit d82632f
Show file tree
Hide file tree
Showing 13 changed files with 194 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@
- name: Okta
sourceDefinitionId: 1d4fdb25-64fc-4569-92da-fcdca79a8372
dockerRepository: airbyte/source-okta
dockerImageTag: 0.1.12
dockerImageTag: 0.1.13
documentationUrl: https://docs.airbyte.io/integrations/sources/okta
icon: okta.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6197,7 +6197,7 @@
- - "client_secret"
oauthFlowOutputParameters:
- - "access_token"
- dockerImage: "airbyte/source-okta:0.1.12"
- dockerImage: "airbyte/source-okta:0.1.13"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/okta"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,5 +210,7 @@ def test_state_with_abnormally_large_values(self, connector_config, configured_c
records = filter_output(output, type_=Type.RECORD)
states = filter_output(output, type_=Type.STATE)

assert not records, "The sync should produce no records when run with the state with abnormally large values"
assert (
not records
), f"The sync should produce no records when run with the state with abnormally large values {records[0].record.stream}"
assert states, "The sync should produce at least one STATE message"
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-okta/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.12
LABEL io.airbyte.version=0.1.13
LABEL io.airbyte.name=airbyte/source-okta
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
"users": { "lastUpdated": "3021-09-08T07:04:28.000Z" },
"groups": { "lastUpdated": "3021-09-08T07:04:28.000Z" },
"group_members": { "id": "00uzzzzzzzzzzzzzzzzz" },
"logs": { "published": "3021-09-08T07:04:28.000Z" }
"logs": { "published": "3021-09-08T07:04:28.000Z" },
"resource_sets": { "id": "iamzzzzzzzzzzzzzzzzz" }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,18 @@
},
{
"stream": {
"name": "logs",
"name": "group_members",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"]
},
"sync_mode": "incremental",
"destination_sync_mode": "overwrite",
"cursor_field": ["published"],
"primary_key": [["uuid"]]
"cursor_field": ["id"],
"primary_key": [["id"]]
},
{
"stream": {
"name": "group_members",
"name": "resource_sets",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"]
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
{
"base_url": "invalid url",
"token": "invalid token"
"domain": "myorg",
"start_date": "2022-07-22T00:00:00Z",
"credentials": {
"auth_type": "api_token",
"api_token": "00uItIsFake_DoNotUseTheTokenEoxoRw_2"
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
{
"base_url": "https://myorg.okta.com",
"token": "xyz123foo325a.fbar"
"domain": "myorg",
"start_date": "2022-07-22T00:00:00Z",
"credentials": {
"auth_type": "api_token",
"api_token": "00uItIsFake_DoNotUseTheTokenEoxoRw_2"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
{
"properties": {
"id": {
"type": "string"
},
"label": {
"type": "string"
},
"description": {
"type": "string"
},
"_links": {
"properties": {
"assignee": {
"properties": {
"self": {
"type": ["null", "object"],
"additionalProperties": true,
"properties": {
"href": {
"type": ["null", "string"]
}
},
"description": "gets this Resource Set"
},
"resources": {
"type": ["null", "object"],
"additionalProperties": true,
"properties": {
"href": {
"type": ["null", "string"]
}
},
"description": "gets a paginable list of resources included in this set"
},
"bindings": {
"type": ["null", "object"],
"additionalProperties": true,
"properties": {
"href": {
"type": ["null", "string"]
}
},
"description": "gets a paginable list of admin Role Bindings assigned to this set"
},
"next": {
"type": ["null", "object"],
"additionalProperties": true,
"properties": {
"href": {
"type": ["null", "string"]
}
},
"description": "the link for the next page, 'after' is the query string, the cursor field is id"
}
}
}
},
"type": ["object", "null"]
}
},
"type": "object"
}
64 changes: 47 additions & 17 deletions airbyte-integrations/connectors/source-okta/source_okta/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
if "self" in links:
if links["self"]["url"] == next_url:
return None

return query_params

return None
Expand Down Expand Up @@ -79,17 +78,19 @@ def backoff_time(self, response: requests.Response) -> Optional[float]:


class IncrementalOktaStream(OktaStream, ABC):
min_id = ""

@property
@abstractmethod
def cursor_field(self) -> str:
pass

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
lowest_date = str(pendulum.datetime.min)
min_cursor_value = self.min_id if self.min_id else str(pendulum.datetime.min)
return {
self.cursor_field: max(
latest_record.get(self.cursor_field, lowest_date),
current_stream_state.get(self.cursor_field, lowest_date),
latest_record.get(self.cursor_field, min_cursor_value),
current_stream_state.get(self.cursor_field, min_cursor_value),
)
}

Expand Down Expand Up @@ -117,8 +118,8 @@ def path(self, **kwargs) -> str:
class GroupMembers(IncrementalOktaStream):
cursor_field = "id"
primary_key = "id"
min_user_id = "00u00000000000000000"
use_cache = True
min_id = "00u00000000000000000"

def stream_slices(self, **kwargs):
group_stream = Groups(authenticator=self.authenticator, url_base=self.url_base, start_date=self.start_date)
Expand All @@ -135,22 +136,11 @@ def request_params(
stream_slice: Mapping[str, any] = None,
next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
# Filter param should be ignored SCIM filter expressions can't use the published
# attribute since it may conflict with the logic of the since, after, and until query params.
# Docs: https://developer.okta.com/docs/reference/api/system-log/#expression-filter
params = super(IncrementalOktaStream, self).request_params(stream_state, stream_slice, next_page_token)
latest_entry = stream_state.get(self.cursor_field) if stream_state else self.min_user_id
latest_entry = stream_state.get(self.cursor_field) if stream_state else self.min_id
params["after"] = latest_entry
return params

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
return {
self.cursor_field: max(
latest_record.get(self.cursor_field, self.min_user_id),
current_stream_state.get(self.cursor_field, self.min_user_id),
)
}


class GroupRoleAssignments(OktaStream):
primary_key = "id"
Expand Down Expand Up @@ -250,6 +240,45 @@ def request_params(
return params


class ResourceSets(IncrementalOktaStream):
cursor_field = "id"
primary_key = "id"
min_id = "iam00000000000000000"

def path(self, **kwargs) -> str:
return "iam/resource-sets"

def parse_response(
self,
response: requests.Response,
**kwargs,
) -> Iterable[Mapping]:
yield from response.json()["resource-sets"]

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
# We can't follow the default pagination that takes query from header.links
# Instead, the payload contains _links that offers the next link
body = response.json()
if "_links" in body and "next" in body["_links"] and "href" in body["_links"]["next"]:
next_url = body["_links"]["next"]["href"]
parsed_link = parse.urlparse(next_url)
return dict(parse.parse_qsl(parsed_link.query))

return None

def request_params(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, any] = None,
next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
params = super().request_params(stream_state, stream_slice, next_page_token)
latest_entry = stream_state.get(self.cursor_field)
if latest_entry:
params["after"] = latest_entry
return params


class CustomRoles(OktaStream):
# https://developer.okta.com/docs/reference/api/roles/#list-roles
primary_key = "id"
Expand Down Expand Up @@ -337,4 +366,5 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
UserRoleAssignments(**initialization_params),
GroupRoleAssignments(**initialization_params),
Permissions(**initialization_params),
ResourceSets(**initialization_params),
]
20 changes: 20 additions & 0 deletions airbyte-integrations/connectors/source-okta/unit_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,26 @@ def logs_instance():
}


@pytest.fixture()
def resource_set_instance(api_url):
"""
Resource set object instance
"""
_id = "iam5xyzmibarA6Afoo7"
return {
"id": _id,
"label": "all users",
"description": "all users",
"created": "2022-07-09T20:58:41.000Z",
"lastUpdated": "2022-07-09T20:58:41.000Z",
"_links": {
"bindings": {"href": f"{url_base}/iam/resource-sets/{_id}/bindings"},
"self": {"href": f"{url_base}/iam/resource-sets/{_id}"},
"resources": {"href": f"{url_base}/iam/resource-sets/{_id}/resources"},
},
}


@pytest.fixture()
def latest_record_instance(url_base, api_url):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
Logs,
OktaStream,
Permissions,
ResourceSets,
UserRoleAssignments,
Users,
)
Expand Down Expand Up @@ -126,7 +127,6 @@ def cursor_field(self) -> str:

stream = TestIncrementalOktaStream(url_base=url_base, start_date=start_date)
stream._cursor_field = "lastUpdated"

current_stream_state = {"lastUpdated": "2021-04-21T21:03:55.000Z"}
update_state = stream.get_updated_state(current_stream_state=current_stream_state, latest_record=latest_record_instance)
expected_result = {"lastUpdated": "2022-07-18T07:58:11.000Z"}
Expand Down Expand Up @@ -360,3 +360,40 @@ def test_user_role_assignments_slice_stream(
stream = UserRoleAssignments(url_base=url_base, start_date=start_date)
requests_mock.get(f"{api_url}/users?limit=200", json=[users_instance])
assert list(stream.stream_slices()) == [{"user_id": "test_user_id"}]


class TestStreamResourceSets:
def test_resource_sets(self, requests_mock, patch_base_class, resource_set_instance, url_base, api_url, start_date):
stream = ResourceSets(url_base=url_base, start_date=start_date)
record = {"resource-sets": [resource_set_instance]}
requests_mock.get(f"{api_url}/iam/resource-sets", json=record)
inputs = {"sync_mode": SyncMode.incremental}
assert list(stream.read_records(**inputs)) == record["resource-sets"]

def test_resource_sets_parse_response(self, requests_mock, patch_base_class, resource_set_instance, url_base, api_url, start_date):
stream = ResourceSets(url_base=url_base, start_date=start_date)
record = {"resource-sets": [resource_set_instance]}
requests_mock.get(f"{api_url}", json=record)
assert list(stream.parse_response(response=requests.get(f"{api_url}"))) == [resource_set_instance]

def test_resource_sets_next_page_token(self, requests_mock, patch_base_class, resource_set_instance, url_base, api_url, start_date):
stream = ResourceSets(url_base=url_base, start_date=start_date)
cursor = "iam5cursorFybecursor"
response = MagicMock(requests.Response)
next_link = f"{url_base}/iam/resource-sets?after={cursor}"
response.json = MagicMock(return_value={"_links": {"next": {"href": next_link}}, "resource-sets": [resource_set_instance]})
inputs = {"response": response}
result = stream.next_page_token(**inputs)
assert result == {"after": cursor}

response.json = MagicMock(return_value={"resource-sets": [resource_set_instance]})
inputs = {"response": response}
result = stream.next_page_token(**inputs)
assert result is None

def test_resource_sets_request_params(self, requests_mock, patch_base_class, resource_set_instance, url_base, api_url, start_date):
stream = ResourceSets(url_base=url_base, start_date=start_date)
cursor = "iam5cursorFybecursor"
inputs = {"stream_slice": None, "stream_state": {"id": cursor}, "next_page_token": None}
expected_params = {"limit": 200, "after": "iam5cursorFybecursor", "filter": 'id gt "iam5cursorFybecursor"'}
assert stream.request_params(**inputs) == expected_params
2 changes: 2 additions & 0 deletions docs/integrations/sources/okta.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ The Okta source connector supports the following [sync modes](https://docs.airby
- [System Log](https://developer.okta.com/docs/reference/api/system-log/#get-started)
- [Custom Roles](https://developer.okta.com/docs/reference/api/roles/#list-roles)
- [Permissions](https://developer.okta.com/docs/reference/api/roles/#list-permissions)
- [Resource Sets](https://developer.okta.com/docs/reference/api/roles/#list-resource-sets)

## Performance considerations

Expand All @@ -78,6 +79,7 @@ The connector is restricted by normal Okta [requests limitation](https://develop

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------|
| 0.1.13 | 2022-08-12 | [14700](https://github.com/airbytehq/airbyte/pull/14700) | Add resource sets |
| 0.1.12 | 2022-08-05 | [15050](https://github.com/airbytehq/airbyte/pull/15050) | Add parameter `start_date` for Logs stream |
| 0.1.11 | 2022-08-03 | [14739](https://github.com/airbytehq/airbyte/pull/14739) | Add permissions for custom roles |
| 0.1.10 | 2022-08-01 | [15179](https://github.com/airbytehq/airbyte/pull/15179) | Fix broken schemas for all streams |
Expand Down

0 comments on commit d82632f

Please sign in to comment.