feat: AsyncRetriever: Enable configurability of max concurrent job count #391

Closed
pnilan wants to merge 8 commits

Conversation

pnilan

@pnilan pnilan commented Mar 6, 2025

Closes: https://github.com/airbytehq/airbyte-internal-issues/issues/11343

What

  • Adds max_concurrent_jobs field to AsyncRetriever component.
    • Can take either an integer or interpolated string
    • Min count of 1
  • Updates and adds tests for interpolation and validation

Summary by CodeRabbit

  • New Features

    • Enhanced job tracking by allowing the job limit to be set dynamically with either a number or a configurable string value.
    • Introduced a new, configurable maximum concurrent jobs setting that lets users fine-tune performance based on their processing requirements.
  • Bug Fixes

    • Added validation to ensure the job limit is at least 1, raising an error if this condition is not met.
  • Tests

    • Updated tests to reflect changes in the JobTracker initialization, ensuring proper handling of new parameters and configurations.

@github-actions github-actions bot added the enhancement New feature or request label Mar 6, 2025

coderabbitai bot commented Mar 6, 2025

📝 Walkthrough

Walkthrough

This pull request updates the JobTracker class to support a limit parameter that accepts both integers and strings via a new config parameter. The changes include string interpolation for limits, a validation check to ensure the limit is at least 1, and corresponding updates to related tests and instantiation points. Additionally, the schema and model have been enhanced with a new max_concurrent_jobs property to dynamically configure concurrency.
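
The flow described in this walkthrough can be sketched without the CDK. The snippet below is a minimal illustration, not the code from this PR: `resolve_limit` and `_CONFIG_REF` are hypothetical names, and the regular expression is a deliberately narrow stand-in for `InterpolatedString` that only understands the single pattern `{{ config['key'] }}`.

```python
import re
from typing import Any, Mapping, Union

# Hypothetical stand-in for the CDK's InterpolatedString: it only understands
# the single pattern "{{ config['key'] }}" and is not the real implementation.
_CONFIG_REF = re.compile(r"^\{\{\s*config\[['\"](\w+)['\"]\]\s*\}\}$")


def resolve_limit(limit: Union[int, str], config: Mapping[str, Any]) -> int:
    """Turn an int or a config-referencing string into a validated int limit."""
    if isinstance(limit, str):
        match = _CONFIG_REF.match(limit.strip())
        # Either look the key up in config or treat the string as a literal number.
        raw = config[match.group(1)] if match else limit
        limit = int(raw)
    if limit < 1:
        raise ValueError(f"limit must be at least 1, got {limit}")
    return limit


print(resolve_limit(3, {}))
print(resolve_limit("{{ config['max_concurrent_jobs'] }}", {"max_concurrent_jobs": 5}))
```

The validation runs after interpolation, so a config value of 0 is rejected the same way as a literal 0.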

Changes

| File(s) | Change Summary |
| --- | --- |
| airbyte_cdk/sources/declarative/async_job/job_tracker.py, unit_tests/sources/declarative/async_job/test_integration.py, unit_tests/sources/declarative/async_job/test_job_orchestrator.py, unit_tests/sources/declarative/async_job/test_job_tracker.py, unit_tests/sources/declarative/partition_routers/test_async_job_partition_router.py | Updated JobTracker constructor to accept limit as an int or str with an added config parameter. Introduced string interpolation (via InterpolatedString) and validation (limit ≥ 1) along with corresponding test updates. |
| airbyte_cdk/sources/declarative/declarative_component_schema.yaml, airbyte_cdk/sources/declarative/models/declarative_component_schema.py | Added a new property/field max_concurrent_jobs (supports int or str with a default of 1) to enable dynamic configuration of concurrent jobs. |
| airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py, unit_tests/sources/declarative/test_concurrent_declarative_source.py | Modified JobTracker instantiation to utilize a dynamic value from model.max_concurrent_jobs and updated the manifest configuration accordingly. |
| unit_tests/sources/declarative/interpolation/test_macros.py | Updated expected timestamp outputs in the format_datetime macro tests. |

Sequence Diagram(s)

sequenceDiagram
    participant Caller
    participant JobTracker
    participant Interpolator

    Caller->>JobTracker: Instantiate with limit (int|string) and config
    alt Limit is a string
        JobTracker->>Interpolator: Evaluate interpolated limit using config
        Interpolator-->>JobTracker: Return integer limit
    end
    JobTracker->>JobTracker: Validate limit (>= 1)
    JobTracker-->>Caller: Instance created

Possibly related PRs

Suggested labels

bug

Suggested reviewers

  • maxi297
  • brianjlai
  • tolik0

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)

2371-2376: Great addition of the configurability feature, but there's a small typo in the title.

I noticed there's a misspelling in the title field: "Maximum Conccurent Job Count" should be "Maximum Concurrent Job Count". The implementation looks good otherwise - I like how you've made it accept both integers and string interpolation for flexibility.

What do you think about fixing the typo?

    max_concurrent_jobs: Optional[Union[int, str]] = Field(
        1,
        description="Maximum number of concurrent jobs to run.",
        examples=[2, "{{ config['max_concurrent_jobs'] }}"],
-       title="Maximum Conccurent Job Count",
+       title="Maximum Concurrent Job Count",
    )
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)

3380-3381: Typo in property title.

There's a typo in "Maximum Conccurent Job Count" - "Conccurent" should be "Concurrent".

-        title: Maximum Conccurent Job Count
+        title: Maximum Concurrent Job Count

wdyt?

airbyte_cdk/sources/declarative/async_job/job_tracker.py (1)

20-20: Consider replacing the mutable empty-dictionary default with None.

A mutable default argument such as {} is created once and shared across calls, which can lead to unexpected behavior if it is ever modified. A None default with an explicit fallback avoids this.

-    def __init__(self, limit: Union[int, str], config: Mapping[str, Any] = {}):
+    def __init__(self, limit: Union[int, str], config: Optional[Mapping[str, Any]] = None):
+        if config is None:
+            config = {}

wdyt?
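
For readers unfamiliar with the pitfall behind this nitpick, here is a small standalone illustration. The functions `risky` and `safe` are hypothetical and unrelated to the CDK. In the PR itself the config is typed as a read-only Mapping and is only read, so the risk there is latent rather than an observed bug.

```python
from typing import Any, Dict, Optional


def risky(key: str, store: Dict[str, Any] = {}) -> Dict[str, Any]:
    # The default dict is created once, when the function is defined,
    # so every call without `store` mutates the same object.
    store[key] = True
    return store


def safe(key: str, store: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    # A None sentinel yields a fresh dict for each call.
    if store is None:
        store = {}
    store[key] = True
    return store


first = risky("a")
second = risky("b")
print(first is second, sorted(second))  # True ['a', 'b']

print(sorted(safe("a")), sorted(safe("b")))  # ['a'] ['b']
```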

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between fe2f9a5 and a73ea65.

📒 Files selected for processing (10)
  • airbyte_cdk/sources/declarative/async_job/job_tracker.py (2 hunks)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1 hunks)
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1 hunks)
  • unit_tests/sources/declarative/async_job/test_integration.py (1 hunks)
  • unit_tests/sources/declarative/async_job/test_job_orchestrator.py (7 hunks)
  • unit_tests/sources/declarative/async_job/test_job_tracker.py (2 hunks)
  • unit_tests/sources/declarative/interpolation/test_macros.py (1 hunks)
  • unit_tests/sources/declarative/partition_routers/test_async_job_partition_router.py (2 hunks)
  • unit_tests/sources/declarative/test_concurrent_declarative_source.py (1 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/async_job/job_tracker.py

[error] 3-3: Ruff: Import block is un-sorted or un-formatted. Organize imports.

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

[error] 2891-2891: Argument 1 to 'JobTracker' has incompatible type 'int | str | None'; expected 'int | str' [arg-type]
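
The arg-type error follows from the model field being Optional while the constructor expects a non-None value. One possible way to satisfy the type checker, sketched here with the hypothetical names `narrow_limit` and `DEFAULT_MAX_CONCURRENT_JOBS` (neither appears in the PR), is to substitute the default before the call:

```python
from typing import Optional, Union

DEFAULT_MAX_CONCURRENT_JOBS = 1  # mirrors the schema default quoted in this review


def narrow_limit(value: Optional[Union[int, str]]) -> Union[int, str]:
    """Replace a missing value with the default so the result is never None."""
    return DEFAULT_MAX_CONCURRENT_JOBS if value is None else value


print(narrow_limit(None), narrow_limit(4))
```

The explicit `is None` check is deliberate: `value or 1` would also turn a configured 0 into 1 and hide it from the minimum-limit validation.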

⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Analyze (python)
  • GitHub Check: SDM Docker Image Build
🔇 Additional comments (19)
unit_tests/sources/declarative/interpolation/test_macros.py (2)

78-78: Updated timestamp in expected test output.

Looks like the expected timestamp has been modified from the previous value. This seems like an intentional test data update to match changes in date/time calculations.


84-84: Updated microsecond timestamp in expected test output.

Similar to the previous change, the microsecond timestamp expectation has been updated. This matches the same time value as the seconds timestamp but in microseconds.

unit_tests/sources/declarative/test_concurrent_declarative_source.py (1)

352-352: Added max_concurrent_jobs configuration in test manifest.

Good addition! This adds the new max_concurrent_jobs property to the test manifest, ensuring that the AsyncRetriever configuration can properly limit concurrent jobs.

unit_tests/sources/declarative/async_job/test_integration.py (1)

90-90: Updated JobTracker instantiation to include config parameter.

This accommodates the changes to the JobTracker constructor which now accepts a config parameter. The test is properly updated to pass an empty config object.

unit_tests/sources/declarative/partition_routers/test_async_job_partition_router.py (2)

29-29: Updated JobTracker instantiation with empty config.

Good update to accommodate the new JobTracker constructor parameter.


61-61: Updated second JobTracker instantiation with empty config.

Consistent with the previous update, this ensures all JobTracker instances in tests are created with the new constructor signature.

airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)

3379-3388: LGTM! Good addition of the configurable max_concurrent_jobs property.

This enhancement allows for dynamic configuration of the concurrency level through both hardcoded integers and string interpolation. The examples and default value provide clear guidance on usage.

airbyte_cdk/sources/declarative/async_job/job_tracker.py (1)

20-26: Solid implementation of string interpolation and validation for limit.

The implementation correctly handles both integer and string limits by using InterpolatedString for evaluation, and includes a validation check to ensure the limit is at least 1. Good defensive programming!

unit_tests/sources/declarative/async_job/test_job_tracker.py (3)

17-20: Good test setup with interpolated string.

The test setup now properly uses the new interpolated string capability, which is good for ensuring the feature works as expected.


46-50: Great test case for string interpolation handling.

This test effectively verifies that a string limit is correctly interpolated and converted to an integer.


53-55: Excellent validation test for the minimum limit.

This test ensures that attempting to use a limit less than 1 properly raises a ValueError, which is important for preventing misconfigurations.

unit_tests/sources/declarative/async_job/test_job_orchestrator.py (8)

140-140: JobTracker instantiation updated with new config parameter

The JobTracker constructor now accepts an additional config parameter, which is initialized with an empty dictionary. This aligns with the PR objective of supporting configurability of max concurrent job count.

Could there be value in adding test cases that specifically exercise the new configurability functionality with non-empty config dictionaries? This would validate that the string interpolation for limits works correctly. Wdyt?


187-187: JobTracker instantiation in helper method updated with new parameter

The _orchestrator helper method has been updated to include the new config={} parameter when creating a default JobTracker. This change maintains consistency with the updated constructor signature.

Since this is a helper method used throughout the test class, this change ensures all test cases will work with the updated JobTracker interface.


195-195: JobTracker instantiation updated with empty config dictionary

Similar to other instances, this JobTracker initialization has been updated to include the new required config parameter.

Have you considered adding a test case that specifically tests the JobTracker with a non-empty config containing a string-based limit or dynamic configuration value? That could help verify the new functionality introduced in this PR. Wdyt?


223-223: JobTracker instantiation updated in exception test

The JobTracker constructor call has been modified to include the config={} parameter in this test for exception handling behavior.

This change maintains compatibility with the updated constructor signature, though it doesn't exercise the new configuration capabilities.


244-244: JobTracker instantiation updated in traced config error test

The JobTracker creation has been updated to include the empty config dictionary in this test case that verifies traced configuration errors.

This change ensures the test continues to work with the updated JobTracker interface.


304-304: JobTracker instantiation updated in max attempts failure test

The JobTracker constructor call has been updated with the new config={} parameter in this test that verifies job budget is freed after maximum retry attempts.

This maintains compatibility with the updated constructor interface.


321-321: JobTracker instantiation updated in budget allocation test

The JobTracker constructor has been modified to include the config={} parameter in this test case that verifies waiting for budget to be freed.

This change ensures the test works with the updated JobTracker signature.


344-344: JobTracker instantiation updated in job start failure test

The JobTracker constructor call has been updated with the config={} parameter in this test that verifies budget is freed when job start raises an exception.

This maintains compatibility with the modified constructor interface.


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/async_job/job_tracker.py (2)

19-22: Using a mutable default argument could lead to unexpected behavior - would you consider changing this?

The empty dictionary {} as a default value for config could lead to unexpected behavior if modified within the method, as Python reuses the same dictionary instance across all calls without an explicit config argument.

Also, when creating the InterpolatedString, you're passing an empty parameters dict but then evaluating with the config. Is this intentional or should the parameters come from the config as well? wdyt?

-    def __init__(self, limit: Union[int, str], config: Mapping[str, Any] = {}):
+    def __init__(self, limit: Union[int, str], config: Optional[Mapping[str, Any]] = None):
+        if config is None:
+            config = {}
         if isinstance(limit, str):
-            limit = int(InterpolatedString(limit, parameters={}).eval(config=config))
+            limit = int(InterpolatedString(limit).eval(config=config))

19-25: Consider adding error handling for string conversion failures - what do you think?

The current implementation doesn't explicitly handle cases where the string interpolation doesn't result in a valid integer. This could lead to unexpected exceptions when dealing with user-provided configuration values.

     if isinstance(limit, str):
-        limit = int(InterpolatedString(limit, parameters={}).eval(config=config))
+        try:
+            interpolated_value = InterpolatedString(limit).eval(config=config)
+            limit = int(interpolated_value)
+        except (ValueError, TypeError) as e:
+            raise ValueError(f"Could not convert '{limit}' to a valid integer: {str(e)}")
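
A standalone variant of this suggestion is sketched below. `to_job_limit` is a hypothetical helper that takes an already-interpolated value, so it makes no assumption about the `InterpolatedString` API. It also chains the original exception with `from`, which preserves the underlying traceback.

```python
from typing import Any


def to_job_limit(raw: Any, source: str) -> int:
    """Convert an already-interpolated value to int, naming the source on failure."""
    try:
        return int(raw)
    except (ValueError, TypeError) as exc:
        raise ValueError(
            f"Could not convert {raw!r} (from {source!r}) to an integer job limit"
        ) from exc


print(to_job_limit("4", "{{ config['max_concurrent_jobs'] }}"))  # 4
```

Both a non-numeric string and a missing value (None) end up as the same ValueError, so callers only need to handle one exception type.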
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a73ea65 and ab227b2.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/async_job/job_tracker.py (2 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: SDM Docker Image Build
🔇 Additional comments (3)
airbyte_cdk/sources/declarative/async_job/job_tracker.py (3)

6-6: Updated type imports for broader parameter typing - looks good!

Added the Any, Mapping, and Union imports to support the new parameter types in the JobTracker constructor. This aligns well with the changes being made.


9-9: Import of InterpolatedString to support string-based limits - nice addition!

This allows dynamic configuration of the job limit using string interpolation, making the component more flexible.


23-24: Validation check for minimum limit - excellent addition!

This validation ensures that at least one concurrent job can run, preventing configuration errors that could block all progress. The error message is clear and helpful.

@pnilan pnilan requested review from maxi297 and bazarnov March 7, 2025 00:44

@natikgadzhi natikgadzhi left a comment

Nothing stands out as wrong, so approving to unblock.

@@ -3376,6 +3376,16 @@ definitions:
- "$ref": "#/definitions/IterableDecoder"
- "$ref": "#/definitions/XmlDecoder"
- "$ref": "#/definitions/ZipfileDecoder"
max_concurrent_jobs:
title: Maximum Conccurent Job Count
description: Maximum number of concurrent jobs to run.

Nit: it'd be useful to explain what those jobs do — are those parsing already downloaded archives, or are those threads that request multiple reports in parallel? etc

description: Maximum number of concurrent jobs to run.
anyOf:
- type: integer
- type: string

I'm not convinced you really need this in config, and therefore you need strings here. But, up to you.

Contributor Author

Yeah, I think you're right. Looks like Salesforce defines a static max across all accounts.

@@ -58,7 +58,7 @@ def test_stream_slices_with_parent_slicer():
job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator(
MockAsyncJobRepository(),
stream_slices,
- JobTracker(_NO_LIMIT),
+ JobTracker(_NO_LIMIT, config={}),

Seems like JobTracker already has config={} default in the initializer? Could avoid changes here I think?


@bazarnov bazarnov left a comment

Everything looks good to me, but I think it would be a good idea to add a dedicated unit test that tests more than one max_concurrent_jobs. For example, we could test 2-3 max_concurrent_jobs to ensure that we process the records correctly. What do you think?
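
A rough shape for such a test is sketched below. It does not use the CDK: `TinyTracker` and `LimitReached` are hypothetical, simplified stand-ins for `JobTracker` and `ConcurrentJobLimitReached`, and the check only covers budget accounting, not record processing.

```python
import threading
import uuid
from typing import Set


class LimitReached(Exception):
    """Hypothetical counterpart of ConcurrentJobLimitReached."""


class TinyTracker:
    """Hypothetical, simplified tracker; not the CDK's JobTracker."""

    def __init__(self, limit: int) -> None:
        if limit < 1:
            raise ValueError("limit must be at least 1")
        self._limit = limit
        self._jobs: Set[str] = set()
        self._lock = threading.Lock()

    def acquire(self) -> str:
        with self._lock:
            if len(self._jobs) >= self._limit:
                raise LimitReached(f"limit of {self._limit} reached")
            job_id = uuid.uuid4().hex
            self._jobs.add(job_id)
            return job_id

    def release(self, job_id: str) -> None:
        with self._lock:
            self._jobs.discard(job_id)


def check_limit(limit: int) -> None:
    tracker = TinyTracker(limit)
    held = [tracker.acquire() for _ in range(limit)]  # fill the budget
    try:
        tracker.acquire()
    except LimitReached:
        pass
    else:
        raise AssertionError("expected the budget to be exhausted")
    tracker.release(held[0])
    tracker.acquire()  # freeing one slot allows exactly one more job


for n in (2, 3):
    check_limit(n)
```

In a real test suite the loop would more naturally be a parametrized test over the limits 2 and 3.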

@@ -15,7 +16,13 @@ class ConcurrentJobLimitReached(Exception):


class JobTracker:
- def __init__(self, limit: int):
+ def __init__(self, limit: Union[int, str], config: Mapping[str, Any] = {}):

In my opinion, the assignment of config: Mapping[str, Any] = {} is not necessary in this context. Instead, I suggest we instantiate the config object within the __post_init__() method and make this field optional, setting its default value to None. Please share your thoughts on this suggestion.
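
As a sketch of what this suggestion could look like: `__post_init__` only runs for dataclasses, whereas the JobTracker in the diff above defines `__init__` directly, so the hypothetical `TrackerSettings` class below assumes a dataclass-based design. In a plain class the same None check would live in `__init__`.

```python
from dataclasses import dataclass
from typing import Any, Mapping, Optional, Union


@dataclass
class TrackerSettings:
    """Hypothetical dataclass; the JobTracker in this PR defines __init__ directly."""

    limit: Union[int, str]
    config: Optional[Mapping[str, Any]] = None

    def __post_init__(self) -> None:
        # Replace the None sentinel with a fresh mapping per instance.
        if self.config is None:
            self.config = {}


a = TrackerSettings(1)
b = TrackerSettings(2)
print(a.config == {}, a.config is not b.config)  # True True
```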

@pnilan

pnilan commented Mar 11, 2025

Closing in favor of source-scoped configurability: #405

Will take above comments into account on other PR.

@pnilan pnilan closed this Mar 11, 2025