feat: AsyncRetriever: Enable configurability of max concurrent job count #391
📝 Walkthrough
This pull request updates the
Changes
Sequence Diagram(s)
sequenceDiagram
participant Caller
participant JobTracker
participant Interpolator
Caller->>JobTracker: Instantiate with limit (int|string) and config
alt Limit is a string
JobTracker->>Interpolator: Evaluate interpolated limit using config
Interpolator-->>JobTracker: Return integer limit
end
JobTracker->>JobTracker: Validate limit (>= 1)
JobTracker-->>Caller: Instance created
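The flow in the diagram can be sketched roughly as follows. This is a minimal illustration, not the CDK code: `resolve_limit` and the regex are hypothetical stand-ins for `InterpolatedString`, and they only understand the single pattern `{{ config['key'] }}`. The error message wording is also an assumption.

```python
import re
from typing import Any, Mapping, Union

# Hypothetical stand-in for the CDK's InterpolatedString: it only understands
# the single pattern {{ config['key'] }} and is NOT the real implementation.
_CONFIG_REF = re.compile(r"^\{\{\s*config\[['\"](\w+)['\"]\]\s*\}\}$")


def resolve_limit(limit: Union[int, str], config: Mapping[str, Any]) -> int:
    """Turn an int or an interpolated string into a validated integer limit."""
    if isinstance(limit, str):
        match = _CONFIG_REF.match(limit.strip())
        # Fall back to the literal string when it is not a config reference.
        limit = int(config[match.group(1)] if match else limit)
    if limit < 1:
        # Illustrative message; the real wording may differ.
        raise ValueError(f"Invalid job limit {limit}: the minimum is 1")
    return limit
```

For example, `resolve_limit("{{ config['max_concurrent_jobs'] }}", {"max_concurrent_jobs": 2})` evaluates the string against the config before the `>= 1` check runs, which mirrors the order shown in the diagram.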
Actionable comments posted: 2
🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
2371-2376: Great addition of the configurability feature, but there's a small typo in the title.
I noticed there's a misspelling in the title field: "Maximum Conccurent Job Count" should be "Maximum Concurrent Job Count". The implementation looks good otherwise - I like how you've made it accept both integers and string interpolation for flexibility.
What do you think about fixing the typo?
  max_concurrent_jobs: Optional[Union[int, str]] = Field(
      1,
      description="Maximum number of concurrent jobs to run.",
      examples=[2, "{{ config['max_concurrent_jobs'] }}"],
-     title="Maximum Conccurent Job Count",
+     title="Maximum Concurrent Job Count",
  )
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)
3380-3381: Typo in property title.
There's a typo in "Maximum Conccurent Job Count" - "Conccurent" should be "Concurrent".
- title: Maximum Conccurent Job Count
+ title: Maximum Concurrent Job Count
wdyt?
airbyte_cdk/sources/declarative/async_job/job_tracker.py (1)
20-20: Consider adding type annotation for the empty dictionary default.
Using an empty dictionary as a default argument could potentially lead to unexpected behavior if modified. Consider using a more explicit annotation.
- def __init__(self, limit: Union[int, str], config: Mapping[str, Any] = {}):
+ def __init__(self, limit: Union[int, str], config: Mapping[str, Any] = None):
+     if config is None:
+         config = {}
wdyt?
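To make the mutable-default concern concrete, here is a small standalone sketch. The function names are hypothetical and unrelated to the CDK. It also annotates the `None` default as `Optional[...]`, which strict type checkers generally expect instead of a bare `= None`.

```python
from typing import Dict, Optional


def remember_shared(key: str, seen: Dict[str, int] = {}) -> Dict[str, int]:
    # The default dict is created once, at function definition time,
    # so every call that omits `seen` mutates the same object.
    seen[key] = seen.get(key, 0) + 1
    return seen


def remember_fresh(key: str, seen: Optional[Dict[str, int]] = None) -> Dict[str, int]:
    # None sentinel: a new dict is built on each call that omits `seen`.
    if seen is None:
        seen = {}
    seen[key] = seen.get(key, 0) + 1
    return seen
```

The hazard only materializes if the default is mutated. A `Mapping` that is only read, as `config` appears to be here, would not trigger it, but the `None` sentinel removes the question entirely.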
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (10)
airbyte_cdk/sources/declarative/async_job/job_tracker.py (2 hunks)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1 hunks)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1 hunks)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1 hunks)
unit_tests/sources/declarative/async_job/test_integration.py (1 hunks)
unit_tests/sources/declarative/async_job/test_job_orchestrator.py (7 hunks)
unit_tests/sources/declarative/async_job/test_job_tracker.py (2 hunks)
unit_tests/sources/declarative/interpolation/test_macros.py (1 hunks)
unit_tests/sources/declarative/partition_routers/test_async_job_partition_router.py (2 hunks)
unit_tests/sources/declarative/test_concurrent_declarative_source.py (1 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/async_job/job_tracker.py
[error] 3-3: Ruff: Import block is un-sorted or un-formatted. Organize imports.
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
[error] 2891-2891: Argument 1 to 'JobTracker' has incompatible type 'int | str | None'; expected 'int | str' [arg-type]
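One way to satisfy the `arg-type` error above is to narrow the optional value before it reaches `JobTracker`. The helper below is a hypothetical sketch, not the fix applied in this PR. The fallback of 1 is taken from the `Field(1, ...)` default shown earlier in the review.

```python
from typing import Optional, Union

# Assumption: mirrors the schema default of 1 shown in the model definition.
_DEFAULT_MAX_CONCURRENT_JOBS = 1


def limit_or_default(value: Optional[Union[int, str]]) -> Union[int, str]:
    """Narrow `int | str | None` to `int | str` before passing it on."""
    if value is None:
        return _DEFAULT_MAX_CONCURRENT_JOBS
    return value
```

An explicit `is None` check is used rather than `value or 1` so that an invalid `0` is passed through to the `>= 1` validation instead of being silently replaced.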
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Analyze (python)
- GitHub Check: SDM Docker Image Build
🔇 Additional comments (19)
unit_tests/sources/declarative/interpolation/test_macros.py (2)
78-78: Updated timestamp in expected test output.
Looks like the expected timestamp has been modified from the previous value. This seems like an intentional test data update to match changes in date/time calculations.
84-84: Updated microsecond timestamp in expected test output.
Similar to the previous change, the microsecond timestamp expectation has been updated. This matches the same time value as the seconds timestamp but in microseconds.
unit_tests/sources/declarative/test_concurrent_declarative_source.py (1)
352-352: Added max_concurrent_jobs configuration in test manifest.
Good addition! This adds the new max_concurrent_jobs property to the test manifest, ensuring that the AsyncRetriever configuration can properly limit concurrent jobs.
unit_tests/sources/declarative/async_job/test_integration.py (1)
90-90: Updated JobTracker instantiation to include config parameter.
This accommodates the changes to the JobTracker constructor which now accepts a config parameter. The test is properly updated to pass an empty config object.
unit_tests/sources/declarative/partition_routers/test_async_job_partition_router.py (2)
29-29: Updated JobTracker instantiation with empty config.
Good update to accommodate the new JobTracker constructor parameter.
61-61: Updated second JobTracker instantiation with empty config.
Consistent with the previous update, this ensures all JobTracker instances in tests are created with the new constructor signature.
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)
3379-3388: LGTM! Good addition of the configurable max_concurrent_jobs property.
This enhancement allows for dynamic configuration of the concurrency level through both hardcoded integers and string interpolation. The examples and default value provide clear guidance on usage.
airbyte_cdk/sources/declarative/async_job/job_tracker.py (1)
20-26: Solid implementation of string interpolation and validation for limit.
The implementation correctly handles both integer and string limits by using InterpolatedString for evaluation, and includes a validation check to ensure the limit is at least 1. Good defensive programming!
unit_tests/sources/declarative/async_job/test_job_tracker.py (3)
17-20: Good test setup with interpolated string.
The test setup now properly uses the new interpolated string capability, which is good for ensuring the feature works as expected.
46-50: Great test case for string interpolation handling.
This test effectively verifies that a string limit is correctly interpolated and converted to an integer.
53-55: Excellent validation test for the minimum limit.
This test ensures that attempting to use a limit less than 1 properly raises a ValueError, which is important for preventing misconfigurations.
unit_tests/sources/declarative/async_job/test_job_orchestrator.py (8)
140-140: JobTracker instantiation updated with new config parameter
The JobTracker constructor now accepts an additional config parameter, which is initialized with an empty dictionary. This aligns with the PR objective of supporting configurability of max concurrent job count.
Could there be value in adding test cases that specifically exercise the new configurability functionality with non-empty config dictionaries? This would validate that the string interpolation for limits works correctly. Wdyt?
187-187: JobTracker instantiation in helper method updated with new parameter
The _orchestrator helper method has been updated to include the new config={} parameter when creating a default JobTracker. This change maintains consistency with the updated constructor signature.
Since this is a helper method used throughout the test class, this change ensures all test cases will work with the updated JobTracker interface.
195-195: JobTracker instantiation updated with empty config dictionary
Similar to other instances, this JobTracker initialization has been updated to include the new required config parameter.
Have you considered adding a test case that specifically tests the JobTracker with a non-empty config containing a string-based limit or dynamic configuration value? That could help verify the new functionality introduced in this PR. Wdyt?
223-223: JobTracker instantiation updated in exception test
The JobTracker constructor call has been modified to include the config={} parameter in this test for exception handling behavior.
This change maintains compatibility with the updated constructor signature, though it doesn't exercise the new configuration capabilities.
244-244: JobTracker instantiation updated in traced config error test
The JobTracker creation has been updated to include the empty config dictionary in this test case that verifies traced configuration errors.
This change ensures the test continues to work with the updated JobTracker interface.
304-304: JobTracker instantiation updated in max attempts failure test
The JobTracker constructor call has been updated with the new config={} parameter in this test that verifies job budget is freed after maximum retry attempts.
This maintains compatibility with the updated constructor interface.
321-321: JobTracker instantiation updated in budget allocation test
The JobTracker constructor has been modified to include the config={} parameter in this test case that verifies waiting for budget to be freed.
This change ensures the test works with the updated JobTracker signature.
344-344: JobTracker instantiation updated in job start failure test
The JobTracker constructor call has been updated with the config={} parameter in this test that verifies budget is freed when job start raises an exception.
This maintains compatibility with the modified constructor interface.
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (outdated, resolved)
Actionable comments posted: 0
🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/async_job/job_tracker.py (2)
19-22: Using a mutable default argument could lead to unexpected behavior - would you consider changing this?
The empty dictionary {} as a default value for config could lead to unexpected behavior if modified within the method, as Python reuses the same dictionary instance across all calls without an explicit config argument.
Also, when creating the InterpolatedString, you're passing an empty parameters dict but then evaluating with the config. Is this intentional or should the parameters come from the config as well? wdyt?
- def __init__(self, limit: Union[int, str], config: Mapping[str, Any] = {}):
+ def __init__(self, limit: Union[int, str], config: Mapping[str, Any] = None):
+     if config is None:
+         config = {}
      if isinstance(limit, str):
-         limit = int(InterpolatedString(limit, parameters={}).eval(config=config))
+         limit = int(InterpolatedString(limit).eval(config=config))
19-25: Consider adding error handling for string conversion failures - what do you think?
The current implementation doesn't explicitly handle cases where the string interpolation doesn't result in a valid integer. This could lead to unexpected exceptions when dealing with user-provided configuration values.
  if isinstance(limit, str):
-     limit = int(InterpolatedString(limit, parameters={}).eval(config=config))
+     try:
+         interpolated_value = InterpolatedString(limit).eval(config=config)
+         limit = int(interpolated_value)
+     except (ValueError, TypeError) as e:
+         raise ValueError(f"Could not convert '{limit}' to a valid integer: {str(e)}")
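A variant of the suggested error handling, sketched in isolation. `to_job_limit` is a hypothetical helper, and the interpolation step is assumed to have already produced `raw`. It chains the original exception with `from` so the underlying cause stays visible in the traceback.

```python
from typing import Any


def to_job_limit(raw: Any, source: str) -> int:
    """Convert an interpolated value to int, naming the original expression on failure."""
    try:
        return int(raw)
    except (ValueError, TypeError) as exc:
        # `from exc` keeps the original exception attached as __cause__.
        raise ValueError(
            f"Could not convert {raw!r} (from {source!r}) to an integer"
        ) from exc
```

Including both the evaluated value and the source expression in the message should make it easier to tell a bad config value from a bad template.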
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
airbyte_cdk/sources/declarative/async_job/job_tracker.py (2 hunks)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
⏰ Context from checks skipped due to timeout of 90000ms (8)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: SDM Docker Image Build
🔇 Additional comments (3)
airbyte_cdk/sources/declarative/async_job/job_tracker.py (3)
6-6: Updated type imports for broader parameter typing - looks good!
Added the Any, Mapping, and Union imports to support the new parameter types in the JobTracker constructor. This aligns well with the changes being made.
9-9: Import of InterpolatedString to support string-based limits - nice addition!
This allows dynamic configuration of the job limit using string interpolation, making the component more flexible.
23-24: Validation check for minimum limit - excellent addition!
This validation ensures that at least one concurrent job can run, preventing configuration errors that could block all progress. The error message is clear and helpful.
Nothing stands out as wrong, so approving to unblock.
@@ -3376,6 +3376,16 @@ definitions:
  - "$ref": "#/definitions/IterableDecoder"
  - "$ref": "#/definitions/XmlDecoder"
  - "$ref": "#/definitions/ZipfileDecoder"
+ max_concurrent_jobs:
+ title: Maximum Conccurent Job Count
+ description: Maximum number of concurrent jobs to run.
Nit: it'd be useful to explain what those jobs do — are those parsing already downloaded archives, or are those threads that request multiple reports in parallel? etc
+ description: Maximum number of concurrent jobs to run.
+ anyOf:
+ - type: integer
+ - type: string
I'm not convinced you really need this in config, and therefore you need strings here. But, up to you.
Yeah, I think you're right. Looks like Salesforce defines a static max across all accounts.
@@ -58,7 +58,7 @@ def test_stream_slices_with_parent_slicer():
  job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator(
  MockAsyncJobRepository(),
  stream_slices,
- JobTracker(_NO_LIMIT),
+ JobTracker(_NO_LIMIT, config={}),
Seems like JobTracker already has config={} default in the initializer? Could avoid changes here I think?
Everything looks good to me, but I think it would be a good idea to add a dedicated unit test that tests more than one max_concurrent_jobs. For example, we could test 2-3 max_concurrent_jobs to ensure that we process the records correctly. What do you think?
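A test along those lines might look like the sketch below. `TinyTracker`, `LimitReached`, `acquire`, and `release` are hypothetical names for a simplified tracker written only for this illustration. They are not the CDK's `JobTracker` API, so a real test would need to be adapted to the actual methods.

```python
import threading
import uuid
from typing import Set


class LimitReached(Exception):
    """Raised when no job slot is free (hypothetical name)."""


class TinyTracker:
    # Hypothetical, simplified tracker; not the CDK's JobTracker.
    def __init__(self, limit: int) -> None:
        self._limit = limit
        self._slots: Set[str] = set()
        self._lock = threading.Lock()

    def acquire(self) -> str:
        with self._lock:
            if len(self._slots) >= self._limit:
                raise LimitReached(f"limit of {self._limit} jobs reached")
            slot = uuid.uuid4().hex
            self._slots.add(slot)
            return slot

    def release(self, slot: str) -> None:
        with self._lock:
            self._slots.discard(slot)


def test_limit_of_two_allows_two_jobs_then_blocks() -> None:
    tracker = TinyTracker(2)
    first, _second = tracker.acquire(), tracker.acquire()
    try:
        tracker.acquire()
    except LimitReached:
        pass
    else:
        raise AssertionError("third job should not fit in a budget of 2")
    tracker.release(first)
    tracker.acquire()  # a freed slot can be reused
```

The same shape could be parametrized over limits of 2 and 3 to cover the range suggested in the comment.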
@@ -15,7 +16,13 @@ class ConcurrentJobLimitReached(Exception):


  class JobTracker:
-     def __init__(self, limit: int):
+     def __init__(self, limit: Union[int, str], config: Mapping[str, Any] = {}):
In my opinion, the assignment of config: Mapping[str, Any] = {} is not necessary in this context. Instead, I suggest we instantiate the config object within the __post_init__() method and make this field optional, setting its default value to None. Please share your thoughts on this suggestion.
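For reference, `__post_init__` is only invoked by the `__init__` that `@dataclass` generates, so this suggestion would apply if the class were a dataclass. A minimal sketch under that assumption follows. `TrackerSettings` is a hypothetical name and not a class in the CDK.

```python
from dataclasses import dataclass
from typing import Any, Mapping, Optional


@dataclass
class TrackerSettings:
    # Hypothetical dataclass used only to illustrate the suggestion.
    limit: int
    config: Optional[Mapping[str, Any]] = None

    def __post_init__(self) -> None:
        # Runs right after the generated __init__ has assigned the fields.
        if self.config is None:
            self.config = {}
        if self.limit < 1:
            raise ValueError(f"Invalid job limit {self.limit}: the minimum is 1")
```

With a hand-written `__init__`, as in the diff above, the equivalent is the `if config is None: config = {}` check placed directly in the constructor.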
Closing in favor of source-scoped configurability: #405. Will take the above comments into account on the other PR.
Closes: https://github.com/airbytehq/airbyte-internal-issues/issues/11343
What
max_concurrent_jobs field to AsyncRetriever component.
integer or interpolated string
Summary by CodeRabbit
New Features
Bug Fixes
Tests
JobTracker initialization, ensuring proper handling of new parameters and configurations.