feat: Enable JobTracker limit configurability #405
📝 Walkthrough
This pull request modifies the initialization of the …
Sequence Diagram(s)
sequenceDiagram
participant SC as SourceConfig
participant M2CF as ModelToComponentFactory
participant JT as JobTracker
participant APR as AsyncJobPartitionRouter
participant MDS as ManifestDeclarativeSource
participant CC as ComponentConstructor
SC->>M2CF: Provide source_config (incl. max_concurrent_job_count)
M2CF->>JT: _create_async_job_tracker(source_config)
Note right of JT: Initialize _limit (min set to 1)
JT-->>M2CF: Return job tracker instance
M2CF->>APR: Initialize router with job tracker
MDS->>CC: streams(config) invoked
CC->>MDS: Retrieve "max_concurrent_async_jobs" from source_config
MDS->>CC: set_max_concurrent_async_jobs(value, config)
📜 Recent review details
Configuration used: CodeRabbit UI
📒 Files selected for processing (3)
🚧 Files skipped from review as they are similar to previous changes (3)
⏰ Context from checks skipped due to timeout of 90000ms (9)
Actionable comments posted: 5
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- airbyte_cdk/sources/declarative/async_job/job_tracker.py (1 hunks)
- airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1 hunks)
- airbyte_cdk/sources/declarative/manifest_declarative_source.py (2 hunks)
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2 hunks)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (5 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/manifest_declarative_source.py
[error] 148-148: "ModelToComponentFactory" has no attribute "set_max_concurrent_async_jobs" [attr-defined]
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
[error] 545-545: Argument "source_config" to "_create_async_job_tracker" of "ModelToComponentFactory" has incompatible type "Mapping[str, Any] | None"; expected "Mapping[str, Any]" [arg-type]
[error] 2938-2938: Argument 3 to "AsyncJobOrchestrator" has incompatible type "JobTracker | None"; expected "JobTracker" [arg-type]
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Analyze (python)
🔇 Additional comments (5)
airbyte_cdk/sources/declarative/async_job/job_tracker.py (1)
Line 20: Ensures job tracker always has a positive limit value.
Adding this safeguard ensures that _limit never goes below 1, which is a sensible validation step, since a job tracker with a limit of zero or a negative limit would never admit a job. This change prevents an incorrectly configured limit from blocking all jobs from being processed.
What's nice about this approach is that it silently corrects problematic values rather than throwing an exception, making the component more resilient. Good defensive programming!
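A minimal sketch of the clamping behavior described above. ClampedJobTracker is a hypothetical stand-in written for illustration, not the airbyte_cdk JobTracker API; only the max(1, limit) guard mirrors the change under review, and the try_add/remove method names are assumptions.

```python
from typing import Set


class ClampedJobTracker:
    """Hypothetical stand-in for JobTracker; only the limit clamp mirrors the PR."""

    def __init__(self, limit: int) -> None:
        # A limit of 0 or below would never admit a job, so raise it to 1 silently.
        self._limit = max(1, limit)
        self._running: Set[str] = set()

    @property
    def limit(self) -> int:
        return self._limit

    def try_add(self, job_id: str) -> bool:
        # Admit the job only while a slot is free.
        if len(self._running) >= self._limit:
            return False
        self._running.add(job_id)
        return True

    def remove(self, job_id: str) -> None:
        # Free the slot held by a finished job; unknown ids are ignored.
        self._running.discard(job_id)


tracker = ClampedJobTracker(0)
print(tracker.limit)             # 1
print(tracker.try_add("job-a"))  # True
print(tracker.try_add("job-b"))  # False: the single slot is taken
tracker.remove("job-a")
print(tracker.try_add("job-b"))  # True
```

Clamping keeps a misconfigured manifest running at minimum concurrency; raising instead would surface the mistake earlier but stop the sync.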
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2)
Lines 1874-1878: New configuration property for limiting concurrent async jobs.
This new optional property allows users to configure the maximum number of concurrent asynchronous jobs, which directly supports the PR objective. The property description clearly indicates that this is often aligned with API limits at the account level.
The implementation follows the established schema patterns and includes proper documentation. Being optional maintains backward compatibility with existing connector configurations.
Lines 1906-1910: Duplicate implementation in DeclarativeSource2.
This is the same implementation of max_concurrent_job_count for the alternate DeclarativeSource2 class. The duplication is necessary since both classes need this property, and the implementation is consistent between them.
Both implementations are properly documented with the same description to maintain consistency.
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)
Lines 45-48: This new property looks good for configuring async job limits. Is it intended for use with JobTracker?
Adding this max_concurrent_job_count property to the schema enables configurability for async job throttling, which aligns nicely with the PR objective. The integer type is appropriate for a count limit.
airbyte_cdk/sources/declarative/manifest_declarative_source.py (1)
Lines 96-98: Looks good, passing the source_config to the factory enables JobTracker initialization.
This change correctly passes the source configuration to the ModelToComponentFactory constructor, which will allow it to access configuration values like the max concurrent job count.
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (outdated, resolved)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (outdated, resolved)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (outdated, resolved)
Actionable comments posted: 0
♻️ Duplicate comments (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
Line 530: ⚠️ Potential issue. Update the source_config parameter type to allow None values.
The constructor's source_config parameter is typed Optional[ConnectionDefinition], but _create_async_job_tracker requires a non-optional ConnectionDefinition. This causes a type error when the constructor passes it a potentially None value.
- def _create_async_job_tracker(
-     self, source_config: ConnectionDefinition
- ) -> Optional[JobTracker]:
+ def _create_async_job_tracker(
+     self, source_config: Optional[ConnectionDefinition]
+ ) -> Optional[JobTracker]:
+     if not source_config:
+         return None
      if job_count := source_config.get("max_concurrent_job_count"):
          return JobTracker(job_count)
      return None
Lines 2933-2934: 🛠️ Refactor suggestion. Ensure JobTracker is always available for AsyncJobOrchestrator.
This fallback case fixes a potential issue where self._job_tracker might be None when used in AsyncJobOrchestrator. However, the type checker still flags an error because the attribute is still typed Optional[JobTracker]. Consider assigning the fallback to a local variable using self._job_tracker or JobTracker(1), so the type checker sees a non-optional JobTracker.
- self._job_tracker = JobTracker(1) if not self._job_tracker else self._job_tracker
+ # Ensure we always have a valid JobTracker for AsyncJobOrchestrator
+ job_tracker_to_use = self._job_tracker or JobTracker(1)
Then use job_tracker_to_use at line 2939 instead of self._job_tracker.
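A sketch of why the local-variable fallback satisfies the type checker: an expression like maybe_tracker or JobTrackerStub(1) has a non-optional type for mypy, so nothing Optional reaches the orchestrator. JobTrackerStub, create_async_job_tracker and tracker_for_orchestrator are hypothetical names for illustration only, not the airbyte_cdk code.

```python
from typing import Any, Mapping, Optional


class JobTrackerStub:
    """Hypothetical stand-in for JobTracker."""

    def __init__(self, limit: int) -> None:
        self.limit = max(1, limit)


def create_async_job_tracker(
    source_config: Optional[Mapping[str, Any]],
) -> Optional[JobTrackerStub]:
    # Mirrors the suggested None guard: no config, or a missing/zero limit, means no tracker.
    if not source_config:
        return None
    if job_count := source_config.get("max_concurrent_job_count"):
        return JobTrackerStub(job_count)
    return None


def tracker_for_orchestrator(maybe_tracker: Optional[JobTrackerStub]) -> JobTrackerStub:
    # "or" narrows None away, so mypy infers JobTrackerStub for the result.
    # Prefer an explicit "is None" check if the tracker class ever defines __bool__ or __len__.
    return maybe_tracker or JobTrackerStub(1)


print(tracker_for_orchestrator(create_async_job_tracker(None)).limit)  # 1
print(tracker_for_orchestrator(create_async_job_tracker({"max_concurrent_job_count": 3})).limit)  # 3
```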
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
Lines 3229-3237: Good implementation of _create_async_job_tracker.
This method nicely extracts the job count limit from the source configuration and creates a JobTracker with the appropriate limit. However, as mentioned earlier, it should handle None values for source_config.
Also, consider adding a minimum value check for job_count to ensure it's at least 1:
 def _create_async_job_tracker(
     self, source_config: Optional[ConnectionDefinition]
 ) -> Optional[JobTracker]:
     if not source_config:
         return None
     if job_count := source_config.get("max_concurrent_job_count"):
+        # Ensure job_count is at least 1
+        job_count = max(1, job_count)
         return JobTracker(job_count)
     return None
This prevents negative job counts from reaching JobTracker; a value of 0 is falsy, so it already falls through to return None.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- airbyte_cdk/sources/declarative/manifest_declarative_source.py (1 hunks)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (5 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
[error] 1-1: Ruff formatting check failed. 1 file would be reformatted. Run 'ruff format' to fix code style issues in this file.
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Analyze (python)
🔇 Additional comments (3)
airbyte_cdk/sources/declarative/manifest_declarative_source.py (1)
Lines 96-98: The source_config parameter is now correctly passed to ModelToComponentFactory.
This change enables the factory to access configuration values needed for job tracking limits, aligning with the PR objective of making job count limits configurable.
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
Line 506: Good addition of the required import for ConnectionDefinition.
The import provides the necessary type for the new source_config parameter.
Lines 544-548: The initialization of _job_tracker looks good.
The conditional check ensures that job tracking is only configured when source_config is provided.
Actionable comments posted: 0
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
Lines 3223-3231: Consider adding validation for the job count value.
The implementation handles the null case well, but should we also ensure the job count is at least 1? What do you think about adding a simple validation like max(1, source_config.get("max_concurrent_job_count", 1)) to prevent potential issues with zero or negative values? wdyt?
 def _create_async_job_tracker(
     self, source_config: Optional[ConnectionDefinition]
 ) -> JobTracker:
     """
     Sets up job tracking for async jobs based on limit specified in the source config.
     """
     if source_config:
-        return JobTracker(source_config.get("max_concurrent_job_count", 1))
+        return JobTracker(max(1, source_config.get("max_concurrent_job_count", 1)))
     return JobTracker(1)
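One edge case worth checking in the suggestion above: dict.get(key, 1) only falls back to 1 when the key is absent. If a manifest sets the key explicitly to null, get returns None and max(1, None) raises TypeError in Python 3. A None-safe variant is sketched below; resolve_job_limit is a hypothetical helper name, and the int() coercion is an assumption about how loosely typed config values should be treated.

```python
from typing import Any, Mapping, Optional


def resolve_job_limit(source_config: Optional[Mapping[str, Any]], default: int = 1) -> int:
    """Return a concurrency limit of at least 1 (hypothetical helper)."""
    if not source_config:
        return default
    raw = source_config.get("max_concurrent_job_count")
    if raw is None:
        # Covers both a missing key and an explicit null in the manifest.
        return default
    # Coerce and clamp so 0 or negative values still yield a usable limit.
    return max(1, int(raw))


print(resolve_job_limit(None))                                # 1
print(resolve_job_limit({"max_concurrent_job_count": None}))  # 1
print(resolve_job_limit({"max_concurrent_job_count": -2}))    # 1
print(resolve_job_limit({"max_concurrent_job_count": 5}))     # 5

# The get(..., 1) pattern fails on an explicit null:
try:
    max(1, {"max_concurrent_job_count": None}.get("max_concurrent_job_count", 1))
except TypeError as exc:
    print("TypeError:", exc)
```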
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (5 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Analyze (python)
🔇 Additional comments (4)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)
Line 506: Clean import addition for ConnectionDefinition.
Good addition of the ConnectionDefinition import to support the new source_config parameter.
Line 530: Well-structured parameter addition.
The new optional parameter for source_config is correctly defined with a default value of None, maintaining backward compatibility.
Line 544: Proper initialization of job_tracker.
The job tracker is now initialized using the new method with the source_config parameter, enabling configurability as required.
Line 2933: JobTracker correctly passed to AsyncJobOrchestrator.
The job tracker is properly passed to the AsyncJobOrchestrator, enabling control over concurrent job limits.
Actionable comments posted: 0
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/manifest_declarative_source.py (1)
Lines 96-99: The parameter naming might be inconsistent, and could benefit from validation. WDYT?
I notice you're passing max_concurrent_async_job_count to the factory but retrieving it as max_concurrent_job_count from the source config. This slight naming difference (presence of "async" in the parameter vs. its absence in the config key) might cause confusion during maintenance.
Also, there's no validation before passing the value. Should the ModelToComponentFactory handle validation (ensuring positive integers), or would it be better to validate here before passing it?
What do you think about either:
- Aligning the parameter name with the config key to be consistent, or
- Adding a brief comment about the naming difference if it's intentional?
 else ModelToComponentFactory(
     emit_connector_builder_messages,
-    max_concurrent_async_job_count=source_config.get("max_concurrent_job_count"),
+    max_concurrent_async_job_count=source_config.get("max_concurrent_job_count"),  # Config property name differs slightly from parameter name
 )
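If validation were done at the call site, as the comment asks, one option is to read the key once and fail loudly on bad values before anything reaches the factory. This is a sketch of that alternative; the PR as reviewed clamps the limit inside JobTracker instead. read_job_count and the JOB_COUNT_KEY constant are hypothetical.

```python
from typing import Any, Mapping, Optional

# Hypothetical single source of truth for the manifest key, so readers cannot drift from the schema.
JOB_COUNT_KEY = "max_concurrent_job_count"


def read_job_count(source_config: Mapping[str, Any]) -> Optional[int]:
    """Return the configured limit, None if unset, or raise ValueError if invalid."""
    value = source_config.get(JOB_COUNT_KEY)
    if value is None:
        return None
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(value, bool) or not isinstance(value, int):
        raise ValueError(f"{JOB_COUNT_KEY} must be an integer, got {value!r}")
    if value < 1:
        raise ValueError(f"{JOB_COUNT_KEY} must be at least 1, got {value}")
    return value


print(read_job_count({}))                  # None
print(read_job_count({JOB_COUNT_KEY: 4}))  # 4
for bad in (0, True, "3"):
    try:
        read_job_count({JOB_COUNT_KEY: bad})
    except ValueError as exc:
        print(exc)
```

Raising surfaces a misconfigured manifest at startup, whereas clamping keeps the sync running at minimum concurrency.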
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- airbyte_cdk/sources/declarative/manifest_declarative_source.py (1 hunks)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Analyze (python)
Tagging @bazarnov for input on how we expect to handle this in the Connector Builder and whether we should change anything here. I don't think it should be a blocker, but I would prefer to consider this sooner rather than later.
The rest looks good to me!
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (outdated, resolved)
Looks good to me! Just a couple of dummy questions from my side @pnilan.
Looks like passing the value from the |
What
Enables source-level job tracking and a job count limit for async jobs managed by each stream's AsyncJobOrchestrator. The max_concurrent_async_job_count top-level property in the manifest is passed to ModelToComponentFactory, where a JobTracker component is instantiated in its constructor with the defined job count value.
Recommended Review Order
declarative_component_schema.yaml
manifest_declarative_source.py
model_to_component_factory.py
job_tracker.py
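The flow in the description above can be sketched as follows: the factory reads the top-level max_concurrent_async_job_count value once, builds a single tracker in its constructor, and hands that same instance to every orchestrator it creates, so the limit applies across all streams of the source rather than per stream. TrackerSketch, OrchestratorSketch and FactorySketch are hypothetical simplifications, not the airbyte_cdk classes, and the fallback to a limit of 1 when the property is absent is an assumption.

```python
from typing import Any, Mapping, Optional, Set


class TrackerSketch:
    """Hypothetical source-wide slot counter (stands in for JobTracker)."""

    def __init__(self, limit: int) -> None:
        self._limit = max(1, limit)
        self._running: Set[str] = set()

    def try_start(self, job_id: str) -> bool:
        if len(self._running) >= self._limit:
            return False
        self._running.add(job_id)
        return True

    def finish(self, job_id: str) -> None:
        self._running.discard(job_id)


class OrchestratorSketch:
    """Hypothetical per-stream orchestrator that asks the shared tracker for slots."""

    def __init__(self, stream_name: str, tracker: TrackerSketch) -> None:
        self.stream_name = stream_name
        self._tracker = tracker

    def start_jobs(self, count: int) -> int:
        # Returns how many of the requested jobs obtained a slot.
        return sum(self._tracker.try_start(f"{self.stream_name}-{i}") for i in range(count))


class FactorySketch:
    """Hypothetical factory: one tracker per factory, i.e. per source."""

    def __init__(self, max_concurrent_async_job_count: Optional[int] = None) -> None:
        # Assumption: fall back to a limit of 1 when the manifest omits the property.
        self._job_tracker = TrackerSketch(max_concurrent_async_job_count or 1)

    def create_orchestrator(self, stream_name: str) -> OrchestratorSketch:
        # Every orchestrator shares the same tracker instance.
        return OrchestratorSketch(stream_name, self._job_tracker)


manifest: Mapping[str, Any] = {"max_concurrent_async_job_count": 3}
factory = FactorySketch(manifest.get("max_concurrent_async_job_count"))
orders = factory.create_orchestrator("orders")
refunds = factory.create_orchestrator("refunds")
print(orders.start_jobs(2))   # 2
print(refunds.start_jobs(2))  # 1: only one of the three source-wide slots is left
```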
Summary by CodeRabbit
New Features
Bug Fixes
Tests
Added tests for the JobTracker class to validate behavior when initialized with limits less than 1.