
feat: Async Retriever add url_requester #211

Merged: 8 commits into main on Jan 14, 2025

Conversation

artem1205

@artem1205 artem1205 commented Jan 9, 2025

What

Resolving https://github.com/airbytehq/airbyte-internal-issues/issues/11144

How

In `amazon-seller-partner`, the list of URLs for downloading a report must be obtained from a separate endpoint:

  1. The polling response, once the status reaches DONE, looks like:

        {
            "reportType": report_name,
            "processingStatus": "DONE",
            "reportId": _REPORT_ID,
            "reportDocumentId": report_document_id,
        }

  2. A second request to get_url is then needed, using the reportDocumentId from step 1. Its response:

        response_body = {"reportDocumentId": report_document_id, "url": document_download_url}

Possible solution: add a url_requester to AsyncRetriever.
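The two-step flow above can be sketched in plain Python. This is only an illustration of the protocol, not the CDK or SP-API implementation; the `fetch_document` callable stands in for the extra `get_url` request, and all names and URLs are placeholders:

```python
from typing import Any, Callable, Iterator, Mapping


def resolve_download_urls(
    polling_response: Mapping[str, Any],
    fetch_document: Callable[[str], Mapping[str, Any]],
) -> Iterator[str]:
    """Two-step URL resolution: take the id from the DONE polling response,
    then issue a second request to exchange it for the actual download URL."""
    document_id = polling_response["reportDocumentId"]  # from step 1
    document = fetch_document(document_id)              # step 2: extra request
    yield document["url"]


# Example with a stubbed second request:
urls = list(
    resolve_download_urls(
        {"processingStatus": "DONE", "reportDocumentId": "doc-1"},
        lambda doc_id: {"reportDocumentId": doc_id, "url": f"https://files.example.com/{doc_id}"},
    )
)
print(urls)  # ['https://files.example.com/doc-1']
```

The url_requester component plays the role of `fetch_document` here: when configured, the repository feeds the polling response into it instead of extracting URLs directly.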

Summary by CodeRabbit

  • New Features

    • Enhanced asynchronous job retrieval with optional URL requester functionality.
    • Added support for flattening nested data fields.
  • Improvements

    • Updated example formatting for better readability.
    • Improved URL extraction during async job processing.
    • Added sequence diagram for better understanding of AsyncHttpJobRepository interactions.
  • Technical Enhancements

    • Extended AsyncRetriever capabilities.
    • Introduced more flexible URL handling in job repositories.

Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
@artem1205 artem1205 self-assigned this Jan 9, 2025
@github-actions github-actions bot added the enhancement New feature or request label Jan 9, 2025
@artem1205 artem1205 requested review from maxi297 and brianjlai January 9, 2025 18:31
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
@artem1205 artem1205 changed the title feat: POC for url_requester feat: Async Retriever add url_requester Jan 10, 2025
@artem1205 artem1205 marked this pull request as ready for review January 10, 2025 12:15

coderabbitai bot commented Jan 10, 2025

📝 Walkthrough

Walkthrough

The pull request introduces enhancements to the Airbyte CDK's declarative source components, focusing on expanding the AsyncRetriever functionality. A new url_requester property is added to allow more flexible URL extraction during asynchronous job processing. Additionally, a FlattenFields transformation is introduced, and some code formatting improvements are made to existing components.

Changes

File Change Summary
airbyte_cdk/sources/declarative/declarative_component_schema.yaml Added url_requester property to AsyncRetriever definition
airbyte_cdk/sources/declarative/models/declarative_component_schema.py Added FlattenFields class, reformatted KeysReplace examples
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py Integrated url_requester in create_async_retriever method
airbyte_cdk/sources/declarative/requesters/http_job_repository.py Added url_requester attribute, new _get_download_url method
airbyte_cdk/sources/declarative/requesters/README.md Added sequence diagram for AsyncHttpJobRepository

Sequence Diagram

sequenceDiagram
    participant CreationRequester
    participant PollingRequester
    participant UrlRequester
    participant DownloadRetriever
    participant AbortRequester
    participant DeleteRequester

    CreationRequester->>PollingRequester: Initiate job creation
    PollingRequester->>PollingRequester: Poll for job status
    PollingRequester-->>UrlRequester: Request download URL (if available)
    UrlRequester-->>PollingRequester: Return download URL
    PollingRequester->>DownloadRetriever: Retrieve report data
    PollingRequester->>AbortRequester: Handle job failure
    PollingRequester->>DeleteRequester: Send delete job request

Suggested Reviewers

  • maxi297
  • darynaishchenko
  • aldogonzalez8

Hey there! 👋 I noticed you've made some interesting changes to the AsyncRetriever. The new url_requester looks promising - wdyt about adding some inline documentation to explain its usage? Also, the FlattenFields transformation seems like a neat addition. Would you be open to providing a quick example of how it might be used in a config? Cheers! 🚀

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/requesters/http_job_repository.py (1)

47-49: Consider adding more descriptive docstring for url_requester?

The current comment is quite long for an inline comment. Would it make sense to move it to a proper docstring with examples and more details about when to use this optional parameter? wdyt?

-    url_requester: Optional[Requester] = (
-        None  # use it in case polling_requester provides some <id> and extra request is needed to obtain list of urls to download from
-    )
+    url_requester: Optional[Requester] = None
+    """Optional requester used when polling_requester provides an ID that requires an additional request to obtain download URLs.
+    
+    This is useful in scenarios where the polling response contains an ID that needs to be used
+    to make a separate request to get the actual download URLs. If not provided, URLs will be
+    extracted directly from the polling response.
+    """
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)

2048-2051: LGTM! Well-documented url_requester field addition.

The new field aligns perfectly with the PR objectives for Amazon Seller Partner integration, allowing URL extraction from polling responses. The implementation follows the established pattern of other requester fields in the AsyncRetriever class.

Would you like to add a code example in the description field to show how this might be used in practice? Something like:

url_requester:
  type: HttpRequester
  url_base: "https://api.example.com"
  path: "/get_download_url/{{ response['reportDocumentId'] }}"

wdyt?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6d5ce67 and c641e26.

📒 Files selected for processing (4)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1 hunks)
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2 hunks)
  • airbyte_cdk/sources/declarative/requesters/http_job_repository.py (3 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/requesters/http_job_repository.py

[error] 241-241: Incompatible types in assignment (expression has type "Response | None", variable has type "Response")


[error] 242-242: Incompatible types in "yield from" (actual type "MutableMapping[Any, Any]", expected type "str")

🔇 Additional comments (4)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

2325-2334: LGTM! Clean implementation of url_requester initialization

The changes follow the existing pattern for other requesters and properly handle the optional nature of url_requester.

airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)

2975-2979: LGTM! Well-documented schema for url_requester

The schema definition is consistent with other requester properties and includes clear documentation.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2)

740-743: LGTM! Clean implementation of the FlattenFields transformation.

The implementation follows the established pattern of other transformation classes in the codebase. Simple and focused.


750-766: LGTM! Improved readability of examples.

The multi-line formatting of examples makes them easier to read and maintain.


@brianjlai brianjlai left a comment


coderabbit and mypy type checks (for the same code block it looks like) are failing on a type mismatch. I'm good w/ the approach as we discussed last week, but just clarifying a few things for myself.

Approving, but please fix those before merging.

And for guardian, that is a separate unrelated issue and should be fixed on latest anyway so don't worry about that.

…ever-add-url-requester

# Conflicts:
#	airbyte_cdk/sources/declarative/models/declarative_component_schema.py
…ever-add-url-requester

# Conflicts:
#	airbyte_cdk/sources/declarative/requesters/http_job_repository.py
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)

2053-2056: The url_requester addition looks good and aligns with the PR objectives!

The implementation properly handles the URL extraction requirement for the Amazon Seller Partner integration. The field is correctly marked as optional with appropriate type hints.

Just wondering - should we add an example in the description to show how it interacts with the polling response? wdyt?

airbyte_cdk/sources/declarative/requesters/http_job_repository.py (2)

47-49: Simplify url_requester attribute initialization?

I noticed that the parentheses around the url_requester assignment might be unnecessary and could be removed for clarity. Additionally, we could consider rephrasing the inline comment for better readability. Wdyt?

Here's a suggested change:

-    url_requester: Optional[Requester] = (
-        None  # use it in case polling_requester provides some <id> and extra request is needed to obtain list of urls to download from
-    )
+    # Use `url_requester` if an extra request is needed to obtain a list of URLs to download when `polling_requester` provides an ID.
+    url_requester: Optional[Requester] = None

240-245: Extract method for stream slice creation?

Since the creation of stream_slice is a repeatable pattern, would it be beneficial to extract this logic into a separate method? This could enhance code readability and maintainability. Wdyt?
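Sketch of what that extraction might look like, using minimal stand-ins for the CDK's `StreamSlice` and repository types so the example is self-contained (the helper name `_to_polling_slice` is a suggestion, not from the PR):

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Mapping


@dataclass(frozen=True)
class StreamSlice:
    # Minimal stand-in for airbyte_cdk's StreamSlice.
    partition: Mapping[str, Any]
    cursor_slice: Mapping[str, Any]


@dataclass
class JobRepositorySketch:
    _polling_job_response_by_id: Dict[str, Any] = field(default_factory=dict)

    def _to_polling_slice(self, job_id: str) -> StreamSlice:
        # Wrap the stored polling response so a downstream requester can
        # interpolate fields such as reportDocumentId from it.
        return StreamSlice(
            partition={"polling_job_response": self._polling_job_response_by_id[job_id]},
            cursor_slice={},
        )
```

With the helper in place, both the download and URL-resolution paths could call `self._to_polling_slice(job.api_job_id())` instead of constructing the slice inline.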


📥 Commits

Reviewing files that changed from the base of the PR and between c641e26 and b72499a.

📒 Files selected for processing (4)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1 hunks)
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2 hunks)
  • airbyte_cdk/sources/declarative/requesters/http_job_repository.py (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: Analyze (python)
🔇 Additional comments (3)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)

740-748: LGTM! The FlattenFields transformation looks well-structured.

The implementation includes clear documentation and proper type hints. The optional flatten_lists parameter provides good flexibility for handling different use cases.
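For intuition, a transformation along these lines might behave as follows. This is a standalone sketch, not the CDK's actual FlattenFields code; the dotted-path key naming is an assumption for illustration:

```python
from collections.abc import Mapping
from typing import Any, Dict


def flatten_fields(record: Mapping, flatten_lists: bool = True, _prefix: str = "") -> Dict[str, Any]:
    """Recursively hoist nested keys to the top level, joining paths with '.'."""
    flat: Dict[str, Any] = {}
    for key, value in record.items():
        path = f"{_prefix}{key}"
        if isinstance(value, Mapping):
            # Nested object: recurse with an extended prefix.
            flat.update(flatten_fields(value, flatten_lists, f"{path}."))
        elif isinstance(value, list) and flatten_lists:
            # Optional list flattening, indexed by position.
            for i, item in enumerate(value):
                if isinstance(item, Mapping):
                    flat.update(flatten_fields(item, flatten_lists, f"{path}.{i}."))
                else:
                    flat[f"{path}.{i}"] = item
        else:
            flat[path] = value
    return flat


print(flatten_fields({"user": {"name": "a"}, "tags": ["x", "y"]}))
# {'user.name': 'a', 'tags.0': 'x', 'tags.1': 'y'}
```

With `flatten_lists=False`, list values would be kept as-is while nested objects are still flattened.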

airbyte_cdk/sources/declarative/requesters/http_job_repository.py (2)

247-247: ⚠️ Potential issue

Convert extracted records to strings before yielding?

It seems urls_extractor.extract_records(url_response) might return records that are not strings, but the method is expected to yield Iterable[str]. Should we convert each record to a string before yielding to ensure type consistency? Wdyt?

Here's how we might modify the code:

-        yield from self.urls_extractor.extract_records(url_response)
+        for record in self.urls_extractor.extract_records(url_response):
+            yield str(record)

Likely invalid or redundant comment.


236-247: ⚠️ Potential issue

Handle potential None response from url_requester.send_request?

Since send_request can return None, should we add a check to ensure url_response is not None before using it? This would prevent possible exceptions when passing None to self.urls_extractor.extract_records(url_response). Wdyt?

Here's a suggested fix:

     def _get_download_url(self, job: AsyncJob) -> Iterable[str]:
         if not self.url_requester:
             url_response = self._polling_job_response_by_id[job.api_job_id()]
         else:
             stream_slice: StreamSlice = StreamSlice(
                 partition={
                     "polling_job_response": self._polling_job_response_by_id[job.api_job_id()]
                 },
                 cursor_slice={},
             )
             url_response = self.url_requester.send_request(stream_slice=stream_slice)
+            if not url_response:
+                raise AirbyteTracedException(
+                    internal_message="URL Requester received an empty Response.",
+                    failure_type=FailureType.system_error,
+                )
         yield from self.urls_extractor.extract_records(url_response)

Likely invalid or redundant comment.

Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/requesters/http_job_repository.py (3)

47-49: Consider enhancing the documentation for url_requester?

The inline comment is helpful, but what do you think about converting it to a proper docstring? Something like:

-    url_requester: Optional[Requester] = (
-        None  # use it in case polling_requester provides some <id> and extra request is needed to obtain list of urls to download from
-    )
+    url_requester: Optional[Requester] = None
+    """Optional requester used when polling_requester provides an identifier (e.g., reportDocumentId)
+    that requires an additional request to obtain the actual download URLs.
+
+    Example:
+        Used in scenarios where a reporting API first returns a report ID,
+        and a separate request is needed to get the actual download URL using that ID.
+    """

wdyt? 🤔


246-246: Consider improving type safety for url_requester.send_request?

The # type: ignore comment suggests we could handle types better. What do you think about this approach:

-            url_response = self.url_requester.send_request(stream_slice=stream_slice)  # type: ignore
+            if not self.url_requester:
+                raise ValueError("url_requester cannot be None at this point")
+            url_response = self.url_requester.send_request(stream_slice=stream_slice)

This would satisfy the type checker and make the code more robust. wdyt?


248-251: How about making the error message more specific?

The current error message is generic. Consider including more context about what went wrong:

                 raise AirbyteTracedException(
-                    internal_message="Always expect a response or an exception from url_requester",
+                    internal_message=f"URL requester failed to return a response for job {job.api_job_id()}. This is unexpected as the requester should either return a response or raise an exception.",
                     failure_type=FailureType.system_error,
                 )

Would this be more helpful for debugging? 🤔


📥 Commits

Reviewing files that changed from the base of the PR and between b72499a and 90efe40.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/requesters/http_job_repository.py (3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-the-guardian-api' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Analyze (python)
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/requesters/http_job_repository.py (2)

192-192: Nice refactoring! 👍

Good job extracting the URL retrieval logic into a separate method. This makes the code more modular and easier to maintain.


253-253: 🛠️ Refactor suggestion

Should we validate the extracted URLs?

The current code assumes the extracted records are strings (valid URLs). Maybe we could add some validation? Something like:

-        yield from self.urls_extractor.extract_records(url_response)  # type: ignore
+        for record in self.urls_extractor.extract_records(url_response):
+            if not isinstance(record, str):
+                raise ValueError(f"Expected URL to be a string, got {type(record)}")
+            if not record.startswith(('http://', 'https://')):
+                raise ValueError(f"Invalid URL format: {record}")
+            yield record

This would catch issues early and provide better error messages. What are your thoughts on this? 🤔

Likely invalid or redundant comment.

Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (4)
airbyte_cdk/sources/declarative/requesters/http_job_repository.py (4)

34-36: Consider enhancing the docstring with more details, wdyt?

The current docstring could be more helpful if it included:

  • A brief description of what AsyncHttpJobRepository does
  • An overview of the async job lifecycle
  • Documentation for the new url_requester feature
  • Example usage

Here's a suggested enhancement:

-    """
-    See Readme file for more details about flow.
-    """
+    """
+    A repository that manages asynchronous HTTP jobs through their lifecycle:
+    1. Job Creation: Initiates async jobs via creation_requester
+    2. Status Polling: Monitors job status via polling_requester
+    3. URL Resolution: Optionally fetches download URLs via url_requester
+    4. Data Retrieval: Downloads data via download_retriever
+    5. Cleanup: Optional job abortion/deletion via abort_requester/delete_requester
+
+    See README for detailed flow diagram and examples.
+    """

51-53: How about improving the url_requester documentation for better clarity?

The current inline comment could be more descriptive. Here's a suggestion:

     url_requester: Optional[Requester] = (
-        None  # use it in case polling_requester provides some <id> and extra request is needed to obtain list of urls to download from
+        None  # Optional requester used when polling_requester returns an identifier (e.g., reportDocumentId)
+             # that requires an additional request to obtain the actual download URLs.
+             # Example: Amazon Seller Partner API where report URLs must be fetched separately.
     )

196-196: Do we need the empty yield at the end?

The yield from [] at the end of the method appears unnecessary since it doesn't yield any values. We could safely remove it, wdyt?

         for message in self.download_retriever.read_records({}, stream_slice):
             # ... message handling ...

-        yield from []

Also applies to: 219-219


Line range hint 1-257: Would you consider adding more documentation about the Amazon Seller Partner use case?

Since this enhancement was specifically added for the Amazon Seller Partner integration, it might be helpful to:

  1. Add a docstring to the _get_download_url method explaining the two-step URL retrieval process
  2. Include a code example in the class docstring showing how to configure the url_requester for the Amazon Seller Partner use case

I see there's a nice mermaid diagram in the README (from the past review comments). Perhaps we could also add a link to it in the docstring? This would help developers understand the flow better.


📥 Commits

Reviewing files that changed from the base of the PR and between 90efe40 and d0a7291.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/requesters/README.md (1 hunks)
  • airbyte_cdk/sources/declarative/requesters/http_job_repository.py (4 hunks)
✅ Files skipped from review due to trivial changes (1)
  • airbyte_cdk/sources/declarative/requesters/README.md
⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-the-guardian-api' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Analyze (python)

@artem1205 artem1205 merged commit e18e407 into main Jan 14, 2025
19 of 20 checks passed
@artem1205 artem1205 deleted the artem1205/async-retriever-add-url-requester branch January 14, 2025 21:07