feat(concurrent perpartition cursor): Refactor ConcurrentPerPartitionCursor #331
Conversation
…concurrent-global-cursor
…h `_use_global_cursor`
📝 Walkthrough

This pull request refactors the `ConcurrentPerPartitionCursor`.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Caller
    participant Cursor as ConcurrentPerPartitionCursor
    participant StateMgr as StateManager
    Caller->>Cursor: close_partition(record)
    Note right of Cursor: Increment partition count and check limit
    Cursor->>Cursor: _update_global_cursor(record)
    Cursor->>StateMgr: _emit_state_message(throttle flag)
    StateMgr-->>Cursor: Acknowledge state update
```
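The flow in the diagram can be sketched in plain Python. This is an illustrative model only, not the CDK implementation: the class names mirror the diagram, but the tiny `SWITCH_TO_GLOBAL_LIMIT`, the record shape, and all method bodies are assumptions made for the sketch.

```python
# Illustrative model of the close_partition flow from the diagram above.
# Not the real CDK classes; simplified for demonstration.

class StateManager:
    def __init__(self):
        self.emitted = []

    def emit_state_message(self, state, throttled):
        # Acknowledge the state update by recording it.
        self.emitted.append((state, throttled))


class ConcurrentPerPartitionCursor:
    SWITCH_TO_GLOBAL_LIMIT = 3  # tiny limit so the sketch trips it quickly

    def __init__(self, state_manager):
        self._state_manager = state_manager
        self._number_of_partitions = 0
        self._use_global_cursor = False
        self._global_cursor = None

    def close_partition(self, record):
        # Increment partition count and check the limit.
        self._number_of_partitions += 1
        if self._number_of_partitions > self.SWITCH_TO_GLOBAL_LIMIT:
            self._use_global_cursor = True
        self._update_global_cursor(record["cursor"])
        self._state_manager.emit_state_message(self._global_cursor, throttled=True)

    def _update_global_cursor(self, value):
        # Keep the maximum cursor value seen so far.
        if self._global_cursor is None or value > self._global_cursor:
            self._global_cursor = value


mgr = StateManager()
cursor = ConcurrentPerPartitionCursor(mgr)
for c in [5, 3, 9, 7]:
    cursor.close_partition({"cursor": c})
print(cursor._use_global_cursor, cursor._global_cursor)  # True 9
```

After four partitions the count exceeds the sketch's limit of 3, so the cursor flips to global mode while the global cursor tracks the maximum observed value.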
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1 hunks)
- airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (8 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (8)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-the-guardian-api' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Analyze (python)
🔇 Additional comments (5)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (5)
Lines 61-63: LGTM! The constant changes improve scalability and readability.
- Increasing `DEFAULT_MAX_PARTITIONS_NUMBER` to 25,000 allows handling larger datasets
- The new `SWITCH_TO_GLOBAL_LIMIT` of 1,000 provides a clear threshold for switching to the global cursor
- Renaming `_over_limit` to `_number_of_partitions` better describes its purpose

Also applies to: 103-103
Lines 145-154: LGTM! The refactoring improves code organization.
- Adds a guard clause to prevent unnecessary cursor updates when using the global cursor
- Extracts the global cursor update logic to a dedicated method
Lines 354-373: LGTM! The changes improve observability and code organization.
- Adds an informative log message when switching to the global cursor
- Extracts record cursor parsing and update logic for better readability
- Simplifies the overall logic flow
Lines 375-380: LGTM! The new `_update_global_cursor` method follows good design principles.
- Follows the single responsibility principle
- Has clear and concise update conditions
- Uses defensive copying to prevent unintended modifications
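The three principles above can be illustrated with a minimal sketch. The class name, field name, and signature here are hypothetical, not the CDK's actual code:

```python
import copy
from typing import Any, Optional

# Hedged sketch of a cursor-update method following the principles above:
# single responsibility, a simple update condition, and defensive copying.
class _GlobalCursorHolder:
    def __init__(self, cursor_field: str):
        self._cursor_field = cursor_field
        self._new_global_cursor: Optional[dict] = None

    def _update_global_cursor(self, value: Any) -> None:
        # Sole responsibility: compare and, if newer, record the value.
        if (
            self._new_global_cursor is None
            or self._new_global_cursor[self._cursor_field] < value
        ):
            # Defensive copy so later mutation of the caller's value
            # cannot silently change the recorded cursor state.
            self._new_global_cursor = {self._cursor_field: copy.deepcopy(value)}


holder = _GlobalCursorHolder("updated_at")
holder._update_global_cursor("2024-01-02")
holder._update_global_cursor("2024-01-01")  # older value: ignored
print(holder._new_global_cursor)  # {'updated_at': '2024-01-02'}
```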
Lines 413-414: LGTM! The changes make the limit check more explicit.
- Uses the renamed `_number_of_partitions` variable
- Compares against the new `SWITCH_TO_GLOBAL_LIMIT` constant
Actionable comments posted: 0
🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (3)
Lines 61-63: Consider documenting the rationale for the new constants and their relationship. The changes introduce new constants and rename variables, but their relationship and usage could be clearer. Would you consider:
- Updating the class docstring to explain why we switch to the global cursor at 1,000 partitions?
- Adding a comment explaining why we increased the max partitions to 25,000?

Also applies to: 103-103
Lines 233-255: Consider improving the partition limit warning messages. The warning messages now include the number of partitions but could be more actionable. What do you think about:

```diff
- logger.warning(f"The maximum number of partitions has been reached. Dropping the oldest finished partition: {oldest_partition}. Over limit: {self._number_of_partitions}.")
+ logger.warning(
+     f"Maximum partitions ({self.DEFAULT_MAX_PARTITIONS_NUMBER}) reached. "
+     f"Dropping oldest finished partition: {oldest_partition}. "
+     f"Current partitions: {self._number_of_partitions}. "
+     "Consider adjusting DEFAULT_MAX_PARTITIONS_NUMBER if this is expected."
+ )
```
Lines 375-380: Consider adding a docstring to the `_update_global_cursor` method. The new method looks good, but would you consider adding documentation to improve maintainability? Something like:

```diff
  def _update_global_cursor(self, value: Mapping[str, Any]) -> None:
+     """Update the global cursor if the new value is greater than the current value.
+
+     Args:
+         value: The new cursor value to compare against the current global cursor
+     """
```
📜 Review details
📒 Files selected for processing (1)
- airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (8 hunks)
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)
Lines 354-374: LGTM! Nice improvement to the observe method. The refactored observe method is more maintainable with clearer separation of concerns. The new logging message when switching to the global cursor is particularly helpful.
Lines 145-154: Verify the global cursor update logic. The new condition for updating the global cursor might miss updates if a partition is closed before all its records are processed. Should we add a check to ensure all records have been processed before updating the global cursor? wdyt?
✅ Verification successful
🏁 Script executed:

```bash
#!/bin/bash
# Search for any race conditions or similar patterns in cursor updates
rg -A 5 "global_cursor|new_global_cursor" .
```

Length of output: 49421
Global cursor update logic appears to be functioning as expected.
The current implementation calls `_update_global_cursor()` during record observation and again when closing a partition, but only after the semaphore check (i.e. when `_semaphore_per_partition[partition_key]._value` equals 0) confirms that all concurrent processing for that partition has completed. This check, together with the fact that each observed record already attempts to update the global cursor, means the design should capture the latest state even if a partition is closed immediately after processing its records. For example, note that:
- The semaphore's value being 0 within the lock guarantees that no other record-processing tasks are active for the partition.
- The global cursor is updated both in the record observation path and after finalizing the partition's processing, which provides a safeguard against missing updates.

Would you agree that this dual-update approach meets our safety needs, or do you think an additional explicit check is warranted?
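The semaphore guard discussed above can be sketched with `threading.Semaphore`: the partition is only finalized once every released permit has been acquired back, i.e. `_value` is 0. All names here are illustrative, and reading the private `_value` attribute mirrors the reviewed code rather than recommended public API usage.

```python
import threading

# Sketch of the semaphore guard: a partition's state is only finalized once
# its semaphore value is back to 0, meaning no record-processing task for
# that partition is still in flight.

semaphore = threading.Semaphore(0)
finished = []

def process_records(n):
    for _ in range(n):
        semaphore.release()  # one permit per in-flight record slice
    for _ in range(n):
        semaphore.acquire()  # each slice signals completion

def close_partition(key):
    # _value == 0 means every release() was matched by an acquire(),
    # so nothing for this partition is still being processed.
    if semaphore._value == 0:
        finished.append(key)

process_records(3)
close_partition("partition-1")
print(finished)  # ['partition-1']
```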
The global cursor logic change looks good to me even though I have a question about the throttling. One thing that is surprising to me is that tests don't change as I would assume that the previous solution would set the global cursor on the slice boundaries while this change would set it on the record cursor value. Am I missing something?
Once we are clear on the tests, I'll approve assuming that limits will be re-worked later so that if you want to release the throttling changes in another PR, you can release these here.
@maxi297 The previous implementation set the state to the most recent cursor value when present, and to the slice start when it was not. So the tests shouldn't change. See lines 53 to 64 in e27cb81.
Actionable comments posted: 1
🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)
Lines 62-64: Consider documenting the new constants and their relationship. The relationship between `DEFAULT_MAX_PARTITIONS_NUMBER` and `SWITCH_TO_GLOBAL_LIMIT` could be clearer. Would you consider adding docstrings to explain why we switch to the global cursor at 10k but allow up to 25k partitions? Also, should we make these configurable via environment variables, wdyt?

Also applies to: 104-108
Lines 240-262: Consider improving the logging messages. The logging messages could be more informative. Would you consider including the partition key in the warning messages to help with debugging? Also, should we add debug-level logs for tracking partition counts?

```diff
- logger.warning(f"The maximum number of partitions has been reached. Dropping the oldest finished partition: {oldest_partition}. Over limit: {self._number_of_partitions - self.DEFAULT_MAX_PARTITIONS_NUMBER}.")
+ logger.warning(f"Maximum partitions ({self.DEFAULT_MAX_PARTITIONS_NUMBER}) reached. Dropping oldest finished partition with key '{partition_key}': {oldest_partition}. Current count: {self._number_of_partitions}")
```
📜 Review details
📒 Files selected for processing (1)
- airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (11 hunks)
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)
Lines 174-187: LGTM! Nice throttling implementation. The state emission throttling is a good performance optimization to prevent overwhelming the system with state messages.
Lines 382-387: LGTM! Clean extraction of global cursor update logic. The extraction of the `_update_global_cursor` method improves code organization and reusability.
Actionable comments posted: 0
🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)
Lines 393-398: Consider adding type hints to improve code clarity. The extracted method looks good, but would be even better with explicit type hints. wdyt about:

```diff
- def _update_global_cursor(self, value: Any) -> None:
+ def _update_global_cursor(self, value: Union[str, int, float]) -> None:
```
Lines 371-392: Consider addressing the TODO comment. The TODO on line 371 suggests checking the number of partitions. Since we now track this with `_number_of_partitions`, would you like me to help implement this check?
📜 Review details
📒 Files selected for processing (2)
- airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (10 hunks)
- unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (4 hunks)
🔇 Additional comments (4)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (3)
Lines 62-64: Great improvements to partition management constants and variable naming! The changes make the code more maintainable and clearer:
- The increased `DEFAULT_MAX_PARTITIONS_NUMBER` provides more headroom
- A separate `SWITCH_TO_GLOBAL_LIMIT` allows independent control
- The renamed variable better describes its purpose

Also applies to: 104-104
Line 108: Well-designed state throttling implementation! The throttling mechanism helps prevent excessive state emissions while maintaining flexibility:
- 60-second interval between emissions
- Ability to bypass throttling when needed
- Clean separation of throttling logic

Also applies to: 174-189
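The throttling behavior described here (60-second window, optional bypass) can be sketched as follows. The class and method names are hypothetical stand-ins for the reviewed code:

```python
import time

# Minimal sketch of state-emission throttling: emit at most one state
# message per 60-second window, with an optional bypass flag.

class StateEmitter:
    THROTTLE_INTERVAL = 60  # seconds

    def __init__(self):
        self._last_emission_time = 0.0
        self.emitted = []

    def _throttle_state_message(self):
        # Return the emission timestamp, or None if still inside the window.
        current_time = time.time()
        if current_time - self._last_emission_time <= self.THROTTLE_INTERVAL:
            return None
        return current_time

    def emit_state(self, state, throttle=True):
        emission_time = self._throttle_state_message() if throttle else time.time()
        if emission_time is None:
            return  # throttled: skip this emission
        self._last_emission_time = emission_time
        self.emitted.append(state)


emitter = StateEmitter()
emitter.emit_state({"cursor": 1})                  # first call: emitted
emitter.emit_state({"cursor": 2})                  # within 60s: throttled
emitter.emit_state({"cursor": 3}, throttle=False)  # bypass: emitted
print(len(emitter.emitted))  # 2
```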
Lines 431-432: Clean update to limit_reached! The method now correctly uses `_number_of_partitions` and `SWITCH_TO_GLOBAL_LIMIT`, aligning with the overall refactoring.
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (1)
Lines 2978-3020: Excellent test coverage for state throttling! The test thoroughly verifies the throttling behavior:
- Checks that states aren't emitted before 60s
- Tests multiple time intervals
- Verifies emission after 60s
- Uses mocking appropriately
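A test of this shape can be sketched with a mocked clock. The `Throttler` class below is a hypothetical stand-in, not the CDK cursor; the point is the pattern of patching `time.time` and asserting behavior at several offsets, as the test above does:

```python
import time
from unittest import mock

# Hypothetical throttler used only to demonstrate the testing pattern.
class Throttler:
    def __init__(self):
        self._last = time.time()

    def should_emit(self):
        now = time.time()
        if now - self._last <= 60:
            return False
        self._last = now
        return True


# Drive the clock manually and check the boundary conditions.
with mock.patch("time.time") as fake_time:
    fake_time.return_value = 1000.0
    t = Throttler()
    fake_time.return_value = 1030.0   # 30s later: not emitted
    assert t.should_emit() is False
    fake_time.return_value = 1059.0   # just under 60s: still not emitted
    assert t.should_emit() is False
    fake_time.return_value = 1061.0   # past 60s: emitted
    assert t.should_emit() is True
print("throttle test passed")
```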
Actionable comments posted: 1
🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (3)
Lines 62-64: Consider documenting the rationale behind the partition limits. The changes to partition limits and the addition of state tracking variables look good. However, it would be helpful to document why we chose these specific values for `DEFAULT_MAX_PARTITIONS_NUMBER` and `SWITCH_TO_GLOBAL_LIMIT`. This helps future maintainers understand the trade-offs.

```diff
 """
 Manages state per partition when a stream has many partitions, preventing data loss or duplication.

 Attributes:
-    DEFAULT_MAX_PARTITIONS_NUMBER (int): Maximum number of partitions to retain in memory (default is 10,000).
+    DEFAULT_MAX_PARTITIONS_NUMBER (int): Maximum number of partitions to retain in memory (default is 25,000).
+        This limit prevents excessive memory usage while allowing for a reasonable number of concurrent partitions.
+    SWITCH_TO_GLOBAL_LIMIT (int): Threshold at which the cursor switches to global state (default is 10,000).
+        When this limit is reached, the cursor switches to using a global state to maintain performance.
 """
```

wdyt?

Also applies to: 104-108
Lines 174-189: Consider making the throttle interval configurable. The state throttling implementation looks good, but the 60-second interval is hardcoded. Would you consider making this configurable to allow for different use cases?

```diff
+ DEFAULT_STATE_EMISSION_INTERVAL = 60  # seconds
+
  def _throttle_state_message(self) -> Optional[float]:
      """
-     Throttles the state message emission to once every 60 seconds.
+     Throttles the state message emission based on the configured interval.
      """
      current_time = time.time()
-     if current_time - self._last_emission_time <= 60:
+     if current_time - self._last_emission_time <= self.DEFAULT_STATE_EMISSION_INTERVAL:
          return None
      return current_time
```
Lines 249-255: Consider adding metrics for partition limit switches. The implementation of switching to the global cursor looks good. However, it might be valuable to track how often this happens. Would you consider adding metrics or structured logging to help monitor this behavior in production?

```diff
  if not self._use_global_cursor and self.limit_reached():
+     structured_fields = {
+         "stream_name": self._stream_name,
+         "partition_count": self._number_of_partitions,
+         "limit": self.SWITCH_TO_GLOBAL_LIMIT
+     }
      logger.info(
          f"Exceeded the 'SWITCH_TO_GLOBAL_LIMIT' of {self.SWITCH_TO_GLOBAL_LIMIT}. "
-         f"Switching to global cursor for {self._stream_name}."
+         f"Switching to global cursor for {self._stream_name}.",
+         extra=structured_fields
      )
```
📜 Review details
📒 Files selected for processing (1)
- airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (10 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
[warning] 1-1: File would be reformatted. Please format the code using the appropriate tools.
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)
Lines 392-398: LGTM! Clean refactoring of global cursor update logic. The extraction of the global cursor update logic into a separate method improves code organization and readability.
Actionable comments posted: 0
♻️ Duplicate comments (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)
Lines 382-398: 🛠️ Refactor suggestion: Consider adding error handling for cursor value extraction. The cursor value extraction could fail if the cursor field is missing or has an invalid format. Would you consider adding try-except to handle these cases gracefully? wdyt?

```diff
- record_cursor = self._connector_state_converter.output_format(
-     self._connector_state_converter.parse_value(self._cursor_field.extract_value(record))
- )
- self._update_global_cursor(record_cursor)
+ try:
+     raw_cursor_value = self._cursor_field.extract_value(record)
+     if raw_cursor_value is None:
+         logger.warning(f"Cursor field '{self.cursor_field.cursor_field_key}' is missing in record")
+         return
+
+     record_cursor = self._connector_state_converter.output_format(
+         self._connector_state_converter.parse_value(raw_cursor_value)
+     )
+     self._update_global_cursor(record_cursor)
+ except Exception as e:
+     logger.error(
+         f"Failed to extract cursor value from record for field '{self.cursor_field.cursor_field_key}': {str(e)}"
+     )
```
🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)
Lines 174-189: Consider making the throttling interval configurable. The 60-second throttling interval is currently hardcoded. Would you consider making it configurable through a class constant or constructor parameter? This would allow users to adjust the throttling based on their specific needs. wdyt?

```diff
 class ConcurrentPerPartitionCursor(Cursor):
+    DEFAULT_STATE_EMISSION_INTERVAL = 60  # seconds
+
     def __init__(
         self,
         cursor_factory: ConcurrentCursorFactory,
         partition_router: PartitionRouter,
         stream_name: str,
         stream_namespace: Optional[str],
         stream_state: Any,
         message_repository: MessageRepository,
         connector_state_manager: ConnectorStateManager,
         connector_state_converter: AbstractStreamStateConverter,
         cursor_field: CursorField,
+        state_emission_interval: Optional[int] = None,
     ) -> None:
         # ... existing initialization ...
+        self._state_emission_interval = state_emission_interval or self.DEFAULT_STATE_EMISSION_INTERVAL

     def _throttle_state_message(self) -> Optional[float]:
         current_time = time.time()
-        if current_time - self._last_emission_time <= 60:
+        if current_time - self._last_emission_time <= self._state_emission_interval:
             return None
         return current_time
```
Lines 249-279: Consider enhancing the logging messages for better clarity. The logging messages could be more informative. Would you consider these improvements? wdyt?

```diff
- logger.info(
-     f"Exceeded the 'SWITCH_TO_GLOBAL_LIMIT' of {self.SWITCH_TO_GLOBAL_LIMIT}. "
-     f"Switching to global cursor for {self._stream_name}."
- )
+ logger.info(
+     f"Stream '{self._stream_name}' has {self._number_of_partitions} partitions, "
+     f"exceeding the 'SWITCH_TO_GLOBAL_LIMIT' of {self.SWITCH_TO_GLOBAL_LIMIT}. "
+     "Switching to global cursor for better performance."
+ )

- f"The maximum number of partitions has been reached. Dropping the oldest finished partition: {oldest_partition}. Over limit: {self._number_of_partitions - self.DEFAULT_MAX_PARTITIONS_NUMBER}."
+ f"Stream '{self._stream_name}' reached maximum partitions ({self.DEFAULT_MAX_PARTITIONS_NUMBER}). "
+ f"Dropping oldest finished partition: {oldest_partition}. "
+ f"Current partitions: {self._number_of_partitions}, Over limit by: {self._number_of_partitions - self.DEFAULT_MAX_PARTITIONS_NUMBER}."

- f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._number_of_partitions - self.DEFAULT_MAX_PARTITIONS_NUMBER}."
+ f"Stream '{self._stream_name}' reached maximum partitions ({self.DEFAULT_MAX_PARTITIONS_NUMBER}). "
+ f"No finished partitions available. Dropping oldest partition: {oldest_partition}. "
+ f"Current partitions: {self._number_of_partitions}, Over limit by: {self._number_of_partitions - self.DEFAULT_MAX_PARTITIONS_NUMBER}."
```
📜 Review details
📒 Files selected for processing (1)
- airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (10 hunks)
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)
Lines 62-64: LGTM! Nice improvements to partition handling. The changes enhance scalability by increasing the maximum partitions and improve code clarity with better naming. The separation between switching to the global cursor (10k) and max partitions (25k) provides a good buffer for graceful degradation.

Also applies to: 104-108
LGTM!
Given the complexity of the ConcurrentPerPartitionCursor solution in general, can we add more details to the PR description to make sure someone with less knowledge can understand it more easily? For example, the PR description mentions that it is enhancing scalability. In which cases is the current solution not stable? What does increasing the number of partitions do for stability? My understanding is that it avoids cases where fetching for a partition might fail, assuming a partition would take a very long time to process, but it would be nice to have this explicit.
Actionable comments posted: 0
🔭 Outside diff range comments (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)
Lines 257-279: ⚠️ Potential issue: Consider decrementing `_number_of_partitions` when removing partitions. The `_number_of_partitions` counter is incremented when adding partitions but not decremented when removing them in `_ensure_partition_limit`. This could lead to inaccurate partition counting over time. Would you consider updating the counter when partitions are removed? wdyt?

```diff
 while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1:
     for partition_key in list(self._cursor_per_partition.keys()):
         if (
             partition_key in self._finished_partitions
             and self._semaphore_per_partition[partition_key]._value == 0
         ):
             oldest_partition = self._cursor_per_partition.pop(partition_key)
+            self._number_of_partitions -= 1
             logger.warning(
                 f"The maximum number of partitions has been reached. Dropping the oldest finished partition: {oldest_partition}. Over limit: {self._number_of_partitions - self.DEFAULT_MAX_PARTITIONS_NUMBER}."
             )
             break
     else:
         oldest_partition = self._cursor_per_partition.popitem(last=False)[1]
+        self._number_of_partitions -= 1
         logger.warning(
             f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._number_of_partitions - self.DEFAULT_MAX_PARTITIONS_NUMBER}."
         )
```
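The eviction loop, with the suggested counter decrement applied, can be sketched in isolation. The setup below is simplified (module-level state, a tiny limit, no semaphores) and all names are illustrative:

```python
from collections import OrderedDict

# Sketch of the partition-eviction loop with the counter decrement applied:
# prefer dropping a finished partition; otherwise drop the oldest one.

MAX_PARTITIONS = 3
cursor_per_partition = OrderedDict()
finished_partitions = set()
number_of_partitions = 0

def add_partition(key, finished=False):
    global number_of_partitions
    cursor_per_partition[key] = {"cursor": key}
    number_of_partitions += 1
    if finished:
        finished_partitions.add(key)
    _ensure_partition_limit()

def _ensure_partition_limit():
    global number_of_partitions
    while len(cursor_per_partition) > MAX_PARTITIONS - 1:
        for key in list(cursor_per_partition.keys()):
            if key in finished_partitions:
                cursor_per_partition.pop(key)
                number_of_partitions -= 1  # keep the counter accurate
                break
        else:
            # No finished partition to drop: evict the oldest one instead.
            cursor_per_partition.popitem(last=False)
            number_of_partitions -= 1

add_partition("p1", finished=True)
add_partition("p2")
add_partition("p3")  # evicts finished p1
add_partition("p4")  # no finished partitions left: evicts oldest (p2)
print(list(cursor_per_partition), number_of_partitions)  # ['p3', 'p4'] 2
```

Without the decrement, `number_of_partitions` would read 4 here even though only 2 partitions remain in memory, which is exactly the drift the comment above warns about.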
🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)
Lines 62-63: Consider documenting the relationship between the two limits. The introduction of `SWITCH_TO_GLOBAL_LIMIT` alongside `DEFAULT_MAX_PARTITIONS_NUMBER` creates a two-tier system. Would you consider adding docstring comments to explain their relationship and the rationale behind the specific values chosen? This would help future maintainers understand the design decisions. wdyt?

```diff
 DEFAULT_MAX_PARTITIONS_NUMBER = 25_000
+"""Maximum number of partitions to retain in memory before enforcing removal."""
 SWITCH_TO_GLOBAL_LIMIT = 10_000
+"""Threshold at which to switch to global cursor for better memory management."""
```
Lines 173-181: Consider making the throttle window configurable. The 60-second throttle window is currently hardcoded. Would you consider making this configurable through a class constant or constructor parameter? This would provide flexibility for different use cases. wdyt?

```diff
 class ConcurrentPerPartitionCursor(Cursor):
+    DEFAULT_THROTTLE_INTERVAL = 60  # seconds
+
     def _throttle_state_message(self) -> Optional[float]:
         current_time = time.time()
-        if current_time - self._last_emission_time <= 60:
+        if current_time - self._last_emission_time <= self.DEFAULT_THROTTLE_INTERVAL:
             return None
         return current_time
```
📜 Review details
📒 Files selected for processing (1)
- airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (12 hunks)
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)
Lines 392-398: Nice refactoring of the global cursor update logic! ✨ The extraction of `_update_global_cursor` improves code organization and the comparison logic is robust. The type hints and proper value comparison make the code more maintainable.
* main:
  - fix: update cryptography package to latest version to address CVE (airbytehq#377)
  - fix: (CDK) (HttpRequester) - Make the `HttpRequester.path` optional (airbytehq#370)
  - feat: improved custom components handling (airbytehq#350)
  - feat: add microseconds timestamp format (airbytehq#373)
  - fix: Replace Unidecode with anyascii for permissive license (airbytehq#367)
  - feat: add IncrementingCountCursor (airbytehq#346)
  - feat: (low-code cdk) datetime format with milliseconds (airbytehq#369)
  - fix: (CDK) (AsyncRetriever) - Improve UX on variable naming and interpolation (airbytehq#368)
  - fix: (CDK) (AsyncRetriever) - Add the `request` and `response` to each `async` operations (airbytehq#356)
  - fix: (CDK) (ConnectorBuilder) - Add `auxiliary requests` to slice; support `TestRead` for AsyncRetriever (part 1/2) (airbytehq#355)
  - feat(concurrent perpartition cursor): Add parent state updates (airbytehq#343)
  - fix: update csv parser for builder compatibility (airbytehq#364)
  - feat(low-code cdk): add interpolation for limit field in Rate (airbytehq#353)
  - feat(low-code cdk): add AbstractStreamFacade processing as concurrent streams in declarative source (airbytehq#347)
  - fix: (CDK) (CsvParser) - Fix the `\\` escaping when passing the `delimiter` from Builder's UI (airbytehq#358)
  - feat: expose `str_to_datetime` jinja macro (airbytehq#351)
  - fix: update CDK migration for 6.34.0 (airbytehq#348)
  - feat: Removes `stream_state` interpolation from CDK (airbytehq#320)
  - fix(declarative): Pass `extra_fields` in `global_substream_cursor` (airbytehq#195)
  - feat(concurrent perpartition cursor): Refactor ConcurrentPerPartitionCursor (airbytehq#331)
  - feat(HttpMocker): adding support for PUT requests and bytes responses (airbytehq#342)
  - chore: use certified source for manifest-only test (airbytehq#338)
  - feat: check for request_option mapping conflicts in individual components (airbytehq#328)
  - feat(file-based): sync file acl permissions and identities (airbytehq#260)
  - fix: (CDK) (Connector Builder) - refactor the `MessageGrouper` > `TestRead` (airbytehq#332)
  - fix(low code): Fix missing cursor for ClientSideIncrementalRecordFilterDecorator (airbytehq#334)
  - feat(low-code): Add API Budget (airbytehq#314)
  - chore(decoder): clean decoders and make csvdecoder available (airbytehq#326)
This PR fixes an issue with ConcurrentPerPartitionCursor, where it could fail if the number of generated partitions at the start of the sync exceeded the limit.

Key Changes
- Updated the `observe` and `close_partition` methods to update only the global cursor value when the global cursor is enabled
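The key change above can be sketched in a few lines: once `_use_global_cursor` is set, `observe` skips the per-partition bookkeeping and only advances the global cursor. The class below is a simplified illustration with assumed names, not the CDK implementation:

```python
# Hedged sketch of the described behaviour: after switching to the global
# cursor, observe() no longer touches per-partition state.

class CursorSketch:
    def __init__(self):
        self._use_global_cursor = False
        self._per_partition = {}
        self._global_cursor = None

    def observe(self, partition_key, cursor_value):
        # Guard clause: skip per-partition updates once global mode is on.
        if not self._use_global_cursor:
            prev = self._per_partition.get(partition_key)
            if prev is None or cursor_value > prev:
                self._per_partition[partition_key] = cursor_value
        self._update_global_cursor(cursor_value)

    def _update_global_cursor(self, value):
        if self._global_cursor is None or value > self._global_cursor:
            self._global_cursor = value


c = CursorSketch()
c.observe("p1", 5)
c._use_global_cursor = True   # e.g. partition limit was exceeded
c.observe("p1", 10)           # per-partition state is left untouched now
print(c._per_partition, c._global_cursor)  # {'p1': 5} 10
```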