
feat: Enable JobTracker limit configurability #405

Merged · 14 commits into main · Mar 12, 2025

Conversation

@pnilan pnilan (Contributor) commented Mar 10, 2025

What

Enables source-level job tracking and job count limit for async jobs managed by each stream's AsyncJobOrchestrator.
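For illustration, here is a minimal, self-contained sketch of how the configured limit is intended to flow from the source config into a JobTracker (the build_job_tracker helper and the simplified JobTracker below are hypothetical stand-ins, not the CDK's actual internals):

```python
from typing import Any, Mapping, Optional


class JobTracker:
    """Simplified stand-in for the CDK's JobTracker."""

    def __init__(self, limit: int) -> None:
        # Clamp to at least 1, mirroring the safeguard added in this PR.
        self._limit = max(1, limit)


def build_job_tracker(source_config: Optional[Mapping[str, Any]]) -> JobTracker:
    """Hypothetical helper: derive the per-source job limit from the manifest."""
    if source_config:
        return JobTracker(source_config.get("max_concurrent_async_job_count", 1))
    return JobTracker(1)


# Example: a manifest that caps the source at 3 concurrent async jobs.
tracker = build_job_tracker({"max_concurrent_async_job_count": 3})
assert tracker._limit == 3
```

The resulting tracker is then handed to each stream's AsyncJobOrchestrator, so the limit applies across the whole source.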

Recommended Review Order

  1. declarative_component_schema.yaml
  2. manifest_declarative_source.py
  3. model_to_component_factory.py
  4. job_tracker.py

Summary by CodeRabbit

  • New Features

    • Introduced a new configuration option, "Maximum Concurrent Async Jobs", allowing users to specify the maximum number of concurrent asynchronous jobs per source.
    • Enhanced the handling of job tracking to ensure consistent usage of the same tracker instance across components.
  • Bug Fixes

    • Improved job limit handling by enforcing a minimum value of 1, preventing scenarios with invalid job limits.
  • Tests

    • Added new test coverage for the JobTracker class to validate behavior when initialized with limits less than 1.

@github-actions bot added the "enhancement" (New feature or request) label on Mar 10, 2025
coderabbitai bot (Contributor) commented Mar 10, 2025

📝 Walkthrough

This pull request modifies the initialization of the _limit attribute in the JobTracker class to ensure a minimum value of 1. It introduces a new property, max_concurrent_async_job_count, to the component schema and configuration models to define the maximum number of concurrent asynchronous jobs. Additionally, the ModelToComponentFactory class is updated to accept a source_config parameter, which is utilized to configure the job tracker and manage job concurrency settings.

Changes

  • airbyte_cdk/.../job_tracker.py, airbyte_cdk/.../manifest_declarative_source.py, airbyte_cdk/.../model_to_component_factory.py: Modifies the JobTracker class to enforce a minimum job limit of 1 and updates the ModelToComponentFactory to accept a source_config parameter. This allows for dynamic initialization of the job tracker via a new argument, ensuring consistent usage of the tracker instance across components.
  • airbyte_cdk/.../declarative_component_schema.yaml, airbyte_cdk/.../declarative_component_schema.py: Introduces a new property/field max_concurrent_async_job_count in the YAML schema and Python config models, specifying the maximum number of concurrent asynchronous jobs, which aligns with external API limitations.

Sequence Diagram(s)

sequenceDiagram
    participant SC as SourceConfig
    participant M2CF as ModelToComponentFactory
    participant JT as JobTracker
    participant APR as AsyncJobPartitionRouter
    participant MDS as ManifestDeclarativeSource
    participant CC as ComponentConstructor

    SC->>M2CF: Provide source_config (incl. max_concurrent_job_count)
    M2CF->>JT: _create_async_job_tracker(source_config)
    Note right of JT: Initialize _limit (min set to 1)
    JT-->>M2CF: Return job tracker instance
    M2CF->>APR: Initialize router with job tracker
    MDS->>CC: streams(config) invoked
    CC->>MDS: Retrieve "max_concurrent_async_jobs" from source_config
    MDS->>CC: set_max_concurrent_async_jobs(value, config)

Suggested labels

bug

Suggested reviewers

  • brianjlai
  • maxi297

Would these suggestions work for you, wdyt?


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 55625dd and c61583a.

📒 Files selected for processing (3)
  • airbyte_cdk/sources/declarative/async_job/job_tracker.py (1 hunks)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1 hunks)
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • airbyte_cdk/sources/declarative/async_job/job_tracker.py
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py
⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Analyze (python)
  • GitHub Check: Pytest (Fast)


@coderabbitai coderabbitai bot (Contributor) left a comment

Actionable comments posted: 5

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 969dec1 and 8a96460.

📒 Files selected for processing (5)
  • airbyte_cdk/sources/declarative/async_job/job_tracker.py (1 hunks)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1 hunks)
  • airbyte_cdk/sources/declarative/manifest_declarative_source.py (2 hunks)
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (5 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/manifest_declarative_source.py

[error] 148-148: "ModelToComponentFactory" has no attribute "set_max_concurrent_async_jobs" [attr-defined]

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

[error] 545-545: Argument "source_config" to "_create_async_job_tracker" of "ModelToComponentFactory" has incompatible type "Mapping[str, Any] | None"; expected "Mapping[str, Any]" [arg-type]


[error] 2938-2938: Argument 3 to "AsyncJobOrchestrator" has incompatible type "JobTracker | None"; expected "JobTracker" [arg-type]

⏰ Context from checks skipped due to timeout of 90000ms (6)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Analyze (python)
🔇 Additional comments (5)
airbyte_cdk/sources/declarative/async_job/job_tracker.py (1)

20-20: Ensures job tracker always has a positive limit value.

Adding this safeguard ensures that _limit never goes below 1, which is a sensible validation step since a job tracker with a limit of 0 or a negative value wouldn't make any practical sense. This change helps prevent potential issues where an incorrectly configured limit might block all jobs from being processed.

What's nice about this approach is that it silently corrects problematic values rather than throwing an exception, making the component more resilient. Good defensive programming!
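If it helps, a minimal pytest sketch of this behavior might look like the following (the module path comes from the files under review; asserting on the private _limit attribute is an assumption about how the clamp is observable):

```python
import pytest

from airbyte_cdk.sources.declarative.async_job.job_tracker import JobTracker


@pytest.mark.parametrize("configured_limit", [0, -1, -10])
def test_limit_is_clamped_to_one(configured_limit: int) -> None:
    # A non-positive limit would otherwise block all jobs from ever starting.
    assert JobTracker(configured_limit)._limit == 1


def test_positive_limit_is_preserved() -> None:
    assert JobTracker(5)._limit == 5
```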

airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2)

1874-1878: New configuration property for limiting concurrent async jobs.

This new optional property allows users to configure the maximum number of concurrent asynchronous jobs, which directly supports the PR objective. The property description clearly indicates that this is often aligned with API limits at the account level.

The implementation follows the established schema patterns and includes proper documentation. Being optional maintains backward compatibility with existing connector configurations.
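For context, the generated field presumably looks something like this sketch (the exact Field kwargs and wording are assumptions, not copied from the generated model):

```python
from typing import Optional

from pydantic import BaseModel, Field


class DeclarativeSourceSketch(BaseModel):
    """Hypothetical slice of the generated DeclarativeSource model."""

    # Optional, so manifests that omit it keep their current behavior.
    max_concurrent_async_job_count: Optional[int] = Field(
        None,
        title="Maximum Concurrent Asynchronous Jobs",
        description=(
            "Maximum number of concurrent asynchronous jobs to run; often "
            "aligned with API limits defined at the account level."
        ),
    )
```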


1906-1910: Duplicate implementation in DeclarativeSource2.

This is the same implementation of max_concurrent_job_count for the alternate DeclarativeSource2 class. The duplication is necessary since both classes need this property, and the implementation is consistent between them.

Both implementations are properly documented with the same description to maintain consistency.

airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)

45-48: This new property looks good for configuring async job limits. Is it intended for use with JobTracker?

Adding this max_concurrent_job_count property to the schema enables configurability for async job throttling, which aligns nicely with the PR objective. The integer type is appropriate for a count limit.

airbyte_cdk/sources/declarative/manifest_declarative_source.py (1)

96-98: Looks good, passing the source_config to the factory enables JobTracker initialization.

This change correctly passes the source configuration to the ModelToComponentFactory constructor, which will allow it to access configuration values like the max concurrent job count.
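Roughly, the shape of the change is captured by this self-contained sketch (FactorySketch is a hypothetical stand-in for ModelToComponentFactory; only the source_config and emit_connector_builder_messages parameter names come from the diffs in this PR):

```python
from typing import Any, Mapping, Optional


class FactorySketch:
    """Hypothetical stand-in for ModelToComponentFactory's new signature."""

    def __init__(
        self,
        emit_connector_builder_messages: bool = False,
        source_config: Optional[Mapping[str, Any]] = None,
    ) -> None:
        # Keeping the whole source_config lets the factory read the async-job
        # limit (and any future source-level knobs) when building components.
        self._source_config = source_config or {}

    @property
    def max_concurrent_async_job_count(self) -> int:
        return self._source_config.get("max_concurrent_async_job_count", 1)


factory = FactorySketch(source_config={"max_concurrent_async_job_count": 4})
assert factory.max_concurrent_async_job_count == 4
```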

@coderabbitai coderabbitai bot (Contributor) left a comment

Actionable comments posted: 0

♻️ Duplicate comments (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)

530-530: ⚠️ Potential issue

Update the source_config parameter type to allow None values.

The constructor's source_config parameter is typed as Optional[ConnectionDefinition], but the implementation of _create_async_job_tracker requires a non-optional ConnectionDefinition. This causes a type error when the method is called with a potentially None value.

-    def _create_async_job_tracker(
-        self, source_config: ConnectionDefinition
-    ) -> Optional[JobTracker]:
+    def _create_async_job_tracker(
+        self, source_config: Optional[ConnectionDefinition]
+    ) -> Optional[JobTracker]:
+        if not source_config:
+            return None
         if job_count := source_config.get("max_concurrent_job_count"):
             return JobTracker(job_count)
         return None

2933-2934: 🛠️ Refactor suggestion

Ensure JobTracker is always available for AsyncJobOrchestrator.

This fallback case fixes a potential issue where self._job_tracker might be None when used in AsyncJobOrchestrator. However, the type checker still flags an error because the type is still Optional[JobTracker]. Consider using a null-coalescing approach to make the type checker happy.

-        self._job_tracker = JobTracker(1) if not self._job_tracker else self._job_tracker
+        # Ensure we always have a valid JobTracker for AsyncJobOrchestrator
+        job_tracker_to_use = self._job_tracker or JobTracker(1)

Then use job_tracker_to_use at line 2939 instead of self._job_tracker.

🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

3229-3237: Good implementation of _create_async_job_tracker.

This method nicely extracts the job count limit from the source configuration and creates a JobTracker with the appropriate limit. However, as mentioned earlier, it should handle None values for source_config.

Also, consider adding a minimum value check for job_count to ensure it's at least 1:

    def _create_async_job_tracker(
        self, source_config: Optional[ConnectionDefinition]
    ) -> Optional[JobTracker]:
        if not source_config:
            return None
        if job_count := source_config.get("max_concurrent_job_count"):
+            # Ensure job_count is at least 1
+            job_count = max(1, job_count)
            return JobTracker(job_count)
        return None

This prevents potential issues with invalid job count values.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8a96460 and f530364.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/manifest_declarative_source.py (1 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (5 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

[error] 1-1: Ruff formatting check failed. 1 file would be reformatted. Run 'ruff format' to fix code style issues in this file.

⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Analyze (python)
🔇 Additional comments (3)
airbyte_cdk/sources/declarative/manifest_declarative_source.py (1)

96-98: The source_config parameter is now correctly passed to ModelToComponentFactory.

This change enables the factory to access configuration values needed for job tracking limits, aligning with the PR objective of making job count limits configurable.

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)

506-506: Good addition of the required import for ConnectionDefinition.

The import provides the necessary type for the new source_config parameter.


544-548: The initialization of _job_tracker looks good.

The conditional check ensures that job tracking is only configured when source_config is provided.

@coderabbitai coderabbitai bot (Contributor) left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

3223-3231: Consider adding validation for the job count value

The implementation handles the null case well, but should we also ensure the job count is at least 1? What do you think about adding a simple validation like max(1, source_config.get("max_concurrent_job_count", 1)) to prevent potential issues with zero or negative values? wdyt?

    def _create_async_job_tracker(
        self, source_config: Optional[ConnectionDefinition]
    ) -> JobTracker:
        """
        Sets up job tracking for async jobs based on limit specified in the source config.
        """
        if source_config:
-            return JobTracker(source_config.get("max_concurrent_job_count", 1))
+            return JobTracker(max(1, source_config.get("max_concurrent_job_count", 1)))
        return JobTracker(1)
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f530364 and eece374.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (5 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Analyze (python)
🔇 Additional comments (4)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)

506-506: Clean import addition for ConnectionDefinition

Good addition of the ConnectionDefinition import to support the new source_config parameter.


530-530: Well-structured parameter addition

The new optional parameter for source_config is correctly defined with a default value of None, maintaining backward compatibility.


544-544: Proper initialization of job_tracker

The job tracker is now initialized using the new method with the source_config parameter, enabling configurability as required.


2933-2933: JobTracker correctly passed to AsyncJobOrchestrator

The job tracker is properly passed to the AsyncJobOrchestrator, enabling control over concurrent job limits.

@pnilan pnilan (Contributor, Author) commented Mar 11, 2025

Grooming note:

  • Verify API budget instantiation location.

@coderabbitai coderabbitai bot (Contributor) left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/manifest_declarative_source.py (1)

96-99: The parameter naming might be inconsistent, and could benefit from validation. WDYT?

I notice you're passing max_concurrent_async_job_count to the factory but retrieving it as max_concurrent_job_count from the source config. This slight naming difference (presence of "async" in parameter vs. absence in config key) might cause confusion during maintenance.

Also, there's no validation before passing the value. Should the ModelToComponentFactory handle validation (ensuring positive integers), or would it be better to validate here before passing it?

What do you think about either:

  1. Aligning the parameter name with the config key to be consistent, or
  2. Adding a brief comment about the naming difference if it's intentional?
 else ModelToComponentFactory(
     emit_connector_builder_messages,
-    max_concurrent_async_job_count=source_config.get("max_concurrent_job_count"),
+    max_concurrent_async_job_count=source_config.get("max_concurrent_job_count"),  # Config property name differs slightly from parameter name
 )
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between eece374 and c8b4cfc.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/manifest_declarative_source.py (1 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Analyze (python)

coderabbitai[bot]: This comment was marked as off-topic.

@maxi297 maxi297 (Contributor) left a comment

Tagging @bazarnov to get their input on how we expect to handle this in the Connector Builder and whether we should change anything here. I don't think it should be a blocker, but I would prefer to consider this sooner rather than later.

The rest looks good to me!

@bazarnov bazarnov (Contributor) left a comment

Looks good to me! Just a couple of dummy questions from my side @pnilan.

@bazarnov bazarnov (Contributor) commented

> Tagging @bazarnov to get their input on how we expect to handle this in the Connector Builder and whether we should change anything here. I don't think it should be a blocker, but I would prefer to consider this sooner rather than later.
>
> The rest looks good to me!

Looks like passing the value from self.max_concurrent_jobs to the JobTracker sets the limit correctly, so this should work once max_concurrent_jobs is available in the Builder. Since we expect this to be set per source, we should be fine.
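To make the "set per source" point concrete, here is a toy sketch (JobTrackerSketch and try_acquire are invented for illustration; the real JobTracker's API differs) showing why sharing one tracker across streams yields a source-wide limit:

```python
import threading


class JobTrackerSketch:
    """Toy tracker: counts running jobs against a shared limit."""

    def __init__(self, limit: int) -> None:
        self._limit = max(1, limit)
        self._running = 0
        self._lock = threading.Lock()

    def try_acquire(self) -> bool:
        # Returns True only while fewer than `limit` jobs are running.
        with self._lock:
            if self._running < self._limit:
                self._running += 1
                return True
            return False


# One tracker shared by every stream's orchestrator, so the limit applies to
# the whole source rather than to each stream independently.
shared_tracker = JobTrackerSketch(limit=2)
per_stream_trackers = [shared_tracker, shared_tracker]
assert per_stream_trackers[0] is per_stream_trackers[1]
assert shared_tracker.try_acquire() and shared_tracker.try_acquire()
assert not shared_tracker.try_acquire()  # a third job has to wait
```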

@pnilan pnilan enabled auto-merge (squash) March 12, 2025 15:57
@pnilan pnilan disabled auto-merge March 12, 2025 16:26
@pnilan pnilan merged commit 545e2ac into main Mar 12, 2025
24 checks passed
@pnilan pnilan deleted the pnilan/async/configurable-job-tracker-source-level branch March 12, 2025 17:16
Labels: enhancement (New feature or request)
Projects: None yet
Participants: 3