
feat(concurrent perpartition cursor): Refactor ConcurrentPerPartitionCursor #331

Merged · 22 commits · Feb 18, 2025

Conversation

@tolik0 (Contributor) commented Feb 12, 2025

This PR fixes an issue in ConcurrentPerPartitionCursor where a sync could fail if the number of partitions generated at the start of the sync exceeded the partition limit.

Key Changes

  • Removed the implicit dependency on ThreadPool limits
  • Moved the global cursor switching logic into stream_slices, enabling the switch while slices are still being generated
  • Refactored the observe and close_partition methods to update only the global cursor value when the global cursor is enabled
  • Added throttling for state emission, now limited to once per minute, to fix errors with the platform
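
The once-per-minute throttling in the last bullet can be sketched roughly as follows. The method name _throttle_state_message matches the PR; the surrounding class and everything else here are illustrative assumptions, not the CDK implementation:

```python
import time
from typing import Optional


class StateEmitter:
    """Illustrative sketch of once-per-minute state emission throttling."""

    EMISSION_INTERVAL_SECONDS = 60

    def __init__(self) -> None:
        self._last_emission_time = 0.0

    def _throttle_state_message(self) -> Optional[float]:
        # Return the current time if enough time has passed, otherwise None.
        current_time = time.time()
        if current_time - self._last_emission_time <= self.EMISSION_INTERVAL_SECONDS:
            return None
        return current_time

    def emit_state_message(self, throttle: bool = True) -> bool:
        if throttle:
            current_time = self._throttle_state_message()
            if current_time is None:
                return False  # skipped: we emitted less than a minute ago
            self._last_emission_time = current_time
        # ... build and send the actual state message here ...
        return True
```

The throttle flag mirrors the bypass used by ensure_at_least_one_state_emitted: a final state message must always go out, so that call path skips the time check.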

@github-actions bot added the "enhancement" (New feature or request) label on Feb 12, 2025
@coderabbitai bot commented Feb 12, 2025

📝 Walkthrough

This pull request refactors the ConcurrentPerPartitionCursor class to update partition management and global cursor handling. It increases the maximum partition limit, introduces a new constant (SWITCH_TO_GLOBAL_LIMIT), and renames an internal variable for clarity. The close_partition method now encapsulates global cursor updates via a new helper method. Additionally, state emission calls have been updated to include a throttling parameter, and the test suite is enhanced with a new test for state throttling along with modifications to an existing test.

Changes

airbyte_cdk/sources/declarative/incremental/.../concurrent_partition_cursor.py
  • Increased DEFAULT_MAX_PARTITIONS_NUMBER from 10,000 to 25,000
  • Added the SWITCH_TO_GLOBAL_LIMIT constant
  • Renamed _over_limit to _number_of_partitions
  • Extracted the global cursor update logic into _update_global_cursor
  • Updated _emit_state_message to accept a throttle parameter and modified ensure_at_least_one_state_emitted to leverage it

unit_tests/sources/declarative/incremental/.../test_concurrent_perpartitioncursor.py
  • Added test_state_throttling to verify state emission timing
  • Modified test_incremental_parent_state to patch _throttle_state_message (bypassing throttling for test purposes)

Sequence Diagram(s)

sequenceDiagram
    participant Caller
    participant Cursor as ConcurrentPerPartitionCursor
    participant StateMgr as StateManager

    Caller->>Cursor: close_partition(record)
    Note right of Cursor: Increment partition count and check limit
    Cursor->>Cursor: _update_global_cursor(record)
    Cursor->>StateMgr: _emit_state_message(throttle flag)
    StateMgr-->>Cursor: Acknowledge state update

Suggested reviewers

  • aaronsteers – Would you mind taking a look at these changes, aaronsteers? wdyt?
  • maxi297 – Could you review this PR as well, maxi297? wdyt?
@coderabbitai bot left a comment

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between cb5a921 and 05f4db7.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1 hunks)
  • airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (8 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-the-guardian-api' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Analyze (python)
🔇 Additional comments (5)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (5)

61-63: LGTM! The constant changes improve scalability and readability.

The changes look good:

  1. Increasing DEFAULT_MAX_PARTITIONS_NUMBER to 25,000 allows handling larger datasets
  2. New SWITCH_TO_GLOBAL_LIMIT of 1,000 provides a clear threshold for switching to global cursor
  3. Renaming _over_limit to _number_of_partitions better describes its purpose

Also applies to: 103-103


145-154: LGTM! The refactoring improves code organization.

The changes improve the code by:

  1. Adding a guard clause to prevent unnecessary cursor updates when using global cursor
  2. Extracting the global cursor update logic to a dedicated method

354-373: LGTM! The changes improve observability and code organization.

The changes enhance the code by:

  1. Adding an informative log message when switching to global cursor
  2. Extracting record cursor parsing and update logic for better readability
  3. Simplifying the overall logic flow

375-380: LGTM! The new method follows good design principles.

The new _update_global_cursor method:

  1. Follows single responsibility principle
  2. Has clear and concise update conditions
  3. Uses defensive copying to prevent unintended modifications
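
The defensive-copy behavior praised above can be sketched like this. This is a minimal illustration, not the CDK class; the class name and the "updated_at" cursor field key are assumptions:

```python
from copy import deepcopy
from typing import Any, MutableMapping, Optional


class GlobalCursorTracker:
    """Illustrative sketch: keep the largest cursor value seen across partitions."""

    def __init__(self, cursor_field_key: str = "updated_at") -> None:
        self._cursor_field_key = cursor_field_key
        self._new_global_cursor: Optional[MutableMapping[str, Any]] = None

    def _update_global_cursor(self, value: Any) -> None:
        # Only advance the candidate global cursor, and copy defensively so
        # later mutations of `value` cannot corrupt the stored state.
        if (
            self._new_global_cursor is None
            or self._new_global_cursor[self._cursor_field_key] < value
        ):
            self._new_global_cursor = {self._cursor_field_key: deepcopy(value)}
```

Older values are simply ignored, so out-of-order observation across partitions cannot move the global cursor backwards.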

413-414: LGTM! The changes make the limit check more explicit.

The changes improve clarity by:

  1. Using the renamed _number_of_partitions variable
  2. Comparing against the new SWITCH_TO_GLOBAL_LIMIT constant

@coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (3)

61-63: Consider documenting the rationale for the new constants and their relationship.

The changes introduce new constants and rename variables, but their relationship and usage could be clearer. Would you consider:

  1. Updating the class docstring to explain why we switch to global cursor at 1000 partitions?
  2. Adding a comment explaining why we increased the max partitions to 25,000?

Also applies to: 103-103


233-255: Consider improving the partition limit warning messages.

The warning messages now include the number of partitions but could be more actionable. What do you think about:

-            logger.warning(f"The maximum number of partitions has been reached. Dropping the oldest finished partition: {oldest_partition}. Over limit: {self._number_of_partitions}.")
+            logger.warning(
+                f"Maximum partitions ({self.DEFAULT_MAX_PARTITIONS_NUMBER}) reached. "
+                f"Dropping oldest finished partition: {oldest_partition}. "
+                f"Current partitions: {self._number_of_partitions}. "
+                "Consider adjusting DEFAULT_MAX_PARTITIONS_NUMBER if this is expected."
+            )

375-380: Consider adding a docstring to the _update_global_cursor method.

The new method looks good, but would you consider adding a docstring to improve code maintainability? Something like:

-    def _update_global_cursor(self, value: Mapping[str, Any]) -> None:
+    def _update_global_cursor(self, value: Mapping[str, Any]) -> None:
+        """Update the global cursor if the new value is greater than the current value.
+        
+        Args:
+            value: The new cursor value to compare against the current global cursor
+        """
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 05f4db7 and c0bc645.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (8 hunks)
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)

354-374: LGTM! Nice improvement to the observe method.

The refactored observe method is more maintainable with clearer separation of concerns. The new logging message when switching to global cursor is particularly helpful.


145-154: Verify the global cursor update logic.

The new condition for updating the global cursor might miss updates if a partition is closed before all its records are processed. Should we add a check to ensure all records have been processed before updating the global cursor? wdyt?

✅ Verification successful

🏁 Script executed:

#!/bin/bash
# Search for any race conditions or similar patterns in cursor updates
rg -A 5 "global_cursor|new_global_cursor" .

Length of output: 49421


Global cursor update logic appears to be functioning as expected.

The current implementation calls _update_global_cursor() during record observation and again when closing a partition—but only after ensuring via the semaphore (i.e. when _semaphore_per_partition[partition_key]._value equals 0) that all concurrent processing for that partition has completed. This check, along with the fact that each observed record already attempts to update the global cursor, means that the design should capture the latest state even if a partition is closed immediately after processing its records.

For example, note that:

  • The semaphore’s value being 0 within the lock guarantees that no other record-processing tasks are active for the partition.
  • The global cursor is updated both in the record observation path and after finalizing the partition’s processing, which provides a safeguard against missing updates.

Would you agree that this dual-update approach meets our safety needs, or do you think an additional explicit check is warranted?
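
The semaphore gating described above can be modeled with a small self-contained sketch. This is illustrative, not the CDK code; it assumes one release per in-flight slice and one acquire per completed slice, and it peeks at the private Semaphore._value exactly as the review describes (production code would prefer a public counter):

```python
import threading
from typing import Dict, List


class PartitionGate:
    """Sketch: only finalize a partition's cursor once all in-flight slices are done."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._semaphores: Dict[str, threading.Semaphore] = {}
        self.finalized: List[str] = []

    def slice_started(self, partition_key: str) -> None:
        sem = self._semaphores.setdefault(partition_key, threading.Semaphore(0))
        sem.release()  # one more slice in flight for this partition

    def slice_finished(self, partition_key: str) -> None:
        self._semaphores[partition_key].acquire()  # one slice completed

    def try_close(self, partition_key: str) -> bool:
        with self._lock:
            # A value of 0 inside the lock means no record-processing task
            # is still active for this partition, so finalizing is safe.
            if self._semaphores[partition_key]._value == 0:
                self.finalized.append(partition_key)
                return True
            return False
```

Combined with the per-record _update_global_cursor call, this is the "dual-update" safeguard the verification refers to.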

@maxi297 (Contributor) left a comment

The global cursor logic change looks good to me even though I have a question about the throttling. One thing that is surprising to me is that tests don't change as I would assume that the previous solution would set the global cursor on the slice boundaries while this change would set it on the record cursor value. Am I missing something?

Once we are clear on the tests, I'll approve assuming that limits will be re-worked later so that if you want to release the throttling changes in another PR, you can release these here.

@tolik0 (Contributor, Author) commented Feb 13, 2025

@maxi297 The previous implementation set the state to the most recent cursor value when one was present, and to the slice start when it was not. So the tests shouldn't change.

def _get_latest_complete_time(self, slices: List[MutableMapping[str, Any]]) -> Any:
    """
    Get the latest time before which all records have been processed.
    """
    if not slices:
        raise RuntimeError(
            "Expected at least one slice but there were none. This is unexpected; please contact Support."
        )
    merged_intervals = self.merge_intervals(slices)
    first_interval = merged_intervals[0]
    return first_interval.get("most_recent_cursor_value") or first_interval[self.START_KEY]
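
To illustrate the fallback in the snippet above with simplified interval dicts (assuming merge_intervals returns intervals sorted by start; the standalone function and key names here are illustrative):

```python
from typing import Any, List, MutableMapping

START_KEY = "start"


def latest_complete_time(merged_intervals: List[MutableMapping[str, Any]]) -> Any:
    """Simplified version of the lookup above: prefer the first interval's most
    recent observed cursor value, falling back to the interval start when no
    record has been observed yet."""
    first_interval = merged_intervals[0]
    return first_interval.get("most_recent_cursor_value") or first_interval[START_KEY]


# First interval saw records up to Jan 15: that is the latest safe checkpoint.
assert latest_complete_time(
    [{"start": "2024-01-01", "end": "2024-02-01", "most_recent_cursor_value": "2024-01-15"}]
) == "2024-01-15"

# No records observed yet: fall back to the slice start.
assert latest_complete_time([{"start": "2024-01-01", "end": "2024-02-01"}]) == "2024-01-01"
```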

@coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)

62-64: Consider documenting the new constants and their relationship.

The relationship between DEFAULT_MAX_PARTITIONS_NUMBER and SWITCH_TO_GLOBAL_LIMIT could be clearer. Would you consider adding docstrings to explain why we switch to global cursor at 10k but allow up to 25k partitions? Also, should we make these configurable via environment variables, wdyt?

Also applies to: 104-108


240-262: Consider improving the logging messages.

The logging messages could be more informative. Would you consider including the partition key in the warning messages to help with debugging? Also, should we add debug-level logs for tracking partition counts?

-            logger.warning(f"The maximum number of partitions has been reached. Dropping the oldest finished partition: {oldest_partition}. Over limit: {self._number_of_partitions - self.DEFAULT_MAX_PARTITIONS_NUMBER}.")
+            logger.warning(f"Maximum partitions ({self.DEFAULT_MAX_PARTITIONS_NUMBER}) reached. Dropping oldest finished partition with key '{partition_key}': {oldest_partition}. Current count: {self._number_of_partitions}")
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between c0bc645 and 52b95e3.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (11 hunks)
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)

174-187: LGTM! Nice throttling implementation.

The state emission throttling is a good performance optimization to prevent overwhelming the system with state messages.


382-387: LGTM! Clean extraction of global cursor update logic.

The extraction of _update_global_cursor method improves code organization and reusability.

@coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)

393-398: Consider adding type hints to improve code clarity.

The extracted method looks good, but would be even better with explicit type hints. wdyt about:

-    def _update_global_cursor(self, value: Any) -> None:
+    def _update_global_cursor(self, value: Union[str, int, float]) -> None:

371-392: Consider addressing the TODO comment.

The TODO on line 371 suggests checking the number of partitions. Since we now track this with _number_of_partitions, would you like me to help implement this check?

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 52b95e3 and 1166a7a.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (10 hunks)
  • unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (4 hunks)
🔇 Additional comments (4)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (3)

62-64: Great improvements to partition management constants and variable naming!

The changes make the code more maintainable and clearer:

  • Increased DEFAULT_MAX_PARTITIONS_NUMBER provides more headroom
  • Separate SWITCH_TO_GLOBAL_LIMIT allows independent control
  • Renamed variable better describes its purpose

Also applies to: 104-104


108-108: Well-designed state throttling implementation!

The throttling mechanism helps prevent excessive state emissions while maintaining flexibility:

  • 60-second interval between emissions
  • Ability to bypass throttling when needed
  • Clean separation of throttling logic

Also applies to: 174-189


431-432: Clean update to limit_reached!

The method now correctly uses _number_of_partitions and SWITCH_TO_GLOBAL_LIMIT, aligning with the overall refactoring.

unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (1)

2978-3020: Excellent test coverage for state throttling!

The test thoroughly verifies the throttling behavior:

  • Checks that states aren't emitted before 60s
  • Tests multiple time intervals
  • Verifies emission after 60s
  • Uses mocking appropriately
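
A self-contained sketch of such a throttling test, using unittest.mock to control time. The ThrottledEmitter class here is a stand-in for the cursor, not the CDK implementation:

```python
import time
from unittest.mock import patch


class ThrottledEmitter:
    """Minimal stand-in for the cursor's state-emission throttling."""

    def __init__(self) -> None:
        self._last_emission_time = time.time()
        self.emitted = 0

    def maybe_emit(self) -> None:
        now = time.time()
        if now - self._last_emission_time > 60:
            self._last_emission_time = now
            self.emitted += 1


def test_state_throttling_sketch() -> None:
    with patch("time.time") as mock_time:
        mock_time.return_value = 1_000.0
        emitter = ThrottledEmitter()

        mock_time.return_value = 1_030.0  # 30s later: throttled
        emitter.maybe_emit()
        assert emitter.emitted == 0

        mock_time.return_value = 1_059.0  # 59s later: still throttled
        emitter.maybe_emit()
        assert emitter.emitted == 0

        mock_time.return_value = 1_061.0  # just past 60s: state is emitted
        emitter.maybe_emit()
        assert emitter.emitted == 1
```

Patching time.time keeps the test deterministic and fast, instead of sleeping for real wall-clock minutes.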

@coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (3)

62-64: Consider documenting the rationale behind the partition limits.

The changes to partition limits and the addition of state tracking variables look good. However, it would be helpful to document why we chose these specific values for DEFAULT_MAX_PARTITIONS_NUMBER and SWITCH_TO_GLOBAL_LIMIT. This helps future maintainers understand the trade-offs.

     """
     Manages state per partition when a stream has many partitions, preventing data loss or duplication.

     Attributes:
-        DEFAULT_MAX_PARTITIONS_NUMBER (int): Maximum number of partitions to retain in memory (default is 10,000).
+        DEFAULT_MAX_PARTITIONS_NUMBER (int): Maximum number of partitions to retain in memory (default is 25,000).
+            This limit prevents excessive memory usage while allowing for a reasonable number of concurrent partitions.
+        SWITCH_TO_GLOBAL_LIMIT (int): Threshold at which the cursor switches to global state (default is 10,000).
+            When this limit is reached, the cursor switches to using a global state to maintain performance.

wdyt?

Also applies to: 104-108


174-189: Consider making the throttle interval configurable.

The state throttling implementation looks good, but the 60-second interval is hardcoded. Would you consider making this configurable to allow for different use cases?

+    DEFAULT_STATE_EMISSION_INTERVAL = 60  # seconds
+
     def _throttle_state_message(self) -> Optional[float]:
         """
-        Throttles the state message emission to once every 60 seconds.
+        Throttles the state message emission based on the configured interval.
         """
         current_time = time.time()
-        if current_time - self._last_emission_time <= 60:
+        if current_time - self._last_emission_time <= self.DEFAULT_STATE_EMISSION_INTERVAL:
             return None
         return current_time

249-255: Consider adding metrics for partition limit switches.

The implementation of switching to global cursor looks good. However, it might be valuable to track how often this happens. Would you consider adding metrics or structured logging to help monitor this behavior in production?

     if not self._use_global_cursor and self.limit_reached():
+        structured_fields = {
+            "stream_name": self._stream_name,
+            "partition_count": self._number_of_partitions,
+            "limit": self.SWITCH_TO_GLOBAL_LIMIT
+        }
         logger.info(
             f"Exceeded the 'SWITCH_TO_GLOBAL_LIMIT' of {self.SWITCH_TO_GLOBAL_LIMIT}. "
-            f"Switching to global cursor for {self._stream_name}."
+            f"Switching to global cursor for {self._stream_name}.",
+            extra=structured_fields
         )
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 1166a7a and 667700f.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (10 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

[warning] 1-1: File would be reformatted. Please format the code using the appropriate tools.

🔇 Additional comments (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)

392-398: LGTM! Clean refactoring of global cursor update logic.

The extraction of global cursor update logic into a separate method improves code organization and readability.

@coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)

382-398: 🛠️ Refactor suggestion

Consider adding error handling for cursor value extraction.

The cursor value extraction could fail if the cursor field is missing or has an invalid format. Would you consider adding try-except to handle these cases gracefully? wdyt?

-        record_cursor = self._connector_state_converter.output_format(
-            self._connector_state_converter.parse_value(self._cursor_field.extract_value(record))
-        )
-        self._update_global_cursor(record_cursor)
+        try:
+            raw_cursor_value = self._cursor_field.extract_value(record)
+            if raw_cursor_value is None:
+                logger.warning(f"Cursor field '{self.cursor_field.cursor_field_key}' is missing in record")
+                return
+            
+            record_cursor = self._connector_state_converter.output_format(
+                self._connector_state_converter.parse_value(raw_cursor_value)
+            )
+            self._update_global_cursor(record_cursor)
+        except Exception as e:
+            logger.error(
+                f"Failed to extract cursor value from record for field '{self.cursor_field.cursor_field_key}': {str(e)}"
+            )
🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)

174-189: Consider making the throttling interval configurable.

The 60-second throttling interval is currently hardcoded. Would you consider making it configurable through a class constant or constructor parameter? This would allow users to adjust the throttling based on their specific needs. wdyt?

 class ConcurrentPerPartitionCursor(Cursor):
+    DEFAULT_STATE_EMISSION_INTERVAL = 60  # seconds
     
     def __init__(
         self,
         cursor_factory: ConcurrentCursorFactory,
         partition_router: PartitionRouter,
         stream_name: str,
         stream_namespace: Optional[str],
         stream_state: Any,
         message_repository: MessageRepository,
         connector_state_manager: ConnectorStateManager,
         connector_state_converter: AbstractStreamStateConverter,
         cursor_field: CursorField,
+        state_emission_interval: Optional[int] = None,
     ) -> None:
         # ... existing initialization ...
+        self._state_emission_interval = state_emission_interval or self.DEFAULT_STATE_EMISSION_INTERVAL

     def _throttle_state_message(self) -> Optional[float]:
         current_time = time.time()
-        if current_time - self._last_emission_time <= 60:
+        if current_time - self._last_emission_time <= self._state_emission_interval:
             return None
         return current_time

249-279: Consider enhancing the logging messages for better clarity.

The logging messages could be more informative. Would you consider these improvements? wdyt?

-            logger.info(
-                f"Exceeded the 'SWITCH_TO_GLOBAL_LIMIT' of {self.SWITCH_TO_GLOBAL_LIMIT}. "
-                f"Switching to global cursor for {self._stream_name}."
-            )
+            logger.info(
+                f"Stream '{self._stream_name}' has {self._number_of_partitions} partitions, "
+                f"exceeding the 'SWITCH_TO_GLOBAL_LIMIT' of {self.SWITCH_TO_GLOBAL_LIMIT}. "
+                "Switching to global cursor for better performance."
+            )

-                            f"The maximum number of partitions has been reached. Dropping the oldest finished partition: {oldest_partition}. Over limit: {self._number_of_partitions - self.DEFAULT_MAX_PARTITIONS_NUMBER}."
+                            f"Stream '{self._stream_name}' reached maximum partitions ({self.DEFAULT_MAX_PARTITIONS_NUMBER}). "
+                            f"Dropping oldest finished partition: {oldest_partition}. "
+                            f"Current partitions: {self._number_of_partitions}, Over limit by: {self._number_of_partitions - self.DEFAULT_MAX_PARTITIONS_NUMBER}."

-                        f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._number_of_partitions - self.DEFAULT_MAX_PARTITIONS_NUMBER}."
+                        f"Stream '{self._stream_name}' reached maximum partitions ({self.DEFAULT_MAX_PARTITIONS_NUMBER}). "
+                        f"No finished partitions available. Dropping oldest partition: {oldest_partition}. "
+                        f"Current partitions: {self._number_of_partitions}, Over limit by: {self._number_of_partitions - self.DEFAULT_MAX_PARTITIONS_NUMBER}."
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 667700f and 6498528.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (10 hunks)
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)

62-64: LGTM! Nice improvements to partition handling.

The changes enhance scalability by increasing the maximum partitions and improve code clarity with better naming. The separation between switching to global cursor (10k) and max partitions (25k) provides a good buffer for graceful degradation.

Also applies to: 104-108
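
The two-tier behavior described above can be sketched as follows. The constants are taken from the PR; the policy function itself is illustrative, not the CDK API:

```python
SWITCH_TO_GLOBAL_LIMIT = 10_000          # soft cap: degrade to a single global cursor
DEFAULT_MAX_PARTITIONS_NUMBER = 25_000   # hard cap: start evicting partition state


def partition_policy(number_of_partitions: int) -> str:
    """Decide how to handle state given the current partition count."""
    if number_of_partitions > DEFAULT_MAX_PARTITIONS_NUMBER:
        return "drop_oldest"        # memory bound: evict the oldest partition cursor
    if number_of_partitions > SWITCH_TO_GLOBAL_LIMIT:
        return "use_global_cursor"  # too many partitions to track individually
    return "per_partition"
```

The 15,000-partition gap between the two limits is the "buffer for graceful degradation" the review mentions: the sync first stops tracking per-partition state and only starts dropping state well past that point.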

@maxi297 (Contributor) left a comment

LGTM!

Given the complexity of the ConcurrentPerPartitionCursor solution in general, can we add more details to the PR description so that someone with less context can follow it more easily? For example, the PR description mentions that it is enhancing scalability. In which cases is the current solution not stable? What does increasing the number of partitions do for stability? My understanding is that it avoids cases where fetching for a partition might fail, assuming a partition would take a very long time to process, but it would be nice to have this explicit.

@coderabbitai bot left a comment

Actionable comments posted: 0

🔭 Outside diff range comments (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)

257-279: ⚠️ Potential issue

Consider decrementing _number_of_partitions when removing partitions.

The _number_of_partitions counter is incremented when adding partitions but not decremented when removing them in _ensure_partition_limit. This could lead to inaccurate partition counting over time. Would you consider updating the counter when partitions are removed? wdyt?

     while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1:
         for partition_key in list(self._cursor_per_partition.keys()):
             if (
                 partition_key in self._finished_partitions
                 and self._semaphore_per_partition[partition_key]._value == 0
             ):
                 oldest_partition = self._cursor_per_partition.pop(partition_key)
+                self._number_of_partitions -= 1
                 logger.warning(
                     f"The maximum number of partitions has been reached. Dropping the oldest finished partition: {oldest_partition}. Over limit: {self._number_of_partitions - self.DEFAULT_MAX_PARTITIONS_NUMBER}."
                 )
                 break
         else:
             oldest_partition = self._cursor_per_partition.popitem(last=False)[1]
+            self._number_of_partitions -= 1
             logger.warning(
                 f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._number_of_partitions - self.DEFAULT_MAX_PARTITIONS_NUMBER}."
             )
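The bookkeeping issue the suggestion addresses can be shown with a toy model. This is not the CDK class: `PartitionTracker` and its tiny limit are invented for illustration, but the eviction loop and the decrement mirror the suggested fix above.

```python
from collections import OrderedDict


class PartitionTracker:
    """Toy model of the eviction loop above; names and limit are illustrative only."""

    MAX = 3  # tiny limit so the example is easy to follow

    def __init__(self) -> None:
        self.cursor_per_partition: "OrderedDict[str, dict]" = OrderedDict()
        self.number_of_partitions = 0

    def add(self, key: str, cursor: dict) -> None:
        self.cursor_per_partition[key] = cursor
        self.number_of_partitions += 1
        self._ensure_limit()

    def _ensure_limit(self) -> None:
        while len(self.cursor_per_partition) > self.MAX:
            # Drop the oldest partition (insertion order) when over the limit...
            self.cursor_per_partition.popitem(last=False)
            # ...and decrement the counter so it stays in sync (the suggested fix).
            self.number_of_partitions -= 1
```

Without the decrement, `number_of_partitions` would keep growing past the number of partitions actually held, and any "over limit" reporting derived from it would drift.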
🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)

62-63: Consider documenting the relationship between the two limits.

The introduction of SWITCH_TO_GLOBAL_LIMIT alongside DEFAULT_MAX_PARTITIONS_NUMBER creates a two-tier system. Would you consider adding docstring comments to explain their relationship and the rationale behind the specific values chosen? This would help future maintainers understand the design decisions. wdyt?

     DEFAULT_MAX_PARTITIONS_NUMBER = 25_000
+    """Maximum number of partitions to retain in memory before enforcing removal."""
     SWITCH_TO_GLOBAL_LIMIT = 10_000
+    """Threshold at which to switch to global cursor for better memory management."""

173-181: Consider making the throttle window configurable.

The 60-second throttle window is currently hardcoded. Would you consider making this configurable through a class constant or constructor parameter? This would provide flexibility for different use cases. wdyt?

 class ConcurrentPerPartitionCursor(Cursor):
+    DEFAULT_THROTTLE_INTERVAL = 60  # seconds
     
     def _throttle_state_message(self) -> Optional[float]:
         current_time = time.time()
-        if current_time - self._last_emission_time <= 60:
+        if current_time - self._last_emission_time <= self.DEFAULT_THROTTLE_INTERVAL:
             return None
         return current_time
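The throttle pattern under discussion can be sketched in isolation. This is a minimal, self-contained model, not the CDK code: the class name, the `maybe_emit` wrapper, and the injectable clock are assumptions added here so the behavior is testable without real waiting.

```python
import time


class StateEmitter:
    """Minimal sketch of the state-emission throttle discussed above."""

    DEFAULT_THROTTLE_INTERVAL = 60.0  # seconds

    def __init__(self, clock=time.time) -> None:
        self._clock = clock  # injectable for testing; defaults to the real clock
        self._last_emission_time = 0.0

    def _throttle_state_message(self):
        """Return the current time if the interval has elapsed, else None."""
        current_time = self._clock()
        if current_time - self._last_emission_time <= self.DEFAULT_THROTTLE_INTERVAL:
            return None
        return current_time

    def maybe_emit(self) -> bool:
        """Emit at most once per interval; returns whether an emission happened."""
        timestamp = self._throttle_state_message()
        if timestamp is None:
            return False
        self._last_emission_time = timestamp
        return True
```

Pulling the interval into a class constant, as the suggestion proposes, keeps the magic number out of the comparison and leaves room to make it a constructor parameter later.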
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7b4964e and ef8be89.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (12 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (6)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Analyze (python)
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)

392-398: Nice refactoring of the global cursor update logic!

The extraction of _update_global_cursor improves code organization and the comparison logic is robust. The type hints and proper value comparison make the code more maintainable.
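The shape of the extracted helper can be sketched as follows. This is an illustration of the pattern the review praises, not the actual CDK method: the holder class and the `updated_at` cursor field are assumptions made for the example.

```python
from typing import Any, Mapping, Optional


class GlobalCursorHolder:
    """Sketch of an extracted global-cursor update helper; field names assumed."""

    def __init__(self, cursor_field: str = "updated_at") -> None:
        self._cursor_field = cursor_field
        self._new_global_cursor: Optional[Mapping[str, Any]] = None

    def _update_global_cursor(self, value: Any) -> None:
        # Only advance the global cursor when the observed value is newer,
        # so out-of-order observations cannot move the cursor backwards.
        if (
            self._new_global_cursor is None
            or self._new_global_cursor[self._cursor_field] < value
        ):
            self._new_global_cursor = {self._cursor_field: value}
```

Centralizing the comparison in one method means every call site (observe, close_partition) applies the same monotonic-advance rule.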

@tolik0 tolik0 merged commit 1869fa5 into main Feb 18, 2025
24 of 28 checks passed
@tolik0 tolik0 deleted the tolik0/refactor-concurrent-global-cursor branch February 18, 2025 10:36
rpopov added a commit to rpopov/airbyte-python-cdk that referenced this pull request Mar 5, 2025
* main:
  fix: update cryptography package to latest version to address CVE (airbytehq#377)
  fix: (CDK) (HttpRequester) - Make the `HttpRequester.path` optional (airbytehq#370)
  feat: improved custom components handling (airbytehq#350)
  feat: add microseconds timestamp format (airbytehq#373)
  fix: Replace Unidecode with anyascii for permissive license (airbytehq#367)
  feat: add IncrementingCountCursor (airbytehq#346)
  feat: (low-code cdk)  datetime format with milliseconds (airbytehq#369)
  fix: (CDK) (AsyncRetriever) - Improve UX on variable naming and interpolation (airbytehq#368)
  fix: (CDK) (AsyncRetriever) - Add the `request` and `response` to each `async` operations (airbytehq#356)
  fix: (CDK) (ConnectorBuilder) - Add `auxiliary requests` to slice; support `TestRead` for AsyncRetriever (part 1/2) (airbytehq#355)
  feat(concurrent perpartition cursor): Add parent state updates (airbytehq#343)
  fix: update csv parser for builder compatibility (airbytehq#364)
  feat(low-code cdk): add interpolation for limit field in Rate (airbytehq#353)
  feat(low-code cdk): add AbstractStreamFacade processing as concurrent streams in declarative source (airbytehq#347)
  fix: (CDK) (CsvParser) - Fix the `\\` escaping when passing the `delimiter` from Builder's UI (airbytehq#358)
  feat: expose `str_to_datetime` jinja macro (airbytehq#351)
  fix: update CDK migration for 6.34.0 (airbytehq#348)
  feat: Removes `stream_state` interpolation from CDK (airbytehq#320)
  fix(declarative): Pass `extra_fields` in `global_substream_cursor` (airbytehq#195)
  feat(concurrent perpartition cursor): Refactor ConcurrentPerPartitionCursor (airbytehq#331)
  feat(HttpMocker): adding support for PUT requests and bytes responses (airbytehq#342)
  chore: use certified source for manifest-only test (airbytehq#338)
  feat: check for request_option mapping conflicts in individual components (airbytehq#328)
  feat(file-based): sync file acl permissions and identities (airbytehq#260)
  fix: (CDK) (Connector Builder) - refactor the `MessageGrouper` > `TestRead` (airbytehq#332)
  fix(low code): Fix missing cursor for ClientSideIncrementalRecordFilterDecorator (airbytehq#334)
  feat(low-code): Add API Budget (airbytehq#314)
  chore(decoder): clean decoders and make csvdecoder available (airbytehq#326)