-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Async Retriever
add url_requester
#211
Conversation
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
Async Retriever
add url_requester
📝 WalkthroughWalkthroughThe pull request introduces enhancements to the Airbyte CDK's declarative source components, focusing on expanding the Changes
Sequence DiagramsequenceDiagram
participant CreationRequester
participant PollingRequester
participant UrlRequester
participant DownloadRetriever
participant AbortRequester
participant DeleteRequester
CreationRequester->>PollingRequester: Initiate job creation
PollingRequester->>PollingRequester: Poll for job status
PollingRequester-->>UrlRequester: Request download URL (if available)
UrlRequester-->>PollingRequester: Return download URL
PollingRequester->>DownloadRetriever: Retrieve report data
PollingRequester->>AbortRequester: Handle job failure
PollingRequester->>DeleteRequester: Send delete job request
Possibly related PRs
Suggested Reviewers
Hey there! 👋 I noticed you've made some interesting changes to the AsyncRetriever. The new Finishing Touches
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/requesters/http_job_repository.py (1)
47-49
: Consider adding more descriptive docstring for url_requester?The current comment is quite long for an inline comment. Would it make sense to move it to a proper docstring with examples and more details about when to use this optional parameter? wdyt?
- url_requester: Optional[Requester] = ( - None # use it in case polling_requester provides some <id> and extra request is needed to obtain list of urls to download from - ) + url_requester: Optional[Requester] = None + """Optional requester used when polling_requester provides an ID that requires an additional request to obtain download URLs. + + This is useful in scenarios where the polling response contains an ID that needs to be used + to make a separate request to get the actual download URLs. If not provided, URLs will be + extracted directly from the polling response. + """airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
2048-2051
: LGTM! Well-documented url_requester field addition.The new field aligns perfectly with the PR objectives for Amazon Seller Partner integration, allowing URL extraction from polling responses. The implementation follows the established pattern of other requester fields in the AsyncRetriever class.
Would you like to add a code example in the description field to show how this might be used in practice? Something like:
url_requester: type: HttpRequester url_base: "https://api.example.com" path: "/get_download_url/{{ response['reportDocumentId'] }}"wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml
(1 hunks)airbyte_cdk/sources/declarative/models/declarative_component_schema.py
(2 hunks)airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
(2 hunks)airbyte_cdk/sources/declarative/requesters/http_job_repository.py
(3 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/requesters/http_job_repository.py
[error] 241-241: Incompatible types in assignment (expression has type "Response | None", variable has type "Response")
[error] 242-242: Incompatible types in "yield from" (actual type "MutableMapping[Any, Any]", expected type "str")
🔇 Additional comments (4)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
2325-2334
: LGTM! Clean implementation of url_requester initializationThe changes follow the existing pattern for other requesters and properly handle the optional nature of url_requester.
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)
2975-2979
: LGTM! Well-documented schema for url_requesterThe schema definition is consistent with other requester properties and includes clear documentation.
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2)
740-743
: LGTM! Clean implementation of the FlattenFields transformation.The implementation follows the established pattern of other transformation classes in the codebase. Simple and focused.
750-766
: LGTM! Improved readability of examples.The multi-line formatting of examples makes them easier to read and maintain.
airbyte_cdk/sources/declarative/requesters/http_job_repository.py
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
coderabbit and mypy type checks (for the same code block it looks like) are failing on a type mismatch. I'm good w/ the approach as we discussed last week, but just clarifying a few things for myself.
Approving, but please fix those before merging.
And for guardian, that is a separate unrelated issue and should be fixed on latest anyway so don't worry about that.
…ever-add-url-requester # Conflicts: # airbyte_cdk/sources/declarative/models/declarative_component_schema.py
…ever-add-url-requester # Conflicts: # airbyte_cdk/sources/declarative/requesters/http_job_repository.py
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
2053-2056
: Theurl_requester
addition looks good and aligns with the PR objectives!The implementation properly handles the URL extraction requirement for the Amazon Seller Partner integration. The field is correctly marked as optional with appropriate type hints.
Just wondering - should we add an example in the description to show how it interacts with the polling response? wdyt?
airbyte_cdk/sources/declarative/requesters/http_job_repository.py (2)
47-49
: Simplifyurl_requester
attribute initialization?I noticed that the parentheses around the
url_requester
assignment might be unnecessary and could be removed for clarity. Additionally, we could consider rephrasing the inline comment for better readability. Wdyt?Here's a suggested change:
- url_requester: Optional[Requester] = ( - None # use it in case polling_requester provides some <id> and extra request is needed to obtain list of urls to download from - ) + # Use `url_requester` if an extra request is needed to obtain a list of URLs to download when `polling_requester` provides an ID. + url_requester: Optional[Requester] = None
240-245
: Extract method for stream slice creation?Since the creation of
stream_slice
is a repeatable pattern, would it be beneficial to extract this logic into a separate method? This could enhance code readability and maintainability. Wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml
(1 hunks)airbyte_cdk/sources/declarative/models/declarative_component_schema.py
(2 hunks)airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
(2 hunks)airbyte_cdk/sources/declarative/requesters/http_job_repository.py
(3 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- airbyte_cdk/sources/declarative/declarative_component_schema.yaml
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: Analyze (python)
🔇 Additional comments (3)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
740-748
: LGTM! TheFlattenFields
transformation looks well-structured.The implementation includes clear documentation and proper type hints. The optional
flatten_lists
parameter provides good flexibility for handling different use cases.airbyte_cdk/sources/declarative/requesters/http_job_repository.py (2)
247-247
:⚠️ Potential issueConvert extracted records to strings before yielding?
It seems
urls_extractor.extract_records(url_response)
might return records that are not strings, but the method is expected to yieldIterable[str]
. Should we convert each record to a string before yielding to ensure type consistency? Wdyt?Here's how we might modify the code:
yield from self.urls_extractor.extract_records(url_response) + for record in self.urls_extractor.extract_records(url_response): + yield str(record)Likely invalid or redundant comment.
236-247
:⚠️ Potential issueHandle potential
None
response fromurl_requester.send_request
?Since
send_request
can returnNone
, should we add a check to ensureurl_response
is notNone
before using it? This would prevent possible exceptions when passingNone
toself.urls_extractor.extract_records(url_response)
. Wdyt?Here's a suggested fix:
def _get_download_url(self, job: AsyncJob) -> Iterable[str]: if not self.url_requester: url_response = self._polling_job_response_by_id[job.api_job_id()] else: stream_slice: StreamSlice = StreamSlice( partition={ "polling_job_response": self._polling_job_response_by_id[job.api_job_id()] }, cursor_slice={}, ) - url_response = self.url_requester.send_request(stream_slice=stream_slice) + url_response = self.url_requester.send_request(stream_slice=stream_slice) + if not url_response: + raise AirbyteTracedException( + internal_message="URL Requester received an empty Response.", + failure_type=FailureType.system_error, + ) yield from self.urls_extractor.extract_records(url_response)Likely invalid or redundant comment.
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/requesters/http_job_repository.py (3)
47-49
: Consider enhancing the documentation for url_requester?The inline comment is helpful, but what do you think about converting it to a proper docstring? Something like:
url_requester: Optional[Requester] = ( - None # use it in case polling_requester provides some <id> and extra request is needed to obtain list of urls to download from + None + ) + """Optional requester used when polling_requester provides an identifier (e.g., reportDocumentId) + that requires an additional request to obtain the actual download URLs. + + Example: + Used in scenarios where a reporting API first returns a report ID, + and a separate request is needed to get the actual download URL using that ID. + """wdyt? 🤔
246-246
: Consider improving type safety for url_requester.send_request?The
# type: ignore
comment suggests we could handle types better. What do you think about this approach:- url_response = self.url_requester.send_request(stream_slice=stream_slice) # type: ignore + if not self.url_requester: + raise ValueError("url_requester cannot be None at this point") + url_response = self.url_requester.send_request(stream_slice=stream_slice)This would satisfy the type checker and make the code more robust. wdyt?
248-251
: How about making the error message more specific?The current error message is generic. Consider including more context about what went wrong:
raise AirbyteTracedException( - internal_message="Always expect a response or an exception from url_requester", + internal_message=f"URL requester failed to return a response for job {job.api_job_id()}. This is unexpected as the requester should either return a response or raise an exception.", failure_type=FailureType.system_error, )Would this be more helpful for debugging? 🤔
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte_cdk/sources/declarative/requesters/http_job_repository.py
(3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (8)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-the-guardian-api' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Analyze (python)
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/requesters/http_job_repository.py (2)
192-192
: Nice refactoring! 👍Good job extracting the URL retrieval logic into a separate method. This makes the code more modular and easier to maintain.
253-253
: 🛠️ Refactor suggestionShould we validate the extracted URLs?
The current code assumes the extracted records are strings (valid URLs). Maybe we could add some validation? Something like:
- yield from self.urls_extractor.extract_records(url_response) # type: ignore + for record in self.urls_extractor.extract_records(url_response): + if not isinstance(record, str): + raise ValueError(f"Expected URL to be a string, got {type(record)}") + if not record.startswith(('http://', 'https://')): + raise ValueError(f"Invalid URL format: {record}") + yield recordThis would catch issues early and provide better error messages. What are your thoughts on this? 🤔
Likely invalid or redundant comment.
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (4)
airbyte_cdk/sources/declarative/requesters/http_job_repository.py (4)
34-36
: Consider enhancing the docstring with more details, wdyt?The current docstring could be more helpful if it included:
- A brief description of what AsyncHttpJobRepository does
- An overview of the async job lifecycle
- Documentation for the new url_requester feature
- Example usage
Here's a suggested enhancement:
- """ - See Readme file for more details about flow. - """ + """ + A repository that manages asynchronous HTTP jobs through their lifecycle: + 1. Job Creation: Initiates async jobs via creation_requester + 2. Status Polling: Monitors job status via polling_requester + 3. URL Resolution: Optionally fetches download URLs via url_requester + 4. Data Retrieval: Downloads data via download_retriever + 5. Cleanup: Optional job abortion/deletion via abort_requester/delete_requester + + See README for detailed flow diagram and examples. + """
51-53
: How about improving the url_requester documentation for better clarity?The current inline comment could be more descriptive. Here's a suggestion:
url_requester: Optional[Requester] = ( - None # use it in case polling_requester provides some <id> and extra request is needed to obtain list of urls to download from + None # Optional requester used when polling_requester returns an identifier (e.g., reportDocumentId) + # that requires an additional request to obtain the actual download URLs. + # Example: Amazon Seller Partner API where report URLs must be fetched separately. )
196-196
: Do we need the empty yield at the end?The
yield from []
at the end of the method appears unnecessary since it doesn't yield any values. We could safely remove it, wdyt?for message in self.download_retriever.read_records({}, stream_slice): # ... message handling ... - yield from []
Also applies to: 219-219
Line range hint
1-257
: Would you consider adding more documentation about the Amazon Seller Partner use case?Since this enhancement was specifically added for the Amazon Seller Partner integration, it might be helpful to:
- Add a docstring to the
_get_download_url
method explaining the two-step URL retrieval process- Include a code example in the class docstring showing how to configure the
url_requester
for the Amazon Seller Partner use caseI see there's a nice mermaid diagram in the README (from the past review comments). Perhaps we could also add a link to it in the docstring? This would help developers understand the flow better.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
airbyte_cdk/sources/declarative/requesters/README.md
(1 hunks)airbyte_cdk/sources/declarative/requesters/http_job_repository.py
(4 hunks)
✅ Files skipped from review due to trivial changes (1)
- airbyte_cdk/sources/declarative/requesters/README.md
⏰ Context from checks skipped due to timeout of 90000ms (8)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-the-guardian-api' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Analyze (python)
What
Resolving https://github.com/airbytehq/airbyte-internal-issues/issues/11144
How
In amazon-seller-partner list of urls to download the report should be obtained from another url:
DONE
:report_document_id
from step1, response:possible solution:
add url_requester to AsyncRetriever
Summary by CodeRabbit
New Features
Improvements
AsyncHttpJobRepository
interactions.Technical Enhancements
AsyncRetriever
capabilities.