Commit 5b6b48c: 🎉 Source GitHub: Use CDK caching and convert PR-related streams to incremental (#7250)

* Source GitHub: Use CDK caching and convert PR-related streams to incremental

* Remove extra change

* Consolidate

* Address comments

* Fix integration test config

* Fix merge

* Update sample state

* Bump release version

* Bump version

* Address feedback

* Bump version

* Fix formatting
cjwooo authored Jan 6, 2022
1 parent 678cfbe commit 5b6b48c
Showing 10 changed files with 82 additions and 51 deletions.
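
The streams touched here follow the connector's "semi-incremental" pattern: the GitHub API cannot filter these endpoints server-side by cursor, so the connector reads records and filters client-side against saved per-repository state. A minimal sketch of that pattern (a standalone illustrative helper, not the connector's actual code; the state is flattened to repository → cursor value for brevity):

```python
from typing import Any, Dict, Iterable, Mapping


def filter_and_advance(
    records: Iterable[Mapping[str, Any]],
    state: Dict[str, str],
    repository: str,
    cursor_field: str = "updated_at",
) -> Iterable[Mapping[str, Any]]:
    """Yield records at or past the saved cursor, advancing it to the max seen."""
    start = state.get(repository, "")
    for record in records:
        cursor = record[cursor_field]
        if cursor >= start:  # ISO-8601 timestamps compare correctly as strings
            state[repository] = max(state.get(repository, ""), cursor)
            yield record
```

The annotated diff follows.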
@@ -217,7 +217,7 @@
- name: GitHub
sourceDefinitionId: ef69ef6e-aa7f-4af1-a01d-ef775033524e
dockerRepository: airbyte/source-github
-   dockerImageTag: 0.2.9
+   dockerImageTag: 0.2.10
documentationUrl: https://docs.airbyte.io/integrations/sources/github
icon: github.svg
sourceType: api
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-github/Dockerfile
@@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

- LABEL io.airbyte.version=0.2.9
+ LABEL io.airbyte.version=0.2.10
LABEL io.airbyte.name=airbyte/source-github
@@ -25,9 +25,11 @@ tests:
issue_milestones: ["airbytehq/integration-test", "updated_at"]
issues: ["airbytehq/integration-test", "updated_at"]
projects: ["airbytehq/integration-test", "updated_at"]
+ pull_request_stats: ["airbytehq/integration-test", "updated_at"]
pull_requests: ["airbytehq/integration-test", "updated_at"]
releases: ["airbytehq/integration-test", "created_at"]
review_comments: ["airbytehq/integration-test", "updated_at"]
+ reviews: ["airbytehq/integration-test", "submitted_at"]
stargazers: ["airbytehq/integration-test", "starred_at"]
full_refresh:
- config_path: "secrets/config.json"
@@ -39,6 +39,11 @@
"updated_at": "2121-06-28T17:24:51Z"
}
},
"pull_request_stats": {
"airbytehq/integration-test": {
"updated_at": "2121-06-29T02:04:57Z"
}
},
"pull_requests": {
"airbytehq/integration-test": {
"updated_at": "2121-06-28T23:36:35Z"
@@ -54,6 +59,11 @@
"updated_at": "2121-06-23T23:57:07Z"
}
},
"reviews": {
"airbytehq/integration-test": {
"submitted_at": "2121-06-29T02:04:57Z"
}
},
"stargazers": {
"airbytehq/integration-test": {
"starred_at": "2121-06-29T02:04:57Z"
@@ -198,11 +198,14 @@
"stream": {
"name": "pull_request_stats",
"json_schema": {},
"supported_sync_modes": ["full_refresh"],
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["updated_at"],
"source_defined_primary_key": [["id"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
"sync_mode": "incremental",
"destination_sync_mode": "append",
"cursor_field": ["updated_at"]
},
{
"stream": {
@@ -257,11 +260,14 @@
"stream": {
"name": "reviews",
"json_schema": {},
"supported_sync_modes": ["full_refresh"],
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["submitted_at"],
"source_defined_primary_key": [["id"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
"sync_mode": "incremental",
"destination_sync_mode": "append",
"cursor_field": ["submitted_at"]
},
{
"stream": {
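
For readers wiring this up programmatically, the same catalog entry can be built with the Airbyte protocol models. A sketch assuming the `airbyte_cdk.models` classes of this era; the field names mirror the JSON above:

```python
from airbyte_cdk.models import (
    AirbyteStream,
    ConfiguredAirbyteStream,
    DestinationSyncMode,
    SyncMode,
)

reviews_entry = ConfiguredAirbyteStream(
    stream=AirbyteStream(
        name="reviews",
        json_schema={},
        supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental],
        source_defined_cursor=True,
        default_cursor_field=["submitted_at"],
        source_defined_primary_key=[["id"]],
    ),
    sync_mode=SyncMode.incremental,
    destination_sync_mode=DestinationSyncMode.append,
    cursor_field=["submitted_at"],
)
```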
@@ -29,6 +29,11 @@
"created_at": "2021-06-23T23:57:07Z"
}
},
"pull_request_stats": {
"airbytehq/integration-test": {
"updated_at": "2021-08-30T12:01:15Z"
}
},
"pull_requests": {
"airbytehq/integration-test": {
"updated_at": "2021-06-28T23:36:35Z"
@@ -53,5 +58,10 @@
"airbytehq/integration-test": {
"created_at": "2021-06-30T10:04:41Z"
}
+ },
+ "reviews": {
+   "airbytehq/integration-test": {
+     "submitted_at": "2021-08-30T12:01:15Z"
+   }
}
}
@@ -49,6 +49,10 @@
},
"changed_files": {
"type": ["null", "integer"]
+ },
+ "updated_at": {
+   "type": ["null", "string"],
+   "format": "date-time"
}
}
}
@@ -179,12 +179,12 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
Organizations(**organization_args),
Projects(**repository_args_with_start_date),
PullRequestCommentReactions(**repository_args_with_start_date),
- PullRequestStats(parent=pull_requests_stream, **repository_args),
+ PullRequestStats(parent=pull_requests_stream, **repository_args_with_start_date),
PullRequests(**repository_args_with_start_date),
Releases(**repository_args_with_start_date),
Repositories(**organization_args),
ReviewComments(**repository_args_with_start_date),
- Reviews(parent=pull_requests_stream, **repository_args),
+ Reviews(parent=pull_requests_stream, **repository_args_with_start_date),
Stargazers(**repository_args_with_start_date),
Tags(**repository_args),
Teams(**organization_args),
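
The only change in this file is which keyword-argument bundle the two substreams receive. The exact contents of those bundles are not shown in this diff; a hypothetical reconstruction based solely on the parameter names, for orientation:

```python
# Hypothetical values for illustration only; keys are assumptions.
repository_args = {
    "repositories": ["airbytehq/integration-test"],
    "page_size_for_large_streams": 50,
}
# Adding start_date is what lets the semi-incremental base class filter
# records client-side, which is why PullRequestStats and Reviews now take it.
repository_args_with_start_date = {**repository_args, "start_date": "2021-01-01T00:00:00Z"}
```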
@@ -2,51 +2,23 @@
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

- import os
import time
from abc import ABC, abstractmethod
from copy import deepcopy
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
from urllib import parse

import requests
- import vcr
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream
from requests.exceptions import HTTPError
- from vcr.cassette import Cassette


- def request_cache() -> Cassette:
-     """
-     Builds VCR instance.
-     It deletes file everytime we create it, normally should be called only once.
-     We can't use NamedTemporaryFile here because yaml serializer doesn't work well with empty files.
-     """
-     filename = "request_cache.yml"
-     try:
-         os.remove(filename)
-     except FileNotFoundError:
-         pass
-
-     return vcr.use_cassette(str(filename), record_mode="new_episodes", serializer="yaml")


class GithubStream(HttpStream, ABC):
-     cache = request_cache()
url_base = "https://api.github.com/"

-     # To prevent dangerous behavior, the `vcr` library prohibits the use of nested caching.
-     # Here's an example of dangerous behavior:
-     # cache = Cassette.use('whatever')
-     # with cache:
-     #     with cache:
-     #         pass
-     #
-     # Therefore, we will only use `cache` for the top-level stream, so as not to cause possible difficulties.
-     top_level_stream = True

primary_key = "id"
+     use_cache = True

# GitHub pagination could be from 1 to 100.
page_size = 100
@@ -100,11 +72,7 @@ def backoff_time(self, response: requests.Response) -> Union[int, float]:

def read_records(self, stream_slice: Mapping[str, any] = None, **kwargs) -> Iterable[Mapping[str, Any]]:
try:
-             if self.top_level_stream:
-                 with self.cache:
-                     yield from super().read_records(stream_slice=stream_slice, **kwargs)
-             else:
-                 yield from super().read_records(stream_slice=stream_slice, **kwargs)
+             yield from super().read_records(stream_slice=stream_slice, **kwargs)
except HTTPError as e:
error_msg = str(e)

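The hand-rolled VCR cassette and the `top_level_stream` flag are replaced by the CDK's own response caching: setting `use_cache = True` on an `HttpStream` lets the CDK record parent responses and replay them for substreams, with nesting handled inside the CDK. A minimal sketch of that surface, assuming the `airbyte_cdk` HttpStream API of this era; the stream itself is a simplified example, not the connector's real class:

```python
from typing import Any, Iterable, Mapping, Optional

import requests
from airbyte_cdk.sources.streams.http import HttpStream


class RepositoryPulls(HttpStream):  # hypothetical example stream
    url_base = "https://api.github.com/"
    primary_key = "id"
    use_cache = True  # CDK persists responses; substreams re-read them for free

    def path(self, **kwargs) -> str:
        return "repos/airbytehq/integration-test/pulls"

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        return None  # single page is enough for the sketch

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        yield from response.json()
```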
@@ -422,6 +390,7 @@ class PullRequests(SemiIncrementalGithubStream):
"""

page_size = 50
+     first_read_override_key = "first_read_override"

def __init__(self, **kwargs):
super().__init__(**kwargs)
@@ -431,7 +400,7 @@ def read_records(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Mapping[str, Any]]:
"""
Decide if this is a first read or not by the presence of the state object
"""
-         self._first_read = not bool(stream_state)
+         self._first_read = not bool(stream_state) or stream_state.get(self.first_read_override_key, False)
yield from super().read_records(stream_state=stream_state, **kwargs)

def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
@@ -459,7 +428,7 @@ def is_sorted_descending(self) -> bool:
"""
Depending on whether there is any state, we read the stream in ascending or descending order.
"""
-         return self._first_read
+         return not self._first_read


class CommitComments(SemiIncrementalGithubStream):
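
The two fixes above work together: `first_read_override` lets a substream force the parent into first-read behavior, and the corrected `is_sorted_descending` means a first (full) read pages through pull requests in ascending order, while incremental reads go descending so paging can stop at the saved cursor. Distilled into standalone functions (illustrative, not connector code):

```python
FIRST_READ_OVERRIDE_KEY = "first_read_override"


def is_first_read(stream_state: dict) -> bool:
    # No saved state, or an explicit override from a substream, means "first read".
    return not bool(stream_state) or stream_state.get(FIRST_READ_OVERRIDE_KEY, False)


def sort_direction(stream_state: dict) -> str:
    # First read: ascending, so records arrive in cursor order.
    # Incremental read: descending, so paging can stop once records predate the cursor.
    return "asc" if is_first_read(stream_state) else "desc"


assert sort_direction({}) == "asc"
assert sort_direction({"airbytehq/integration-test": {"updated_at": "2021-06-28"}}) == "desc"
assert sort_direction({FIRST_READ_OVERRIDE_KEY: True}) == "asc"
```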
@@ -686,23 +655,42 @@ def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
# Pull request substreams


- class PullRequestSubstream(HttpSubStream, GithubStream, ABC):
-     top_level_stream = False
+ class PullRequestSubstream(HttpSubStream, SemiIncrementalGithubStream, ABC):
+     use_cache = False

def __init__(self, parent: PullRequests, **kwargs):
super().__init__(parent=parent, **kwargs)

def stream_slices(
self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
-         parent_stream_slices = super().stream_slices(sync_mode=sync_mode, cursor_field=cursor_field, stream_state=stream_state)
+         """
+         Override the parent PullRequests stream configuration to always fetch records in ascending order
+         """
+         parent_state = deepcopy(stream_state) or {}
+         parent_state[PullRequests.first_read_override_key] = True
+         parent_stream_slices = super().stream_slices(sync_mode=sync_mode, cursor_field=cursor_field, stream_state=parent_state)
for parent_stream_slice in parent_stream_slices:
yield {
"pull_request_number": parent_stream_slice["parent"]["number"],
"repository": parent_stream_slice["parent"]["repository"],
}

+     def read_records(
+         self,
+         sync_mode: SyncMode,
+         cursor_field: List[str] = None,
+         stream_slice: Mapping[str, Any] = None,
+         stream_state: Mapping[str, Any] = None,
+     ) -> Iterable[Mapping[str, Any]]:
+         """
+         We've already determined the list of pull requests to run the stream against.
+         Skip the start_point_map and cursor_field logic in SemiIncrementalGithubStream.read_records.
+         """
+         yield from super(SemiIncrementalGithubStream, self).read_records(
+             sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state
+         )
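
The `super(SemiIncrementalGithubStream, self)` call is the standard Python idiom for skipping one class in the MRO: method lookup starts after `SemiIncrementalGithubStream`, so its cursor-filtering `read_records` is bypassed while the rest of the chain still runs. A self-contained illustration:

```python
class Base:
    def read(self):
        return ["record"]


class SemiIncremental(Base):
    def read(self):
        # Cursor filtering we want to skip (stands in for
        # SemiIncrementalGithubStream.read_records).
        return [r for r in super().read() if False]


class Substream(SemiIncremental):
    def read(self):
        # Start MRO lookup *after* SemiIncremental, exactly like
        # super(SemiIncrementalGithubStream, self).read_records(...) above.
        return super(SemiIncremental, self).read()


assert Substream().read() == ["record"]
```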


class PullRequestStats(PullRequestSubstream):
"""
@@ -731,19 +719,29 @@ class Reviews(PullRequestSubstream):
API docs: https://docs.github.com/en/rest/reference/pulls#list-reviews-for-a-pull-request
"""

+     cursor_field = "submitted_at"

def path(
self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> str:
return f"repos/{stream_slice['repository']}/pulls/{stream_slice['pull_request_number']}/reviews"

+     # Set the parent stream state's cursor field before fetching its records
+     def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
+         parent_state = deepcopy(stream_state) or {}
+         for repository in self.repositories:
+             if repository in parent_state and self.cursor_field in parent_state[repository]:
+                 parent_state[repository][self.parent.cursor_field] = parent_state[repository][self.cursor_field]
+         yield from super().stream_slices(stream_state=parent_state, **kwargs)
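
Reviews keeps its own `submitted_at` cursor, but its slices come from the parent pull_requests stream, which filters on `updated_at`, so the override above copies the child cursor into the parent's cursor field per repository before slicing. Restated as a standalone sketch with the state shape shown in the test fixtures:

```python
from copy import deepcopy


def to_parent_state(state: dict, repositories: list,
                    child_cursor: str = "submitted_at",
                    parent_cursor: str = "updated_at") -> dict:
    # Mirror each repository's child cursor into the parent's cursor field.
    parent_state = deepcopy(state) or {}
    for repo in repositories:
        if repo in parent_state and child_cursor in parent_state[repo]:
            parent_state[repo][parent_cursor] = parent_state[repo][child_cursor]
    return parent_state


state = {"airbytehq/integration-test": {"submitted_at": "2021-08-30T12:01:15Z"}}
assert to_parent_state(state, ["airbytehq/integration-test"]) == {
    "airbytehq/integration-test": {
        "submitted_at": "2021-08-30T12:01:15Z",
        "updated_at": "2021-08-30T12:01:15Z",
    }
}
```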


# Reactions streams


class ReactionStream(GithubStream, ABC):

parent_key = "id"
-     top_level_stream = False
+     use_cache = False

def __init__(self, **kwargs):
self._stream_kwargs = deepcopy(kwargs)
1 change: 1 addition & 0 deletions docs/integrations/sources/github.md
@@ -92,6 +92,7 @@ Your token should have at least the `repo` scope. Depending on which streams you

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
+ | 0.2.10 | 2022-01-03 | [7250](https://github.com/airbytehq/airbyte/pull/7250) | Use CDK caching and convert PR-related streams to incremental |
| 0.2.9 | 2021-12-29 | [9179](https://github.com/airbytehq/airbyte/pull/9179) | Use default retry delays on server error responses |
| 0.2.8 | 2021-12-07 | [8524](https://github.com/airbytehq/airbyte/pull/8524) | Update connector fields title/description |
| 0.2.7 | 2021-12-06 | [8518](https://github.com/airbytehq/airbyte/pull/8518) | Add connection retry with Github |