From 707a6c66fb6dea5c6b9be0f0c0c8281e1d1b2154 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Wed, 5 Feb 2025 19:06:57 +0200 Subject: [PATCH 01/19] Add API Budget --- .../declarative_component_schema.yaml | 166 ++++++++++++++++++ .../models/declarative_component_schema.py | 130 ++++++++++++++ .../parsers/model_to_component_factory.py | 130 +++++++++++++- .../declarative/requesters/http_requester.py | 3 + 4 files changed, 423 insertions(+), 6 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index d51d4c922..ea044f816 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1365,6 +1365,168 @@ definitions: $parameters: type: object additional_properties: true + APIBudget: + title: API Budget + description: Component that defines how many requests can be made to the API in a given time frame. + type: object + required: + - type + properties: + type: + type: string + enum: [APIBudget] + policies: + title: Policies + description: List of policies that define the rate limits for different types of requests. + type: array + items: + anyOf: + - "$ref": "#/definitions/FixedWindowCallRatePolicy" + - "$ref": "#/definitions/MovingWindowCallRatePolicy" + - "$ref": "#/definitions/UnlimitedCallRatePolicy" + ratelimit_reset_header: + title: Rate Limit Reset Header + description: The name of the header that contains the timestamp for when the rate limit will reset. + type: string + default: "ratelimit-reset" + ratelimit_remaining_header: + title: Rate Limit Remaining Header + description: The name of the header that contains the number of remaining requests. + type: string + default: "ratelimit-remaining" + status_codes_for_ratelimit_hit: + title: Status Codes for Rate Limit Hit + description: List of HTTP status codes that indicate a rate limit has been hit. + type: array + items: + type: integer + default: [429] + maximum_attempts_to_acquire: + title: Maximum Attempts to Acquire + description: The maximum number of attempts to acquire a call before giving up. + type: integer + default: 100000 + additionalProperties: true + FixedWindowCallRatePolicy: + title: Fixed Window Call Rate Policy + description: A policy that allows a fixed number of calls within a specific time window. + type: object + required: + - type + - next_reset_ts + - period + - call_limit + - matchers + properties: + type: + type: string + enum: [FixedWindowCallRatePolicy] + next_reset_ts: + title: Next Reset Timestamp + description: The timestamp when the rate limit will reset. + type: string + format: date-time + period: + title: Period + description: The time interval for the rate limit window. + type: string + format: duration + call_limit: + title: Call Limit + description: The maximum number of calls allowed within the period. + type: integer + matchers: + title: Matchers + description: List of matchers that define which requests this policy applies to. + type: array + items: + "$ref": "#/definitions/HttpRequestMatcher" + additionalProperties: true + MovingWindowCallRatePolicy: + title: Moving Window Call Rate Policy + description: A policy that allows a fixed number of calls within a moving time window. + type: object + required: + - type + - rates + - matchers + properties: + type: + type: string + enum: [MovingWindowCallRatePolicy] + rates: + title: Rates + description: List of rates that define the call limits for different time intervals. + type: array + items: + "$ref": "#/definitions/Rate" + matchers: + title: Matchers + description: List of matchers that define which requests this policy applies to. + type: array + items: + "$ref": "#/definitions/HttpRequestMatcher" + additionalProperties: true + UnlimitedCallRatePolicy: + title: Unlimited Call Rate Policy + description: A policy that allows unlimited calls for specific requests. + type: object + required: + - type + - matchers + properties: + type: + type: string + enum: [UnlimitedCallRatePolicy] + matchers: + title: Matchers + description: List of matchers that define which requests this policy applies to. + type: array + items: + "$ref": "#/definitions/HttpRequestMatcher" + additionalProperties: true + Rate: + title: Rate + description: Defines a rate limit with a specific number of calls allowed within a time interval. + type: object + required: + - limit + - interval + properties: + limit: + title: Limit + description: The maximum number of calls allowed within the interval. + type: integer + interval: + title: Interval + description: The time interval for the rate limit. + type: string + format: duration + additionalProperties: true + HttpRequestMatcher: + title: HTTP Request Matcher + description: Matches HTTP requests based on method, URL, parameters, and headers. + type: object + properties: + method: + title: Method + description: The HTTP method to match (e.g., GET, POST). + type: string + url: + title: URL + description: The URL to match. + type: string + params: + title: Parameters + description: The query parameters to match. + type: object + additionalProperties: true + headers: + title: Headers + description: The headers to match. + type: object + additionalProperties: true + additionalProperties: true DefaultErrorHandler: title: Default Error Handler description: Component defining how to handle errors. Default behavior includes only retrying server errors (HTTP 5XX) and too many requests (HTTP 429) with an exponential backoff. @@ -1637,6 +1799,10 @@ definitions: - "$ref": "#/definitions/DefaultErrorHandler" - "$ref": "#/definitions/CustomErrorHandler" - "$ref": "#/definitions/CompositeErrorHandler" + api_budget: + title: API Budget + description: Component that defines how many requests can be made to the API in a given time frame. + "$ref": "#/definitions/APIBudget" http_method: title: HTTP Method description: The HTTP method used to fetch data from the source (can be GET or POST). diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 6aa1d35a7..bd5a69f6c 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -3,6 +3,7 @@ from __future__ import annotations +from datetime import datetime, timedelta from enum import Enum from typing import Any, Dict, List, Literal, Optional, Union @@ -642,6 +643,36 @@ class OAuthAuthenticator(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class Rate(BaseModel): + class Config: + extra = Extra.allow + + limit: int = Field( + ..., + description="The maximum number of calls allowed within the interval.", + title="Limit", + ) + interval: timedelta = Field( + ..., description="The time interval for the rate limit.", title="Interval" + ) + + +class HttpRequestMatcher(BaseModel): + class Config: + extra = Extra.allow + + method: Optional[str] = Field( + None, description="The HTTP method to match (e.g., GET, POST).", title="Method" + ) + url: Optional[str] = Field(None, description="The URL to match.", title="URL") + params: Optional[Dict[str, Any]] = Field( + None, description="The query parameters to match.", title="Parameters" + ) + headers: Optional[Dict[str, Any]] = Field( + None, description="The headers to match.", title="Headers" + ) + + class DpathExtractor(BaseModel): type: Literal["DpathExtractor"] field_path: List[str] = Field( @@ -1578,6 +1609,60 @@ class DatetimeBasedCursor(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class FixedWindowCallRatePolicy(BaseModel): + class Config: + extra = Extra.allow + + type: Literal["FixedWindowCallRatePolicy"] + next_reset_ts: datetime = Field( + ..., + description="The timestamp when the rate limit will reset.", + title="Next Reset Timestamp", + ) + period: timedelta = Field( + ..., description="The time interval for the rate limit window.", title="Period" + ) + call_limit: int = Field( + ..., + description="The maximum number of calls allowed within the period.", + title="Call Limit", + ) + matchers: List[HttpRequestMatcher] = Field( + ..., + description="List of matchers that define which requests this policy applies to.", + title="Matchers", + ) + + +class MovingWindowCallRatePolicy(BaseModel): + class Config: + extra = Extra.allow + + type: Literal["MovingWindowCallRatePolicy"] + rates: List[Rate] = Field( + ..., + description="List of rates that define the call limits for different time intervals.", + title="Rates", + ) + matchers: List[HttpRequestMatcher] = Field( + ..., + description="List of matchers that define which requests this policy applies to.", + title="Matchers", + ) + + +class UnlimitedCallRatePolicy(BaseModel): + class Config: + extra = Extra.allow + + type: Literal["UnlimitedCallRatePolicy"] + matchers: List[HttpRequestMatcher] = Field( + ..., + description="List of matchers that define which requests this policy applies to.", + title="Matchers", + ) + + class DefaultErrorHandler(BaseModel): type: Literal["DefaultErrorHandler"] backoff_strategies: Optional[ @@ -1709,6 +1794,46 @@ class CompositeErrorHandler(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class APIBudget(BaseModel): + class Config: + extra = Extra.allow + + type: Literal["APIBudget"] + policies: Optional[ + List[ + Union[ + FixedWindowCallRatePolicy, + MovingWindowCallRatePolicy, + UnlimitedCallRatePolicy, + ] + ] + ] = Field( + None, + description="List of policies that define the rate limits for different types of requests.", + title="Policies", + ) + ratelimit_reset_header: Optional[str] = Field( + "ratelimit-reset", + description="The name of the header that contains the timestamp for when the rate limit will reset.", + title="Rate Limit Reset Header", + ) + ratelimit_remaining_header: Optional[str] = Field( + "ratelimit-remaining", + description="The name of the header that contains the number of remaining requests.", + title="Rate Limit Remaining Header", + ) + status_codes_for_ratelimit_hit: Optional[List[int]] = Field( + [429], + description="List of HTTP status codes that indicate a rate limit has been hit.", + title="Status Codes for Rate Limit Hit", + ) + maximum_attempts_to_acquire: Optional[int] = Field( + 100000, + description="The maximum number of attempts to acquire a call before giving up.", + title="Maximum Attempts to Acquire", + ) + + class ZipfileDecoder(BaseModel): class Config: extra = Extra.allow @@ -1979,6 +2104,11 @@ class HttpRequester(BaseModel): description="Error handler component that defines how to handle errors.", title="Error Handler", ) + api_budget: Optional[APIBudget] = Field( + None, + description="Component that defines how many requests can be made to the API in a given time frame.", + title="API Budget", + ) http_method: Optional[HttpMethod] = Field( HttpMethod.GET, description="The HTTP method used to fetch data from the source (can be GET or POST).", diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index b8eeca1ec..cec9aff25 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -112,6 +112,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( AddFields as AddFieldsModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + APIBudget as APIBudgetModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ApiKeyAuthenticator as ApiKeyAuthenticatorModel, ) @@ -226,6 +229,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ExponentialBackoffStrategy as ExponentialBackoffStrategyModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + FixedWindowCallRatePolicy as FixedWindowCallRatePolicyModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( FlattenFields as FlattenFieldsModel, ) @@ -241,6 +247,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( HttpRequester as HttpRequesterModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + HttpRequestMatcher as HttpRequestMatcherModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( HttpResponseFilter as HttpResponseFilterModel, ) @@ -295,6 +304,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( MinMaxDatetime as MinMaxDatetimeModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + MovingWindowCallRatePolicy as MovingWindowCallRatePolicyModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( NoAuth as NoAuthModel, ) @@ -313,6 +325,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ParentStreamConfig as ParentStreamConfigModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + Rate as RateModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( RecordFilter as RecordFilterModel, ) @@ -356,6 +371,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( TypesMap as TypesMapModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + UnlimitedCallRatePolicy as UnlimitedCallRatePolicyModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ValueType from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( WaitTimeFromHeader as WaitTimeFromHeaderModel, @@ -469,6 +487,14 @@ MessageRepository, NoopMessageRepository, ) +from airbyte_cdk.sources.streams.call_rate import ( + FixedWindowCallRatePolicy, + HttpAPIBudget, + HttpRequestMatcher, + MovingWindowCallRatePolicy, + Rate, + UnlimitedCallRatePolicy, +) from airbyte_cdk.sources.streams.concurrent.clamping import ( ClampingEndProvider, ClampingStrategy, @@ -607,6 +633,12 @@ def _init_mappings(self) -> None: StreamConfigModel: self.create_stream_config, ComponentMappingDefinitionModel: self.create_components_mapping_definition, ZipfileDecoderModel: self.create_zipfile_decoder, + APIBudgetModel: self.create_api_budget, + FixedWindowCallRatePolicyModel: self.create_fixed_window_call_rate_policy, + MovingWindowCallRatePolicyModel: self.create_moving_window_call_rate_policy, + UnlimitedCallRatePolicyModel: self.create_unlimited_call_rate_policy, + RateModel: self.create_rate, + HttpRequestMatcherModel: self.create_http_request_matcher, } # Needed for the case where we need to perform a second parse on the fields of a custom component @@ -813,7 +845,8 @@ def create_legacy_to_per_partition_state_migration( return LegacyToPerPartitionStateMigration( partition_router, # type: ignore # was already checked above - declarative_stream.incremental_sync, # type: ignore # was already checked. Migration can be applied only to incremental streams. + declarative_stream.incremental_sync, + # type: ignore # was already checked. Migration can be applied only to incremental streams. config, declarative_stream.parameters, # type: ignore # different type is expected here Mapping[str, Any], got Dict[str, Any] ) @@ -1111,7 +1144,8 @@ def create_concurrent_cursor_from_datetime_based_cursor( clamping_strategy = DayClampingStrategy() end_date_provider = ClampingEndProvider( DayClampingStrategy(is_ceiling=False), - end_date_provider, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice + end_date_provider, + # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice granularity=cursor_granularity or datetime.timedelta(seconds=1), ) case "WEEK": @@ -1128,14 +1162,16 @@ def create_concurrent_cursor_from_datetime_based_cursor( clamping_strategy = WeekClampingStrategy(weekday) end_date_provider = ClampingEndProvider( WeekClampingStrategy(weekday, is_ceiling=False), - end_date_provider, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice + end_date_provider, + # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice granularity=cursor_granularity or datetime.timedelta(days=1), ) case "MONTH": clamping_strategy = MonthClampingStrategy() end_date_provider = ClampingEndProvider( MonthClampingStrategy(is_ceiling=False), - end_date_provider, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice + end_date_provider, + # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice granularity=cursor_granularity or datetime.timedelta(days=1), ) case _: @@ -1152,8 +1188,10 @@ def create_concurrent_cursor_from_datetime_based_cursor( connector_state_converter=connector_state_converter, cursor_field=cursor_field, slice_boundary_fields=slice_boundary_fields, - start=start_date, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice - end_provider=end_date_provider, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice + start=start_date, + # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice + end_provider=end_date_provider, + # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice lookback_window=lookback_window, slice_range=step_length, cursor_granularity=cursor_granularity, @@ -1911,6 +1949,12 @@ def create_http_requester( ) ) + api_budget = ( + self._create_component_from_model(model=model.api_budget, config=config) + if model.api_budget + else None + ) + request_options_provider = InterpolatedRequestOptionsProvider( request_body_data=model.request_body_data, request_body_json=model.request_body_json, @@ -1931,6 +1975,7 @@ def create_http_requester( path=model.path, authenticator=authenticator, error_handler=error_handler, + api_budget=api_budget, http_method=HttpMethod[model.http_method.value], request_options_provider=request_options_provider, config=config, @@ -2919,3 +2964,76 @@ def _is_supported_parser_for_pagination(self, parser: Parser) -> bool: return isinstance(parser.inner_parser, JsonParser) else: return False + + def create_api_budget( + self, model: APIBudgetModel, config: Config, **kwargs: Any + ) -> HttpAPIBudget: + policies = [ + self._create_component_from_model(model=policy, config=config) + for policy in model.policies + ] + + return HttpAPIBudget( + policies=policies, + ratelimit_reset_header=model.ratelimit_reset_header, + ratelimit_remaining_header=model.ratelimit_remaining_header, + status_codes_for_ratelimit_hit=model.status_codes_for_ratelimit_hit, + maximum_attempts_to_acquire=model.maximum_attempts_to_acquire, + ) + + def create_fixed_window_call_rate_policy( + self, model: FixedWindowCallRatePolicyModel, config: Config, **kwargs: Any + ) -> FixedWindowCallRatePolicy: + matchers = [ + self._create_component_from_model(model=matcher, config=config) + for matcher in model.matchers + ] + return FixedWindowCallRatePolicy( + next_reset_ts=model.next_reset_ts, + period=parse_duration(model.period), + call_limit=model.call_limit, + matchers=matchers, + ) + + def create_moving_window_call_rate_policy( + self, model: MovingWindowCallRatePolicyModel, config: Config, **kwargs: Any + ) -> MovingWindowCallRatePolicy: + rates = [ + self._create_component_from_model(model=rate, config=config) for rate in model.rates + ] + matchers = [ + self._create_component_from_model(model=matcher, config=config) + for matcher in model.matchers + ] + return MovingWindowCallRatePolicy( + rates=rates, + matchers=matchers, + ) + + def create_unlimited_call_rate_policy( + self, model: UnlimitedCallRatePolicyModel, config: Config, **kwargs: Any + ) -> UnlimitedCallRatePolicy: + matchers = [ + self._create_component_from_model(model=matcher, config=config) + for matcher in model.matchers + ] + + return UnlimitedCallRatePolicy( + matchers=matchers, + ) + + def create_rate(self, model: RateModel, config: Config, **kwargs: Any) -> Rate: + return Rate( + limit=model.limit, + interval=model.interval, + ) + + def create_http_request_matcher( + self, model: HttpRequestMatcherModel, config: Config, **kwargs: Any + ) -> HttpRequestMatcher: + return HttpRequestMatcher( + method=model.method, + url=model.url, + params=model.params, + headers=model.headers, + ) diff --git a/airbyte_cdk/sources/declarative/requesters/http_requester.py b/airbyte_cdk/sources/declarative/requesters/http_requester.py index 35d4b0f11..96b6a4365 100644 --- a/airbyte_cdk/sources/declarative/requesters/http_requester.py +++ b/airbyte_cdk/sources/declarative/requesters/http_requester.py @@ -22,6 +22,7 @@ ) from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod, Requester from airbyte_cdk.sources.message import MessageRepository, NoopMessageRepository +from airbyte_cdk.sources.streams.call_rate import APIBudget from airbyte_cdk.sources.streams.http import HttpClient from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler from airbyte_cdk.sources.types import Config, StreamSlice, StreamState @@ -55,6 +56,7 @@ class HttpRequester(Requester): http_method: Union[str, HttpMethod] = HttpMethod.GET request_options_provider: Optional[InterpolatedRequestOptionsProvider] = None error_handler: Optional[ErrorHandler] = None + api_budget: Optional[APIBudget] = None disable_retries: bool = False message_repository: MessageRepository = NoopMessageRepository() use_cache: bool = False @@ -91,6 +93,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: name=self.name, logger=self.logger, error_handler=self.error_handler, + api_budget=self.api_budget, authenticator=self._authenticator, use_cache=self.use_cache, backoff_strategy=backoff_strategies, From b6bcdd7aa93e04fb3a81824c99d7b5821dbeffc7 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Thu, 6 Feb 2025 20:40:54 +0200 Subject: [PATCH 02/19] Refactor to move api_budget to root level --- .../declarative_component_schema.yaml | 67 ++++++-- .../manifest_declarative_source.py | 4 + .../models/declarative_component_schema.py | 69 ++++++-- .../parsers/model_to_component_factory.py | 81 ++++++--- airbyte_cdk/sources/streams/call_rate.py | 155 ++++++++++-------- 5 files changed, 251 insertions(+), 125 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index ea044f816..aa4e2b4df 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -40,6 +40,12 @@ properties: "$ref": "#/definitions/Spec" concurrency_level: "$ref": "#/definitions/ConcurrencyLevel" + api_budget: + title: API Budget + description: Defines how many requests can be made to the API in a given time frame. This field accepts either a generic APIBudget or an HTTP-specific configuration (HTTPAPIBudget) to be applied across all streams. + anyOf: + - "$ref": "#/definitions/APIBudget" + - "$ref": "#/definitions/HTTPAPIBudget" metadata: type: object description: For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata. @@ -794,7 +800,7 @@ definitions: description: This option is used to adjust the upper and lower boundaries of each datetime window to beginning and end of the provided target period (day, week, month) type: object required: - - target + - target properties: target: title: Target @@ -1367,17 +1373,49 @@ definitions: additional_properties: true APIBudget: title: API Budget - description: Component that defines how many requests can be made to the API in a given time frame. + description: > + A generic API budget configuration that defines the policies (rate limiting rules) + and the maximum number of attempts to acquire a call credit. This budget does not automatically + update itself based on HTTP response headers. type: object required: - type + - policies properties: type: type: string enum: [APIBudget] policies: title: Policies - description: List of policies that define the rate limits for different types of requests. + description: List of call rate policies that define how many calls are allowed. + type: array + items: + anyOf: + - "$ref": "#/definitions/FixedWindowCallRatePolicy" + - "$ref": "#/definitions/MovingWindowCallRatePolicy" + - "$ref": "#/definitions/UnlimitedCallRatePolicy" + maximum_attempts_to_acquire: + title: Maximum Attempts to Acquire + description: The maximum number of attempts to acquire a call before giving up. + type: integer + default: 100000 + additionalProperties: true + HTTPAPIBudget: + title: HTTP API Budget + description: > + An HTTP-specific API budget that extends APIBudget by updating rate limiting information based + on HTTP response headers. It extracts available calls and the next reset timestamp from the HTTP responses. + type: object + required: + - type + - policies + properties: + type: + type: string + enum: [HTTPAPIBudget] + policies: + title: Policies + description: List of call rate policies that define how many calls are allowed. type: array items: anyOf: @@ -1386,12 +1424,12 @@ definitions: - "$ref": "#/definitions/UnlimitedCallRatePolicy" ratelimit_reset_header: title: Rate Limit Reset Header - description: The name of the header that contains the timestamp for when the rate limit will reset. + description: The HTTP response header name that indicates when the rate limit resets. type: string default: "ratelimit-reset" ratelimit_remaining_header: title: Rate Limit Remaining Header - description: The name of the header that contains the number of remaining requests. + description: The HTTP response header name that indicates the number of remaining allowed calls. type: string default: "ratelimit-remaining" status_codes_for_ratelimit_hit: @@ -1505,16 +1543,23 @@ definitions: additionalProperties: true HttpRequestMatcher: title: HTTP Request Matcher - description: Matches HTTP requests based on method, URL, parameters, and headers. + description: > + Matches HTTP requests based on method, base URL, URL path pattern, query parameters, and headers. + Use `url_base` to specify the scheme and host (without trailing slash) and + `url_path_pattern` to apply a regex to the request path. type: object properties: method: title: Method description: The HTTP method to match (e.g., GET, POST). type: string - url: - title: URL - description: The URL to match. + url_base: + title: URL Base + description: The base URL (scheme and host, e.g. "https://api.example.com") to match. + type: string + url_path_pattern: + title: URL Path Pattern + description: A regular expression pattern to match the URL path. type: string params: title: Parameters @@ -1799,10 +1844,6 @@ definitions: - "$ref": "#/definitions/DefaultErrorHandler" - "$ref": "#/definitions/CustomErrorHandler" - "$ref": "#/definitions/CompositeErrorHandler" - api_budget: - title: API Budget - description: Component that defines how many requests can be made to the API in a given time frame. - "$ref": "#/definitions/APIBudget" http_method: title: HTTP Method description: The HTTP method used to fetch data from the source (can be GET or POST). diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py index efc779464..d3afb1396 100644 --- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -137,6 +137,10 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: self._source_config, config ) + api_budget_model = self._source_config.get("api_budget") + if api_budget_model: + self._constructor.set_api_budget(api_budget_model, config) + source_streams = [ self._constructor.create_component( DeclarativeStreamModel, diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index bd5a69f6c..c00e46831 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -664,7 +664,16 @@ class Config: method: Optional[str] = Field( None, description="The HTTP method to match (e.g., GET, POST).", title="Method" ) - url: Optional[str] = Field(None, description="The URL to match.", title="URL") + url_base: Optional[str] = Field( + None, + description='The base URL (scheme and host, e.g. "https://api.example.com") to match.', + title="URL Base", + ) + url_path_pattern: Optional[str] = Field( + None, + description="A regular expression pattern to match the URL path.", + title="URL Path Pattern", + ) params: Optional[Dict[str, Any]] = Field( None, description="The query parameters to match.", title="Parameters" ) @@ -1799,27 +1808,48 @@ class Config: extra = Extra.allow type: Literal["APIBudget"] - policies: Optional[ - List[ - Union[ - FixedWindowCallRatePolicy, - MovingWindowCallRatePolicy, - UnlimitedCallRatePolicy, - ] + policies: List[ + Union[ + FixedWindowCallRatePolicy, + MovingWindowCallRatePolicy, + UnlimitedCallRatePolicy, ] ] = Field( - None, - description="List of policies that define the rate limits for different types of requests.", + ..., + description="List of call rate policies that define how many calls are allowed.", + title="Policies", + ) + maximum_attempts_to_acquire: Optional[int] = Field( + 100000, + description="The maximum number of attempts to acquire a call before giving up.", + title="Maximum Attempts to Acquire", + ) + + +class HTTPAPIBudget(BaseModel): + class Config: + extra = Extra.allow + + type: Literal["HTTPAPIBudget"] + policies: List[ + Union[ + FixedWindowCallRatePolicy, + MovingWindowCallRatePolicy, + UnlimitedCallRatePolicy, + ] + ] = Field( + ..., + description="List of call rate policies that define how many calls are allowed.", title="Policies", ) ratelimit_reset_header: Optional[str] = Field( "ratelimit-reset", - description="The name of the header that contains the timestamp for when the rate limit will reset.", + description="The HTTP response header name that indicates when the rate limit resets.", title="Rate Limit Reset Header", ) ratelimit_remaining_header: Optional[str] = Field( "ratelimit-remaining", - description="The name of the header that contains the number of remaining requests.", + description="The HTTP response header name that indicates the number of remaining allowed calls.", title="Rate Limit Remaining Header", ) status_codes_for_ratelimit_hit: Optional[List[int]] = Field( @@ -1867,6 +1897,11 @@ class Config: definitions: Optional[Dict[str, Any]] = None spec: Optional[Spec] = None concurrency_level: Optional[ConcurrencyLevel] = None + api_budget: Optional[Union[APIBudget, HTTPAPIBudget]] = Field( + None, + description="Defines how many requests can be made to the API in a given time frame. This field accepts either a generic APIBudget or an HTTP-specific configuration (HTTPAPIBudget) to be applied across all streams.", + title="API Budget", + ) metadata: Optional[Dict[str, Any]] = Field( None, description="For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.", @@ -1893,6 +1928,11 @@ class Config: definitions: Optional[Dict[str, Any]] = None spec: Optional[Spec] = None concurrency_level: Optional[ConcurrencyLevel] = None + api_budget: Optional[Union[APIBudget, HTTPAPIBudget]] = Field( + None, + description="Defines how many requests can be made to the API in a given time frame. This field accepts either a generic APIBudget or an HTTP-specific configuration (HTTPAPIBudget) to be applied across all streams.", + title="API Budget", + ) metadata: Optional[Dict[str, Any]] = Field( None, description="For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.", @@ -2104,11 +2144,6 @@ class HttpRequester(BaseModel): description="Error handler component that defines how to handle errors.", title="Error Handler", ) - api_budget: Optional[APIBudget] = Field( - None, - description="Component that defines how many requests can be made to the API in a given time frame.", - title="API Budget", - ) http_method: Optional[HttpMethod] = Field( HttpMethod.GET, description="The HTTP method used to fetch data from the source (can be GET or POST).", diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index cec9aff25..87048a005 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -241,6 +241,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( GzipParser as GzipParserModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + HTTPAPIBudget as HTTPAPIBudgetModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( HttpComponentsResolver as HttpComponentsResolverModel, ) @@ -488,6 +491,7 @@ NoopMessageRepository, ) from airbyte_cdk.sources.streams.call_rate import ( + APIBudget, FixedWindowCallRatePolicy, HttpAPIBudget, HttpRequestMatcher, @@ -546,6 +550,7 @@ def __init__( self._evaluate_log_level(emit_connector_builder_messages) ) self._connector_state_manager = connector_state_manager or ConnectorStateManager() + self._api_budget: Optional[Union[APIBudget, HttpAPIBudget]] = None def _init_mappings(self) -> None: self.PYDANTIC_MODEL_TO_CONSTRUCTOR: Mapping[Type[BaseModel], Callable[..., Any]] = { @@ -634,6 +639,7 @@ def _init_mappings(self) -> None: ComponentMappingDefinitionModel: self.create_components_mapping_definition, ZipfileDecoderModel: self.create_zipfile_decoder, APIBudgetModel: self.create_api_budget, + HTTPAPIBudgetModel: self.create_http_api_budget, FixedWindowCallRatePolicyModel: self.create_fixed_window_call_rate_policy, MovingWindowCallRatePolicyModel: self.create_moving_window_call_rate_policy, UnlimitedCallRatePolicyModel: self.create_unlimited_call_rate_policy, @@ -845,8 +851,7 @@ def create_legacy_to_per_partition_state_migration( return LegacyToPerPartitionStateMigration( partition_router, # type: ignore # was already checked above - declarative_stream.incremental_sync, - # type: ignore # was already checked. Migration can be applied only to incremental streams. + declarative_stream.incremental_sync, # type: ignore # was already checked. Migration can be applied only to incremental streams. config, declarative_stream.parameters, # type: ignore # different type is expected here Mapping[str, Any], got Dict[str, Any] ) @@ -1144,8 +1149,7 @@ def create_concurrent_cursor_from_datetime_based_cursor( clamping_strategy = DayClampingStrategy() end_date_provider = ClampingEndProvider( DayClampingStrategy(is_ceiling=False), - end_date_provider, - # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice + end_date_provider, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice granularity=cursor_granularity or datetime.timedelta(seconds=1), ) case "WEEK": @@ -1162,16 +1166,14 @@ def create_concurrent_cursor_from_datetime_based_cursor( clamping_strategy = WeekClampingStrategy(weekday) end_date_provider = ClampingEndProvider( WeekClampingStrategy(weekday, is_ceiling=False), - end_date_provider, - # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice + end_date_provider, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice granularity=cursor_granularity or datetime.timedelta(days=1), ) case "MONTH": clamping_strategy = MonthClampingStrategy() end_date_provider = ClampingEndProvider( MonthClampingStrategy(is_ceiling=False), - end_date_provider, - # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice + end_date_provider, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice granularity=cursor_granularity or datetime.timedelta(days=1), ) case _: @@ -1188,10 +1190,8 @@ def create_concurrent_cursor_from_datetime_based_cursor( connector_state_converter=connector_state_converter, cursor_field=cursor_field, slice_boundary_fields=slice_boundary_fields, - start=start_date, - # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice - end_provider=end_date_provider, - # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice + start=start_date, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice + end_provider=end_date_provider, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice lookback_window=lookback_window, slice_range=step_length, cursor_granularity=cursor_granularity, @@ -1949,11 +1949,7 @@ def create_http_requester( ) ) - api_budget = ( - self._create_component_from_model(model=model.api_budget, config=config) - if model.api_budget - else None - ) + api_budget = self._api_budget request_options_provider = InterpolatedRequestOptionsProvider( request_body_data=model.request_body_data, @@ -2965,8 +2961,21 @@ def _is_supported_parser_for_pagination(self, parser: Parser) -> bool: else: return False - def create_api_budget( - self, model: APIBudgetModel, config: Config, **kwargs: Any + def create_api_budget(self, model: APIBudgetModel, config: Config, **kwargs: Any) -> APIBudget: + policies = [ + self._create_component_from_model(model=policy, config=config) + for policy in model.policies + ] + + return APIBudget( + policies=policies, + maximum_attempts_to_acquire=model.maximum_attempts_to_acquire + if model.maximum_attempts_to_acquire + else 100000, + ) + + def create_http_api_budget( + self, model: HTTPAPIBudgetModel, config: Config, **kwargs: Any ) -> HttpAPIBudget: policies = [ self._create_component_from_model(model=policy, config=config) @@ -2975,10 +2984,18 @@ def create_api_budget( return HttpAPIBudget( policies=policies, - ratelimit_reset_header=model.ratelimit_reset_header, - ratelimit_remaining_header=model.ratelimit_remaining_header, - status_codes_for_ratelimit_hit=model.status_codes_for_ratelimit_hit, - maximum_attempts_to_acquire=model.maximum_attempts_to_acquire, + maximum_attempts_to_acquire=model.maximum_attempts_to_acquire + if model.maximum_attempts_to_acquire + else 100000, + ratelimit_reset_header=model.ratelimit_reset_header + if model.ratelimit_reset_header + else "ratelimit-reset", + ratelimit_remaining_header=model.ratelimit_remaining_header + if model.ratelimit_remaining_header + else "ratelimit-remaining", + status_codes_for_ratelimit_hit=model.status_codes_for_ratelimit_hit + if model.status_codes_for_ratelimit_hit + else (429,), ) def create_fixed_window_call_rate_policy( @@ -3033,7 +3050,23 @@ def create_http_request_matcher( ) -> HttpRequestMatcher: return HttpRequestMatcher( method=model.method, - url=model.url, + url_base=model.url_base, + url_path_pattern=model.url_path_pattern, params=model.params, headers=model.headers, ) + + def set_api_budget(self, component_definition: ComponentDefinition, config: Config) -> None: + model_str = component_definition.get("type") + if model_str == "APIBudget": + # Annotate model_type as a type that is a subclass of BaseModel + model_type: Union[Type[APIBudgetModel], Type[HTTPAPIBudgetModel]] = APIBudgetModel + elif model_str == "HTTPAPIBudget": + model_type = HTTPAPIBudgetModel + else: + raise ValueError(f"Unknown API Budget type: {model_str}") + + # create_component expects a type[BaseModel] and returns an instance of that model. + self._api_budget = self.create_component( + model_type=model_type, component_definition=component_definition, config=config + ) diff --git a/airbyte_cdk/sources/streams/call_rate.py b/airbyte_cdk/sources/streams/call_rate.py index 81ebac78e..d25fb9c2b 100644 --- a/airbyte_cdk/sources/streams/call_rate.py +++ b/airbyte_cdk/sources/streams/call_rate.py @@ -6,10 +6,12 @@ import dataclasses import datetime import logging +import re import time +from dataclasses import InitVar, dataclass, field from datetime import timedelta from threading import RLock -from typing import TYPE_CHECKING, Any, Mapping, Optional +from typing import TYPE_CHECKING, Any, Mapping, Optional, Union from urllib import parse import requests @@ -98,43 +100,55 @@ def __call__(self, request: Any) -> bool: class HttpRequestMatcher(RequestMatcher): - """Simple implementation of RequestMatcher for http requests case""" + """ + Extended RequestMatcher for HTTP requests that supports matching on: + - HTTP method (case-insensitive) + - URL base (scheme + netloc) optionally + - URL path pattern (a regex applied to the path portion of the URL) + - Query parameters (must be present) + - Headers (header names compared case-insensitively) + """ def __init__( self, method: Optional[str] = None, - url: Optional[str] = None, + url_base: Optional[str] = None, + url_path_pattern: Optional[str] = None, params: Optional[Mapping[str, Any]] = None, headers: Optional[Mapping[str, Any]] = None, ): - """Constructor - - :param method: - :param url: - :param params: - :param headers: """ - self._method = method - self._url = url + :param method: HTTP method (e.g. "GET", "POST"); compared case-insensitively. + :param url_base: Base URL (scheme://host) that must match. + :param url_path_pattern: A regex pattern that will be applied to the path portion of the URL. + :param params: Dictionary of query parameters that must be present in the request. + :param headers: Dictionary of headers that must be present (header keys are compared case-insensitively). + """ + self._method = method.upper() if method else None + + # Normalize the url_base if provided: remove trailing slash. + self._url_base = url_base.rstrip("/") if url_base else None + + # Compile the URL path pattern if provided. + self._url_path_pattern = re.compile(url_path_pattern) if url_path_pattern else None + + # Normalize query parameters to strings. self._params = {str(k): str(v) for k, v in (params or {}).items()} - self._headers = {str(k): str(v) for k, v in (headers or {}).items()} + + # Normalize header keys to lowercase. + self._headers = {str(k).lower(): str(v) for k, v in (headers or {}).items()} @staticmethod def _match_dict(obj: Mapping[str, Any], pattern: Mapping[str, Any]) -> bool: - """Check that all elements from pattern dict present and have the same values in obj dict - - :param obj: - :param pattern: - :return: - """ + """Check that every key/value in the pattern exists in the object.""" return pattern.items() <= obj.items() def __call__(self, request: Any) -> bool: """ - - :param request: - :return: True if matches the provided request object, False - otherwise + :param request: A requests.Request or requests.PreparedRequest instance. + :return: True if the request matches all provided criteria; False otherwise. """ + # Prepare the request (if needed) and extract the URL details. if isinstance(request, requests.Request): prepared_request = request.prepare() elif isinstance(request, requests.PreparedRequest): @@ -142,21 +156,40 @@ def __call__(self, request: Any) -> bool: else: return False - if self._method is not None: - if prepared_request.method != self._method: + # Check HTTP method. + if self._method is not None and prepared_request.method is not None: + if prepared_request.method.upper() != self._method: return False - if self._url is not None and prepared_request.url is not None: - url_without_params = prepared_request.url.split("?")[0] - if url_without_params != self._url: + + # Parse the URL. + parsed_url = parse.urlsplit(prepared_request.url) + # Reconstruct the base: scheme://netloc + request_url_base = f"{str(parsed_url.scheme)}://{str(parsed_url.netloc)}" + # The path (without query parameters) + request_path = str(parsed_url.path).rstrip("/") + + # If a base URL is provided, check that it matches. + if self._url_base is not None: + if request_url_base != self._url_base: return False - if self._params is not None: - parsed_url = parse.urlsplit(prepared_request.url) - params = dict(parse.parse_qsl(str(parsed_url.query))) - if not self._match_dict(params, self._params): + + # If a URL path pattern is provided, ensure the path matches the regex. + if self._url_path_pattern is not None: + if not self._url_path_pattern.search(request_path): return False - if self._headers is not None: - if not self._match_dict(prepared_request.headers, self._headers): + + # Check query parameters. + if self._params: + query_params = dict(parse.parse_qsl(str(parsed_url.query))) + if not self._match_dict(query_params, self._params): return False + + # Check headers (normalize keys to lower-case). + if self._headers: + req_headers = {k.lower(): v for k, v in prepared_request.headers.items()} + if not self._match_dict(req_headers, self._headers): + return False + return True @@ -399,24 +432,17 @@ def update_from_response(self, request: Any, response: Any) -> None: """ +@dataclass class APIBudget(AbstractAPIBudget): - """Default APIBudget implementation""" - - def __init__( - self, policies: list[AbstractCallRatePolicy], maximum_attempts_to_acquire: int = 100000 - ) -> None: - """Constructor - - :param policies: list of policies in this budget - :param maximum_attempts_to_acquire: number of attempts before throwing hit ratelimit exception, we put some big number here - to avoid situations when many threads compete with each other for a few lots over a significant amount of time - """ + """ + Default APIBudget implementation. + """ - self._policies = policies - self._maximum_attempts_to_acquire = maximum_attempts_to_acquire + policies: list[AbstractCallRatePolicy] + maximum_attempts_to_acquire: int = 100000 def get_matching_policy(self, request: Any) -> Optional[AbstractCallRatePolicy]: - for policy in self._policies: + for policy in self.policies: if policy.matches(request): return policy return None @@ -437,7 +463,7 @@ def acquire_call( policy = self.get_matching_policy(request) if policy: self._do_acquire(request=request, policy=policy, block=block, timeout=timeout) - elif self._policies: + elif self.policies: logger.info("no policies matched with requests, allow call by default") def update_from_response(self, request: Any, response: Any) -> None: @@ -460,7 +486,7 @@ def _do_acquire( """ last_exception = None # sometimes we spend all budget before a second attempt, so we have few more here - for attempt in range(1, self._maximum_attempts_to_acquire): + for attempt in range(1, self.maximum_attempts_to_acquire): try: policy.try_acquire(request, weight=1) return @@ -484,31 +510,18 @@ def _do_acquire( if last_exception: logger.info( - "we used all %s attempts to acquire and failed", self._maximum_attempts_to_acquire + "we used all %s attempts to acquire and failed", self.maximum_attempts_to_acquire ) raise last_exception +@dataclass class HttpAPIBudget(APIBudget): """Implementation of AbstractAPIBudget for HTTP""" - def __init__( - self, - ratelimit_reset_header: str = "ratelimit-reset", - ratelimit_remaining_header: str = "ratelimit-remaining", - status_codes_for_ratelimit_hit: tuple[int] = (429,), - **kwargs: Any, - ): - """Constructor - - :param ratelimit_reset_header: name of the header that has a timestamp of the next reset of call budget - :param ratelimit_remaining_header: name of the header that has the number of calls left - :param status_codes_for_ratelimit_hit: list of HTTP status codes that signal about rate limit being hit - """ - self._ratelimit_reset_header = ratelimit_reset_header - self._ratelimit_remaining_header = ratelimit_remaining_header - self._status_codes_for_ratelimit_hit = status_codes_for_ratelimit_hit - super().__init__(**kwargs) + ratelimit_reset_header: str = "ratelimit-reset" + ratelimit_remaining_header: str = "ratelimit-remaining" + status_codes_for_ratelimit_hit: Union[tuple[int], list[int]] = (429,) def update_from_response(self, request: Any, response: Any) -> None: policy = self.get_matching_policy(request) @@ -523,17 +536,17 @@ def update_from_response(self, request: Any, response: Any) -> None: def get_reset_ts_from_response( self, response: requests.Response ) -> Optional[datetime.datetime]: - if response.headers.get(self._ratelimit_reset_header): + if response.headers.get(self.ratelimit_reset_header): return datetime.datetime.fromtimestamp( - int(response.headers[self._ratelimit_reset_header]) + int(response.headers[self.ratelimit_reset_header]) ) return None def get_calls_left_from_response(self, response: requests.Response) -> Optional[int]: - if response.headers.get(self._ratelimit_remaining_header): - return int(response.headers[self._ratelimit_remaining_header]) + if response.headers.get(self.ratelimit_remaining_header): + return int(response.headers[self.ratelimit_remaining_header]) - if response.status_code in self._status_codes_for_ratelimit_hit: + if response.status_code in self.status_codes_for_ratelimit_hit: return 0 return None From 040ff9e5ec97af3fd7e56bf18fb46a5e70273153 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Thu, 6 Feb 2025 20:46:27 +0200 Subject: [PATCH 03/19] Format --- .../parsers/model_to_component_factory.py | 20 +++++-------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 87048a005..0ae7e9572 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2969,9 +2969,7 @@ def create_api_budget(self, model: APIBudgetModel, config: Config, **kwargs: Any return APIBudget( policies=policies, - maximum_attempts_to_acquire=model.maximum_attempts_to_acquire - if model.maximum_attempts_to_acquire - else 100000, + maximum_attempts_to_acquire=model.maximum_attempts_to_acquire or 100000, ) def create_http_api_budget( @@ -2984,18 +2982,10 @@ def create_http_api_budget( return HttpAPIBudget( policies=policies, - maximum_attempts_to_acquire=model.maximum_attempts_to_acquire - if model.maximum_attempts_to_acquire - else 100000, - ratelimit_reset_header=model.ratelimit_reset_header - if model.ratelimit_reset_header - else "ratelimit-reset", - ratelimit_remaining_header=model.ratelimit_remaining_header - if model.ratelimit_remaining_header - else "ratelimit-remaining", - status_codes_for_ratelimit_hit=model.status_codes_for_ratelimit_hit - if model.status_codes_for_ratelimit_hit - else (429,), + maximum_attempts_to_acquire=model.maximum_attempts_to_acquire or 100000, + ratelimit_reset_header=model.ratelimit_reset_header or "ratelimit-reset", + ratelimit_remaining_header=model.ratelimit_remaining_header or "ratelimit-remaining", + status_codes_for_ratelimit_hit=model.status_codes_for_ratelimit_hit or (429,), ) def create_fixed_window_call_rate_policy( From 15f830ca5be3ad69cc8065a5de43098d0a1ab110 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Fri, 7 Feb 2025 17:43:53 +0200 Subject: [PATCH 04/19] Update for backward compatibility --- .../declarative_component_schema.yaml | 8 +- .../models/declarative_component_schema.py | 8 +- .../parsers/model_to_component_factory.py | 12 +-- airbyte_cdk/sources/streams/call_rate.py | 63 +++++++++++++ unit_tests/sources/streams/test_call_rate.py | 88 +++++++++++++++++++ 5 files changed, 165 insertions(+), 14 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index abcddf514..25c9492fb 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1478,7 +1478,7 @@ definitions: description: List of matchers that define which requests this policy applies to. type: array items: - "$ref": "#/definitions/HttpRequestMatcher" + "$ref": "#/definitions/HttpRequestRegexMatcher" additionalProperties: true MovingWindowCallRatePolicy: title: Moving Window Call Rate Policy @@ -1503,7 +1503,7 @@ definitions: description: List of matchers that define which requests this policy applies to. type: array items: - "$ref": "#/definitions/HttpRequestMatcher" + "$ref": "#/definitions/HttpRequestRegexMatcher" additionalProperties: true UnlimitedCallRatePolicy: title: Unlimited Call Rate Policy @@ -1521,7 +1521,7 @@ definitions: description: List of matchers that define which requests this policy applies to. type: array items: - "$ref": "#/definitions/HttpRequestMatcher" + "$ref": "#/definitions/HttpRequestRegexMatcher" additionalProperties: true Rate: title: Rate @@ -1541,7 +1541,7 @@ definitions: type: string format: duration additionalProperties: true - HttpRequestMatcher: + HttpRequestRegexMatcher: title: HTTP Request Matcher description: > Matches HTTP requests based on method, base URL, URL path pattern, query parameters, and headers. diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 5bd0aa80d..aaff67548 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -657,7 +657,7 @@ class Config: ) -class HttpRequestMatcher(BaseModel): +class HttpRequestRegexMatcher(BaseModel): class Config: extra = Extra.allow @@ -1642,7 +1642,7 @@ class Config: description="The maximum number of calls allowed within the period.", title="Call Limit", ) - matchers: List[HttpRequestMatcher] = Field( + matchers: List[HttpRequestRegexMatcher] = Field( ..., description="List of matchers that define which requests this policy applies to.", title="Matchers", @@ -1659,7 +1659,7 @@ class Config: description="List of rates that define the call limits for different time intervals.", title="Rates", ) - matchers: List[HttpRequestMatcher] = Field( + matchers: List[HttpRequestRegexMatcher] = Field( ..., description="List of matchers that define which requests this policy applies to.", title="Matchers", @@ -1671,7 +1671,7 @@ class Config: extra = Extra.allow type: Literal["UnlimitedCallRatePolicy"] - matchers: List[HttpRequestMatcher] = Field( + matchers: List[HttpRequestRegexMatcher] = Field( ..., description="List of matchers that define which requests this policy applies to.", title="Matchers", diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 6f3f39604..9bd775a4a 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -251,7 +251,7 @@ HttpRequester as HttpRequesterModel, ) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - HttpRequestMatcher as HttpRequestMatcherModel, + HttpRequestRegexMatcher as HttpRequestRegexMatcherModel, ) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( HttpResponseFilter as HttpResponseFilterModel, @@ -494,7 +494,7 @@ APIBudget, FixedWindowCallRatePolicy, HttpAPIBudget, - HttpRequestMatcher, + HttpRequestRegexMatcher, MovingWindowCallRatePolicy, Rate, UnlimitedCallRatePolicy, @@ -644,7 +644,7 @@ def _init_mappings(self) -> None: MovingWindowCallRatePolicyModel: self.create_moving_window_call_rate_policy, UnlimitedCallRatePolicyModel: self.create_unlimited_call_rate_policy, RateModel: self.create_rate, - HttpRequestMatcherModel: self.create_http_request_matcher, + HttpRequestRegexMatcherModel: self.create_http_request_matcher, } # Needed for the case where we need to perform a second parse on the fields of a custom component @@ -3040,9 +3040,9 @@ def create_rate(self, model: RateModel, config: Config, **kwargs: Any) -> Rate: ) def create_http_request_matcher( - self, model: HttpRequestMatcherModel, config: Config, **kwargs: Any - ) -> HttpRequestMatcher: - return HttpRequestMatcher( + self, model: HttpRequestRegexMatcherModel, config: Config, **kwargs: Any + ) -> HttpRequestRegexMatcher: + return HttpRequestRegexMatcher( method=model.method, url_base=model.url_base, url_path_pattern=model.url_path_pattern, diff --git a/airbyte_cdk/sources/streams/call_rate.py b/airbyte_cdk/sources/streams/call_rate.py index d25fb9c2b..21fec881f 100644 --- a/airbyte_cdk/sources/streams/call_rate.py +++ b/airbyte_cdk/sources/streams/call_rate.py @@ -100,6 +100,69 @@ def __call__(self, request: Any) -> bool: class HttpRequestMatcher(RequestMatcher): + """Simple implementation of RequestMatcher for http requests case""" + + def __init__( + self, + method: Optional[str] = None, + url: Optional[str] = None, + params: Optional[Mapping[str, Any]] = None, + headers: Optional[Mapping[str, Any]] = None, + ): + """Constructor + + :param method: + :param url: + :param params: + :param headers: + """ + self._method = method + self._url = url + self._params = {str(k): str(v) for k, v in (params or {}).items()} + self._headers = {str(k): str(v) for k, v in (headers or {}).items()} + + @staticmethod + def _match_dict(obj: Mapping[str, Any], pattern: Mapping[str, Any]) -> bool: + """Check that all elements from pattern dict present and have the same values in obj dict + + :param obj: + :param pattern: + :return: + """ + return pattern.items() <= obj.items() + + def __call__(self, request: Any) -> bool: + """ + + :param request: + :return: True if matches the provided request object, False - otherwise + """ + if isinstance(request, requests.Request): + prepared_request = request.prepare() + elif isinstance(request, requests.PreparedRequest): + prepared_request = request + else: + return False + + if self._method is not None: + if prepared_request.method != self._method: + return False + if self._url is not None and prepared_request.url is not None: + url_without_params = prepared_request.url.split("?")[0] + if url_without_params != self._url: + return False + if self._params is not None: + parsed_url = parse.urlsplit(prepared_request.url) + params = dict(parse.parse_qsl(str(parsed_url.query))) + if not self._match_dict(params, self._params): + return False + if self._headers is not None: + if not self._match_dict(prepared_request.headers, self._headers): + return False + return True + + +class HttpRequestRegexMatcher(RequestMatcher): """ Extended RequestMatcher for HTTP requests that supports matching on: - HTTP method (case-insensitive) diff --git a/unit_tests/sources/streams/test_call_rate.py b/unit_tests/sources/streams/test_call_rate.py index 16bce68e3..853e2997e 100644 --- a/unit_tests/sources/streams/test_call_rate.py +++ b/unit_tests/sources/streams/test_call_rate.py @@ -17,6 +17,7 @@ CallRateLimitHit, FixedWindowCallRatePolicy, HttpRequestMatcher, + HttpRequestRegexMatcher, MovingWindowCallRatePolicy, Rate, UnlimitedCallRatePolicy, @@ -357,3 +358,90 @@ def test_with_cache(self, mocker, requests_mock): assert next(records) == {"data": "some_data"} assert MovingWindowCallRatePolicy.try_acquire.call_count == 1 + + +class TestHttpRequestRegexMatcher: + """ + Tests for the new regex-based logic: + - Case-insensitive HTTP method matching + - Optional url_base (scheme://netloc) + - Regex-based path matching + - Query params (must be present) + - Headers (case-insensitive keys) + """ + + def test_case_insensitive_method(self): + matcher = HttpRequestRegexMatcher(method="GET") + + req_ok = Request("get", "https://example.com/test/path") + req_wrong = Request("POST", "https://example.com/test/path") + + assert matcher(req_ok) + assert not matcher(req_wrong) + + def test_url_base(self): + matcher = HttpRequestRegexMatcher(url_base="https://example.com") + + req_ok = Request("GET", "https://example.com/test/path?foo=bar") + req_wrong = Request("GET", "https://another.com/test/path?foo=bar") + + assert matcher(req_ok) + assert not matcher(req_wrong) + + def test_url_path_pattern(self): + matcher = HttpRequestRegexMatcher(url_path_pattern=r"/test/") + + req_ok = Request("GET", "https://example.com/test/something") + req_wrong = Request("GET", "https://example.com/other/something") + + assert matcher(req_ok) + assert not matcher(req_wrong) + + def test_query_params(self): + matcher = HttpRequestRegexMatcher(params={"foo": "bar"}) + + req_ok = Request("GET", "https://example.com/api?foo=bar&extra=123") + req_missing = Request("GET", "https://example.com/api?not_foo=bar") + + assert matcher(req_ok) + assert not matcher(req_missing) + + def test_headers_case_insensitive(self): + matcher = HttpRequestRegexMatcher(headers={"X-Custom-Header": "abc"}) + + req_ok = Request( + "GET", + "https://example.com/api?foo=bar", + headers={"x-custom-header": "abc", "other": "123"}, + ) + req_wrong = Request("GET", "https://example.com/api", headers={"x-custom-header": "wrong"}) + + assert matcher(req_ok) + assert not matcher(req_wrong) + + def test_combined_criteria(self): + matcher = HttpRequestRegexMatcher( + method="GET", + url_base="https://example.com", + url_path_pattern=r"/test/", + params={"foo": "bar"}, + headers={"X-Test": "123"}, + ) + + req_ok = Request("GET", "https://example.com/test/me?foo=bar", headers={"x-test": "123"}) + req_bad_base = Request( + "GET", "https://other.com/test/me?foo=bar", headers={"x-test": "123"} + ) + req_bad_path = Request("GET", "https://example.com/nope?foo=bar", headers={"x-test": "123"}) + req_bad_param = Request( + "GET", "https://example.com/test/me?extra=xyz", headers={"x-test": "123"} + ) + req_bad_header = Request( + "GET", "https://example.com/test/me?foo=bar", headers={"some-other-header": "xyz"} + ) + + assert matcher(req_ok) + assert not matcher(req_bad_base) + assert not matcher(req_bad_path) + assert not matcher(req_bad_param) + assert not matcher(req_bad_header) From 1285668eecf394e90d373490c561d506f808d73d Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Sun, 9 Feb 2025 22:26:53 +0200 Subject: [PATCH 05/19] Add unit tests --- .../test_model_to_component_factory.py | 80 +++++++++++++++++++ .../requesters/test_http_requester.py | 32 ++++++++ 2 files changed, 112 insertions(+) diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 43564a5c8..769bc52a0 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -142,6 +142,7 @@ from airbyte_cdk.sources.declarative.transformations import AddFields, RemoveFields from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource +from airbyte_cdk.sources.streams.call_rate import MovingWindowCallRatePolicy from airbyte_cdk.sources.streams.concurrent.clamping import ( ClampingEndProvider, DayClampingStrategy, @@ -3564,3 +3565,82 @@ def test_create_async_retriever(): assert isinstance(selector, RecordSelector) assert isinstance(extractor, DpathExtractor) assert extractor.field_path == ["data"] + + +def test_api_budget(): + manifest = { + "type": "DeclarativeSource", + "api_budget": { + "type": "HTTPAPIBudget", + "ratelimit_reset_header": "X-RateLimit-Reset", + "ratelimit_remaining_header": "X-RateLimit-Remaining", + "status_codes_for_ratelimit_hit": [429, 503], + "policies": [ + { + "type": "MovingWindowCallRatePolicy", + "rates": [ + { + "type": "Rate", + "limit": 3, + "interval": "PT0.1S", # 0.1 seconds + } + ], + "matchers": [ + { + "type": "HttpRequestRegexMatcher", + "method": "GET", + "url_base": "https://api.sendgrid.com", + "url_path_pattern": "/v3/marketing/lists", + } + ], + } + ], + }, + "my_requester": { + "type": "HttpRequester", + "path": "/v3/marketing/lists", + "url_base": "https://api.sendgrid.com", + "http_method": "GET", + "authenticator": { + "type": "BasicHttpAuthenticator", + "username": "admin", + "password": "{{ config['password'] }}", + }, + }, + } + + config = { + "password": "verysecrettoken", + } + + factory = ModelToComponentFactory() + if "api_budget" in manifest: + factory.set_api_budget(manifest["api_budget"], config) + + from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + HttpRequester as HttpRequesterModel, + ) + + requester_definition = manifest["my_requester"] + assert requester_definition["type"] == "HttpRequester" + + http_requester = factory.create_component( + model_type=HttpRequesterModel, + component_definition=requester_definition, + config=config, + name="lists_stream", + decoder=None, + ) + + assert http_requester.api_budget is not None + assert http_requester.api_budget.ratelimit_reset_header == "X-RateLimit-Reset" + assert http_requester.api_budget.status_codes_for_ratelimit_hit == [429, 503] + assert len(http_requester.api_budget.policies) == 1 + + # The single policy is a MovingWindowCallRatePolicy + policy = http_requester.api_budget.policies[0] + assert isinstance(policy, MovingWindowCallRatePolicy) + assert policy._bucket.rates[0].limit == 3 + # The 0.1s from 'PT0.1S' is stored in ms by PyRateLimiter internally + # but here just check that the limit and interval exist + assert policy._bucket.rates[0].interval == 100 # 100 ms diff --git a/unit_tests/sources/declarative/requesters/test_http_requester.py b/unit_tests/sources/declarative/requesters/test_http_requester.py index f02ec206b..c5d5c218d 100644 --- a/unit_tests/sources/declarative/requesters/test_http_requester.py +++ b/unit_tests/sources/declarative/requesters/test_http_requester.py @@ -2,6 +2,7 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +from datetime import timedelta from typing import Any, Mapping, Optional from unittest import mock from unittest.mock import MagicMock @@ -9,6 +10,7 @@ import pytest as pytest import requests +import requests.sessions from requests import PreparedRequest from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator @@ -27,6 +29,12 @@ InterpolatedRequestOptionsProvider, ) from airbyte_cdk.sources.message import MessageRepository +from airbyte_cdk.sources.streams.call_rate import ( + AbstractAPIBudget, + HttpAPIBudget, + MovingWindowCallRatePolicy, + Rate, +) from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction from airbyte_cdk.sources.streams.http.exceptions import ( RequestBodyException, @@ -45,6 +53,7 @@ def factory( request_options_provider: Optional[InterpolatedRequestOptionsProvider] = None, authenticator: Optional[DeclarativeAuthenticator] = None, error_handler: Optional[ErrorHandler] = None, + api_budget: Optional[HttpAPIBudget] = None, config: Optional[Config] = None, parameters: Mapping[str, Any] = None, disable_retries: bool = False, @@ -61,6 +70,7 @@ def factory( http_method=http_method, request_options_provider=request_options_provider, error_handler=error_handler, + api_budget=api_budget, disable_retries=disable_retries, message_repository=message_repository or MagicMock(), use_cache=use_cache, @@ -934,3 +944,25 @@ def test_backoff_strategy_from_manifest_is_respected(http_requester_factory: Any http_requester._http_client._request_attempt_count.get(request_mock) == http_requester._http_client._max_retries + 1 ) + + +def test_http_requester_with_mock_apibudget(http_requester_factory, monkeypatch): + mock_budget = MagicMock(spec=HttpAPIBudget) + + requester = http_requester_factory( + url_base="https://example.com", + path="test", + api_budget=mock_budget, + ) + + dummy_response = requests.Response() + dummy_response.status_code = 200 + send_mock = MagicMock(return_value=dummy_response) + monkeypatch.setattr(requests.Session, "send", send_mock) + + response = requester.send_request() + + assert send_mock.call_count == 1 + assert response.status_code == 200 + + assert mock_budget.acquire_call.call_count == 1 From 7be98423518c975e672629abbd4cb063048e55d2 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Sun, 9 Feb 2025 22:38:57 +0200 Subject: [PATCH 06/19] Add FixedWindowCallRatePolicy unit test --- .../parsers/model_to_component_factory.py | 2 +- .../test_model_to_component_factory.py | 79 +++++++++++++++++++ 2 files changed, 80 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 9bd775a4a..b55d40fcd 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -3001,7 +3001,7 @@ def create_fixed_window_call_rate_policy( ] return FixedWindowCallRatePolicy( next_reset_ts=model.next_reset_ts, - period=parse_duration(model.period), + period=model.period, call_limit=model.call_limit, matchers=matchers, ) diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 769bc52a0..bc72ea36b 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -3644,3 +3644,82 @@ def test_api_budget(): # The 0.1s from 'PT0.1S' is stored in ms by PyRateLimiter internally # but here just check that the limit and interval exist assert policy._bucket.rates[0].interval == 100 # 100 ms + + +def test_api_budget_fixed_window_policy(): + manifest = { + "type": "DeclarativeSource", + # Root-level api_budget referencing a FixedWindowCallRatePolicy + "api_budget": { + "type": "APIBudget", + "maximum_attempts_to_acquire": 9999, + "policies": [ + { + "type": "FixedWindowCallRatePolicy", + "next_reset_ts": "2025-01-01T00:00:00Z", + "period": "PT1M", # 1 minute + "call_limit": 10, + "matchers": [ + { + "type": "HttpRequestRegexMatcher", + "method": "GET", + "url_base": "https://example.org", + "url_path_pattern": "/v2/data", + } + ], + } + ], + }, + # We'll define a single HttpRequester that references that base + "my_requester": { + "type": "HttpRequester", + "path": "/v2/data", + "url_base": "https://example.org", + "http_method": "GET", + "authenticator": {"type": "NoAuth"}, + }, + } + + config = {} + + factory = ModelToComponentFactory() + if "api_budget" in manifest: + factory.set_api_budget(manifest["api_budget"], config) + + from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + HttpRequester as HttpRequesterModel, + ) + + requester_definition = manifest["my_requester"] + assert requester_definition["type"] == "HttpRequester" + http_requester = factory.create_component( + model_type=HttpRequesterModel, + component_definition=requester_definition, + config=config, + name="my_stream", + decoder=None, + ) + + assert http_requester.api_budget is not None + assert http_requester.api_budget.maximum_attempts_to_acquire == 9999 + assert len(http_requester.api_budget.policies) == 1 + + from airbyte_cdk.sources.streams.call_rate import FixedWindowCallRatePolicy + + policy = http_requester.api_budget.policies[0] + assert isinstance(policy, FixedWindowCallRatePolicy) + assert policy._call_limit == 10 + # The period is "PT1M" => 60 seconds + assert policy._offset.total_seconds() == 60 + + expected_reset_dt = datetime(2025, 1, 1, 0, 0, 0, tzinfo=timezone.utc) + assert policy._next_reset_ts == expected_reset_dt + + assert len(policy._matchers) == 1 + matcher = policy._matchers[0] + from airbyte_cdk.sources.streams.call_rate import HttpRequestRegexMatcher + + assert isinstance(matcher, HttpRequestRegexMatcher) + assert matcher._method == "GET" + assert matcher._url_base == "https://example.org" + assert matcher._url_path_pattern.pattern == "/v2/data" From 8d3bfce9fef2442d46eab37cbe6e5d5f275c7ec5 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Mon, 10 Feb 2025 11:24:38 +0200 Subject: [PATCH 07/19] Change the partitions limit to 1000 --- .../declarative/incremental/concurrent_partition_cursor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index ab667c655..fd803df49 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -58,7 +58,7 @@ class ConcurrentPerPartitionCursor(Cursor): CurrentPerPartitionCursor expects the state of the ConcurrentCursor to follow the format {cursor_field: cursor_value}. """ - DEFAULT_MAX_PARTITIONS_NUMBER = 10000 + DEFAULT_MAX_PARTITIONS_NUMBER = 1000 _NO_STATE: Mapping[str, Any] = {} _NO_CURSOR_STATE: Mapping[str, Any] = {} _GLOBAL_STATE_KEY = "state" From 509ea05575c146587d2d0c0970e09a886fee3a35 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Mon, 10 Feb 2025 17:31:53 +0200 Subject: [PATCH 08/19] Refactored switching logic --- .../incremental/concurrent_partition_cursor.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index fd803df49..f54a0297f 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -58,7 +58,8 @@ class ConcurrentPerPartitionCursor(Cursor): CurrentPerPartitionCursor expects the state of the ConcurrentCursor to follow the format {cursor_field: cursor_value}. """ - DEFAULT_MAX_PARTITIONS_NUMBER = 1000 + DEFAULT_MAX_PARTITIONS_NUMBER = 10_000 + SWITCH_TO_GLOBAL_LIMIT = 1000 _NO_STATE: Mapping[str, Any] = {} _NO_CURSOR_STATE: Mapping[str, Any] = {} _GLOBAL_STATE_KEY = "state" @@ -99,7 +100,7 @@ def __init__( self._new_global_cursor: Optional[StreamState] = None self._lookback_window: int = 0 self._parent_state: Optional[StreamState] = None - self._over_limit: int = 0 + self._number_of_partitions: int = 0 self._use_global_cursor: bool = False self._partition_serializer = PerPartitionKeySerializer() @@ -233,8 +234,8 @@ def _ensure_partition_limit(self) -> None: or removed due to being the oldest. """ with self._lock: + self._number_of_partitions += 1 while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1: - self._over_limit += 1 # Try removing finished partitions first for partition_key in list(self._cursor_per_partition.keys()): if ( @@ -245,7 +246,7 @@ def _ensure_partition_limit(self) -> None: partition_key ) # Remove the oldest partition logger.warning( - f"The maximum number of partitions has been reached. Dropping the oldest finished partition: {oldest_partition}. Over limit: {self._over_limit}." + f"The maximum number of partitions has been reached. Dropping the oldest finished partition: {oldest_partition}. Over limit: {self._number_of_partitions}." ) break else: @@ -254,7 +255,7 @@ def _ensure_partition_limit(self) -> None: 1 ] # Remove the oldest partition logger.warning( - f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._over_limit}." + f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._number_of_partitions}." ) def _set_initial_state(self, stream_state: StreamState) -> None: @@ -355,6 +356,10 @@ def _set_global_state(self, stream_state: Mapping[str, Any]) -> None: def observe(self, record: Record) -> None: if not self._use_global_cursor and self.limit_reached(): + logger.info( + f"Exceeded the 'SWITCH_TO_GLOBAL_LIMIT' of {self.SWITCH_TO_GLOBAL_LIMIT}. " + f"Switching to global cursor for {self._stream_name}." + ) self._use_global_cursor = True if not record.associated_slice: @@ -397,4 +402,4 @@ def _get_cursor(self, record: Record) -> ConcurrentCursor: return cursor def limit_reached(self) -> bool: - return self._over_limit > self.DEFAULT_MAX_PARTITIONS_NUMBER + return self._number_of_partitions > self.SWITCH_TO_GLOBAL_LIMIT From 8d44150ce61cb38aaf4e9ce30183ef43f3a7a0fd Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Mon, 10 Feb 2025 20:53:06 +0200 Subject: [PATCH 09/19] Increase the limit for number of partitions in memory --- .../declarative/incremental/concurrent_partition_cursor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index f54a0297f..d69b61bfd 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -58,7 +58,7 @@ class ConcurrentPerPartitionCursor(Cursor): CurrentPerPartitionCursor expects the state of the ConcurrentCursor to follow the format {cursor_field: cursor_value}. """ - DEFAULT_MAX_PARTITIONS_NUMBER = 10_000 + DEFAULT_MAX_PARTITIONS_NUMBER = 25_000 SWITCH_TO_GLOBAL_LIMIT = 1000 _NO_STATE: Mapping[str, Any] = {} _NO_CURSOR_STATE: Mapping[str, Any] = {} From 342375c5fc1017a8738fbc0a7166695f24388801 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Wed, 12 Feb 2025 15:42:21 +0200 Subject: [PATCH 10/19] Refactor ConcurrentPerPartitionCursor to not use ConcurrentCursor with `_use_global_cursor` --- .../declarative_component_schema.yaml | 207 ------------------ .../concurrent_partition_cursor.py | 29 ++- .../manifest_declarative_source.py | 4 - .../models/declarative_component_schema.py | 165 -------------- .../parsers/model_to_component_factory.py | 141 ------------ .../declarative/requesters/http_requester.py | 3 - airbyte_cdk/sources/streams/call_rate.py | 156 ++++--------- 7 files changed, 59 insertions(+), 646 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 25c9492fb..b0242c94f 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -40,12 +40,6 @@ properties: "$ref": "#/definitions/Spec" concurrency_level: "$ref": "#/definitions/ConcurrencyLevel" - api_budget: - title: API Budget - description: Defines how many requests can be made to the API in a given time frame. This field accepts either a generic APIBudget or an HTTP-specific configuration (HTTPAPIBudget) to be applied across all streams. - anyOf: - - "$ref": "#/definitions/APIBudget" - - "$ref": "#/definitions/HTTPAPIBudget" metadata: type: object description: For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata. @@ -1371,207 +1365,6 @@ definitions: $parameters: type: object additional_properties: true - APIBudget: - title: API Budget - description: > - A generic API budget configuration that defines the policies (rate limiting rules) - and the maximum number of attempts to acquire a call credit. This budget does not automatically - update itself based on HTTP response headers. - type: object - required: - - type - - policies - properties: - type: - type: string - enum: [APIBudget] - policies: - title: Policies - description: List of call rate policies that define how many calls are allowed. - type: array - items: - anyOf: - - "$ref": "#/definitions/FixedWindowCallRatePolicy" - - "$ref": "#/definitions/MovingWindowCallRatePolicy" - - "$ref": "#/definitions/UnlimitedCallRatePolicy" - maximum_attempts_to_acquire: - title: Maximum Attempts to Acquire - description: The maximum number of attempts to acquire a call before giving up. - type: integer - default: 100000 - additionalProperties: true - HTTPAPIBudget: - title: HTTP API Budget - description: > - An HTTP-specific API budget that extends APIBudget by updating rate limiting information based - on HTTP response headers. It extracts available calls and the next reset timestamp from the HTTP responses. - type: object - required: - - type - - policies - properties: - type: - type: string - enum: [HTTPAPIBudget] - policies: - title: Policies - description: List of call rate policies that define how many calls are allowed. - type: array - items: - anyOf: - - "$ref": "#/definitions/FixedWindowCallRatePolicy" - - "$ref": "#/definitions/MovingWindowCallRatePolicy" - - "$ref": "#/definitions/UnlimitedCallRatePolicy" - ratelimit_reset_header: - title: Rate Limit Reset Header - description: The HTTP response header name that indicates when the rate limit resets. - type: string - default: "ratelimit-reset" - ratelimit_remaining_header: - title: Rate Limit Remaining Header - description: The HTTP response header name that indicates the number of remaining allowed calls. - type: string - default: "ratelimit-remaining" - status_codes_for_ratelimit_hit: - title: Status Codes for Rate Limit Hit - description: List of HTTP status codes that indicate a rate limit has been hit. - type: array - items: - type: integer - default: [429] - maximum_attempts_to_acquire: - title: Maximum Attempts to Acquire - description: The maximum number of attempts to acquire a call before giving up. - type: integer - default: 100000 - additionalProperties: true - FixedWindowCallRatePolicy: - title: Fixed Window Call Rate Policy - description: A policy that allows a fixed number of calls within a specific time window. - type: object - required: - - type - - next_reset_ts - - period - - call_limit - - matchers - properties: - type: - type: string - enum: [FixedWindowCallRatePolicy] - next_reset_ts: - title: Next Reset Timestamp - description: The timestamp when the rate limit will reset. - type: string - format: date-time - period: - title: Period - description: The time interval for the rate limit window. - type: string - format: duration - call_limit: - title: Call Limit - description: The maximum number of calls allowed within the period. - type: integer - matchers: - title: Matchers - description: List of matchers that define which requests this policy applies to. - type: array - items: - "$ref": "#/definitions/HttpRequestRegexMatcher" - additionalProperties: true - MovingWindowCallRatePolicy: - title: Moving Window Call Rate Policy - description: A policy that allows a fixed number of calls within a moving time window. - type: object - required: - - type - - rates - - matchers - properties: - type: - type: string - enum: [MovingWindowCallRatePolicy] - rates: - title: Rates - description: List of rates that define the call limits for different time intervals. - type: array - items: - "$ref": "#/definitions/Rate" - matchers: - title: Matchers - description: List of matchers that define which requests this policy applies to. - type: array - items: - "$ref": "#/definitions/HttpRequestRegexMatcher" - additionalProperties: true - UnlimitedCallRatePolicy: - title: Unlimited Call Rate Policy - description: A policy that allows unlimited calls for specific requests. - type: object - required: - - type - - matchers - properties: - type: - type: string - enum: [UnlimitedCallRatePolicy] - matchers: - title: Matchers - description: List of matchers that define which requests this policy applies to. - type: array - items: - "$ref": "#/definitions/HttpRequestRegexMatcher" - additionalProperties: true - Rate: - title: Rate - description: Defines a rate limit with a specific number of calls allowed within a time interval. - type: object - required: - - limit - - interval - properties: - limit: - title: Limit - description: The maximum number of calls allowed within the interval. - type: integer - interval: - title: Interval - description: The time interval for the rate limit. - type: string - format: duration - additionalProperties: true - HttpRequestRegexMatcher: - title: HTTP Request Matcher - description: > - Matches HTTP requests based on method, base URL, URL path pattern, query parameters, and headers. - Use `url_base` to specify the scheme and host (without trailing slash) and - `url_path_pattern` to apply a regex to the request path. - type: object - properties: - method: - title: Method - description: The HTTP method to match (e.g., GET, POST). - type: string - url_base: - title: URL Base - description: The base URL (scheme and host, e.g. "https://api.example.com") to match. - type: string - url_path_pattern: - title: URL Path Pattern - description: A regular expression pattern to match the URL path. - type: string - params: - title: Parameters - description: The query parameters to match. - type: object - additionalProperties: true - headers: - title: Headers - description: The headers to match. - type: object - additionalProperties: true - additionalProperties: true DefaultErrorHandler: title: Default Error Handler description: Component defining how to handle errors. Default behavior includes only retrying server errors (HTTP 5XX) and too many requests (HTTP 429) with an exponential backoff. diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index d69b61bfd..fc75ecd90 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -142,7 +142,8 @@ def close_partition(self, partition: Partition) -> None: raise ValueError("stream_slice cannot be None") partition_key = self._to_partition_key(stream_slice.partition) - self._cursor_per_partition[partition_key].close_partition(partition=partition) + if not self._use_global_cursor: + self._cursor_per_partition[partition_key].close_partition(partition=partition) with self._lock: self._semaphore_per_partition[partition_key].acquire() cursor = self._cursor_per_partition[partition_key] @@ -150,12 +151,7 @@ def close_partition(self, partition: Partition) -> None: partition_key in self._finished_partitions and self._semaphore_per_partition[partition_key]._value == 0 ): - if ( - self._new_global_cursor is None - or self._new_global_cursor[self.cursor_field.cursor_field_key] - < cursor.state[self.cursor_field.cursor_field_key] - ): - self._new_global_cursor = copy.deepcopy(cursor.state) + self._update_global_cursor(cursor.state[self.cursor_field.cursor_field_key]) if not self._use_global_cursor: self._emit_state_message() @@ -366,9 +362,22 @@ def observe(self, record: Record) -> None: raise ValueError( "Invalid state as stream slices that are emitted should refer to an existing cursor" ) - self._cursor_per_partition[ - self._to_partition_key(record.associated_slice.partition) - ].observe(record) + + record_cursor = self._connector_state_converter.parse_value( + self._cursor_field.extract_value(record) + ) + self._update_global_cursor(record_cursor) + if not self._use_global_cursor: + self._cursor_per_partition[ + self._to_partition_key(record.associated_slice.partition) + ].observe(record) + + def _update_global_cursor(self, value: Mapping[str, Any]) -> None: + if ( + self._new_global_cursor is None + or self._new_global_cursor[self.cursor_field.cursor_field_key] < value + ): + self._new_global_cursor = {self.cursor_field.cursor_field_key: copy.deepcopy(value)} def _to_partition_key(self, partition: Mapping[str, Any]) -> str: return self._partition_serializer.to_partition_key(partition) diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py index d3afb1396..efc779464 100644 --- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -137,10 +137,6 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: self._source_config, config ) - api_budget_model = self._source_config.get("api_budget") - if api_budget_model: - self._constructor.set_api_budget(api_budget_model, config) - source_streams = [ self._constructor.create_component( DeclarativeStreamModel, diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index aaff67548..fe29cee2c 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -3,7 +3,6 @@ from __future__ import annotations -from datetime import datetime, timedelta from enum import Enum from typing import Any, Dict, List, Literal, Optional, Union @@ -643,45 +642,6 @@ class OAuthAuthenticator(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class Rate(BaseModel): - class Config: - extra = Extra.allow - - limit: int = Field( - ..., - description="The maximum number of calls allowed within the interval.", - title="Limit", - ) - interval: timedelta = Field( - ..., description="The time interval for the rate limit.", title="Interval" - ) - - -class HttpRequestRegexMatcher(BaseModel): - class Config: - extra = Extra.allow - - method: Optional[str] = Field( - None, description="The HTTP method to match (e.g., GET, POST).", title="Method" - ) - url_base: Optional[str] = Field( - None, - description='The base URL (scheme and host, e.g. "https://api.example.com") to match.', - title="URL Base", - ) - url_path_pattern: Optional[str] = Field( - None, - description="A regular expression pattern to match the URL path.", - title="URL Path Pattern", - ) - params: Optional[Dict[str, Any]] = Field( - None, description="The query parameters to match.", title="Parameters" - ) - headers: Optional[Dict[str, Any]] = Field( - None, description="The headers to match.", title="Headers" - ) - - class DpathExtractor(BaseModel): type: Literal["DpathExtractor"] field_path: List[str] = Field( @@ -1624,60 +1584,6 @@ class DatetimeBasedCursor(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class FixedWindowCallRatePolicy(BaseModel): - class Config: - extra = Extra.allow - - type: Literal["FixedWindowCallRatePolicy"] - next_reset_ts: datetime = Field( - ..., - description="The timestamp when the rate limit will reset.", - title="Next Reset Timestamp", - ) - period: timedelta = Field( - ..., description="The time interval for the rate limit window.", title="Period" - ) - call_limit: int = Field( - ..., - description="The maximum number of calls allowed within the period.", - title="Call Limit", - ) - matchers: List[HttpRequestRegexMatcher] = Field( - ..., - description="List of matchers that define which requests this policy applies to.", - title="Matchers", - ) - - -class MovingWindowCallRatePolicy(BaseModel): - class Config: - extra = Extra.allow - - type: Literal["MovingWindowCallRatePolicy"] - rates: List[Rate] = Field( - ..., - description="List of rates that define the call limits for different time intervals.", - title="Rates", - ) - matchers: List[HttpRequestRegexMatcher] = Field( - ..., - description="List of matchers that define which requests this policy applies to.", - title="Matchers", - ) - - -class UnlimitedCallRatePolicy(BaseModel): - class Config: - extra = Extra.allow - - type: Literal["UnlimitedCallRatePolicy"] - matchers: List[HttpRequestRegexMatcher] = Field( - ..., - description="List of matchers that define which requests this policy applies to.", - title="Matchers", - ) - - class DefaultErrorHandler(BaseModel): type: Literal["DefaultErrorHandler"] backoff_strategies: Optional[ @@ -1809,67 +1715,6 @@ class CompositeErrorHandler(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class APIBudget(BaseModel): - class Config: - extra = Extra.allow - - type: Literal["APIBudget"] - policies: List[ - Union[ - FixedWindowCallRatePolicy, - MovingWindowCallRatePolicy, - UnlimitedCallRatePolicy, - ] - ] = Field( - ..., - description="List of call rate policies that define how many calls are allowed.", - title="Policies", - ) - maximum_attempts_to_acquire: Optional[int] = Field( - 100000, - description="The maximum number of attempts to acquire a call before giving up.", - title="Maximum Attempts to Acquire", - ) - - -class HTTPAPIBudget(BaseModel): - class Config: - extra = Extra.allow - - type: Literal["HTTPAPIBudget"] - policies: List[ - Union[ - FixedWindowCallRatePolicy, - MovingWindowCallRatePolicy, - UnlimitedCallRatePolicy, - ] - ] = Field( - ..., - description="List of call rate policies that define how many calls are allowed.", - title="Policies", - ) - ratelimit_reset_header: Optional[str] = Field( - "ratelimit-reset", - description="The HTTP response header name that indicates when the rate limit resets.", - title="Rate Limit Reset Header", - ) - ratelimit_remaining_header: Optional[str] = Field( - "ratelimit-remaining", - description="The HTTP response header name that indicates the number of remaining allowed calls.", - title="Rate Limit Remaining Header", - ) - status_codes_for_ratelimit_hit: Optional[List[int]] = Field( - [429], - description="List of HTTP status codes that indicate a rate limit has been hit.", - title="Status Codes for Rate Limit Hit", - ) - maximum_attempts_to_acquire: Optional[int] = Field( - 100000, - description="The maximum number of attempts to acquire a call before giving up.", - title="Maximum Attempts to Acquire", - ) - - class ZipfileDecoder(BaseModel): class Config: extra = Extra.allow @@ -1903,11 +1748,6 @@ class Config: definitions: Optional[Dict[str, Any]] = None spec: Optional[Spec] = None concurrency_level: Optional[ConcurrencyLevel] = None - api_budget: Optional[Union[APIBudget, HTTPAPIBudget]] = Field( - None, - description="Defines how many requests can be made to the API in a given time frame. This field accepts either a generic APIBudget or an HTTP-specific configuration (HTTPAPIBudget) to be applied across all streams.", - title="API Budget", - ) metadata: Optional[Dict[str, Any]] = Field( None, description="For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.", @@ -1934,11 +1774,6 @@ class Config: definitions: Optional[Dict[str, Any]] = None spec: Optional[Spec] = None concurrency_level: Optional[ConcurrencyLevel] = None - api_budget: Optional[Union[APIBudget, HTTPAPIBudget]] = Field( - None, - description="Defines how many requests can be made to the API in a given time frame. This field accepts either a generic APIBudget or an HTTP-specific configuration (HTTPAPIBudget) to be applied across all streams.", - title="API Budget", - ) metadata: Optional[Dict[str, Any]] = Field( None, description="For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.", diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 4b80e851b..c6d69623d 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -112,9 +112,6 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( AddFields as AddFieldsModel, ) -from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - APIBudget as APIBudgetModel, -) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ApiKeyAuthenticator as ApiKeyAuthenticatorModel, ) @@ -229,9 +226,6 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ExponentialBackoffStrategy as ExponentialBackoffStrategyModel, ) -from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - FixedWindowCallRatePolicy as FixedWindowCallRatePolicyModel, -) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( FlattenFields as FlattenFieldsModel, ) @@ -241,18 +235,12 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( GzipParser as GzipParserModel, ) -from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - HTTPAPIBudget as HTTPAPIBudgetModel, -) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( HttpComponentsResolver as HttpComponentsResolverModel, ) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( HttpRequester as HttpRequesterModel, ) -from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - HttpRequestRegexMatcher as HttpRequestRegexMatcherModel, -) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( HttpResponseFilter as HttpResponseFilterModel, ) @@ -307,9 +295,6 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( MinMaxDatetime as MinMaxDatetimeModel, ) -from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - MovingWindowCallRatePolicy as MovingWindowCallRatePolicyModel, -) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( NoAuth as NoAuthModel, ) @@ -328,9 +313,6 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ParentStreamConfig as ParentStreamConfigModel, ) -from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - Rate as RateModel, -) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( RecordFilter as RecordFilterModel, ) @@ -374,9 +356,6 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( TypesMap as TypesMapModel, ) -from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - UnlimitedCallRatePolicy as UnlimitedCallRatePolicyModel, -) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ValueType from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( WaitTimeFromHeader as WaitTimeFromHeaderModel, @@ -490,15 +469,6 @@ MessageRepository, NoopMessageRepository, ) -from airbyte_cdk.sources.streams.call_rate import ( - APIBudget, - FixedWindowCallRatePolicy, - HttpAPIBudget, - HttpRequestRegexMatcher, - MovingWindowCallRatePolicy, - Rate, - UnlimitedCallRatePolicy, -) from airbyte_cdk.sources.streams.concurrent.clamping import ( ClampingEndProvider, ClampingStrategy, @@ -550,7 +520,6 @@ def __init__( self._evaluate_log_level(emit_connector_builder_messages) ) self._connector_state_manager = connector_state_manager or ConnectorStateManager() - self._api_budget: Optional[Union[APIBudget, HttpAPIBudget]] = None def _init_mappings(self) -> None: self.PYDANTIC_MODEL_TO_CONSTRUCTOR: Mapping[Type[BaseModel], Callable[..., Any]] = { @@ -638,13 +607,6 @@ def _init_mappings(self) -> None: StreamConfigModel: self.create_stream_config, ComponentMappingDefinitionModel: self.create_components_mapping_definition, ZipfileDecoderModel: self.create_zipfile_decoder, - APIBudgetModel: self.create_api_budget, - HTTPAPIBudgetModel: self.create_http_api_budget, - FixedWindowCallRatePolicyModel: self.create_fixed_window_call_rate_policy, - MovingWindowCallRatePolicyModel: self.create_moving_window_call_rate_policy, - UnlimitedCallRatePolicyModel: self.create_unlimited_call_rate_policy, - RateModel: self.create_rate, - HttpRequestRegexMatcherModel: self.create_http_request_matcher, } # Needed for the case where we need to perform a second parse on the fields of a custom component @@ -1957,8 +1919,6 @@ def create_http_requester( ) ) - api_budget = self._api_budget - request_options_provider = InterpolatedRequestOptionsProvider( request_body_data=model.request_body_data, request_body_json=model.request_body_json, @@ -1979,7 +1939,6 @@ def create_http_requester( path=model.path, authenticator=authenticator, error_handler=error_handler, - api_budget=api_budget, http_method=HttpMethod[model.http_method.value], request_options_provider=request_options_provider, config=config, @@ -2981,103 +2940,3 @@ def _is_supported_parser_for_pagination(self, parser: Parser) -> bool: return isinstance(parser.inner_parser, JsonParser) else: return False - - def create_api_budget(self, model: APIBudgetModel, config: Config, **kwargs: Any) -> APIBudget: - policies = [ - self._create_component_from_model(model=policy, config=config) - for policy in model.policies - ] - - return APIBudget( - policies=policies, - maximum_attempts_to_acquire=model.maximum_attempts_to_acquire or 100000, - ) - - def create_http_api_budget( - self, model: HTTPAPIBudgetModel, config: Config, **kwargs: Any - ) -> HttpAPIBudget: - policies = [ - self._create_component_from_model(model=policy, config=config) - for policy in model.policies - ] - - return HttpAPIBudget( - policies=policies, - maximum_attempts_to_acquire=model.maximum_attempts_to_acquire or 100000, - ratelimit_reset_header=model.ratelimit_reset_header or "ratelimit-reset", - ratelimit_remaining_header=model.ratelimit_remaining_header or "ratelimit-remaining", - status_codes_for_ratelimit_hit=model.status_codes_for_ratelimit_hit or (429,), - ) - - def create_fixed_window_call_rate_policy( - self, model: FixedWindowCallRatePolicyModel, config: Config, **kwargs: Any - ) -> FixedWindowCallRatePolicy: - matchers = [ - self._create_component_from_model(model=matcher, config=config) - for matcher in model.matchers - ] - return FixedWindowCallRatePolicy( - next_reset_ts=model.next_reset_ts, - period=model.period, - call_limit=model.call_limit, - matchers=matchers, - ) - - def create_moving_window_call_rate_policy( - self, model: MovingWindowCallRatePolicyModel, config: Config, **kwargs: Any - ) -> MovingWindowCallRatePolicy: - rates = [ - self._create_component_from_model(model=rate, config=config) for rate in model.rates - ] - matchers = [ - self._create_component_from_model(model=matcher, config=config) - for matcher in model.matchers - ] - return MovingWindowCallRatePolicy( - rates=rates, - matchers=matchers, - ) - - def create_unlimited_call_rate_policy( - self, model: UnlimitedCallRatePolicyModel, config: Config, **kwargs: Any - ) -> UnlimitedCallRatePolicy: - matchers = [ - self._create_component_from_model(model=matcher, config=config) - for matcher in model.matchers - ] - - return UnlimitedCallRatePolicy( - matchers=matchers, - ) - - def create_rate(self, model: RateModel, config: Config, **kwargs: Any) -> Rate: - return Rate( - limit=model.limit, - interval=model.interval, - ) - - def create_http_request_matcher( - self, model: HttpRequestRegexMatcherModel, config: Config, **kwargs: Any - ) -> HttpRequestRegexMatcher: - return HttpRequestRegexMatcher( - method=model.method, - url_base=model.url_base, - url_path_pattern=model.url_path_pattern, - params=model.params, - headers=model.headers, - ) - - def set_api_budget(self, component_definition: ComponentDefinition, config: Config) -> None: - model_str = component_definition.get("type") - if model_str == "APIBudget": - # Annotate model_type as a type that is a subclass of BaseModel - model_type: Union[Type[APIBudgetModel], Type[HTTPAPIBudgetModel]] = APIBudgetModel - elif model_str == "HTTPAPIBudget": - model_type = HTTPAPIBudgetModel - else: - raise ValueError(f"Unknown API Budget type: {model_str}") - - # create_component expects a type[BaseModel] and returns an instance of that model. - self._api_budget = self.create_component( - model_type=model_type, component_definition=component_definition, config=config - ) diff --git a/airbyte_cdk/sources/declarative/requesters/http_requester.py b/airbyte_cdk/sources/declarative/requesters/http_requester.py index b206bd688..ad23f4d06 100644 --- a/airbyte_cdk/sources/declarative/requesters/http_requester.py +++ b/airbyte_cdk/sources/declarative/requesters/http_requester.py @@ -22,7 +22,6 @@ ) from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod, Requester from airbyte_cdk.sources.message import MessageRepository, NoopMessageRepository -from airbyte_cdk.sources.streams.call_rate import APIBudget from airbyte_cdk.sources.streams.http import HttpClient from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler from airbyte_cdk.sources.types import Config, StreamSlice, StreamState @@ -56,7 +55,6 @@ class HttpRequester(Requester): http_method: Union[str, HttpMethod] = HttpMethod.GET request_options_provider: Optional[InterpolatedRequestOptionsProvider] = None error_handler: Optional[ErrorHandler] = None - api_budget: Optional[APIBudget] = None disable_retries: bool = False message_repository: MessageRepository = NoopMessageRepository() use_cache: bool = False @@ -93,7 +91,6 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: name=self.name, logger=self.logger, error_handler=self.error_handler, - api_budget=self.api_budget, authenticator=self._authenticator, use_cache=self.use_cache, backoff_strategy=backoff_strategies, diff --git a/airbyte_cdk/sources/streams/call_rate.py b/airbyte_cdk/sources/streams/call_rate.py index 21fec881f..81ebac78e 100644 --- a/airbyte_cdk/sources/streams/call_rate.py +++ b/airbyte_cdk/sources/streams/call_rate.py @@ -6,12 +6,10 @@ import dataclasses import datetime import logging -import re import time -from dataclasses import InitVar, dataclass, field from datetime import timedelta from threading import RLock -from typing import TYPE_CHECKING, Any, Mapping, Optional, Union +from typing import TYPE_CHECKING, Any, Mapping, Optional from urllib import parse import requests @@ -162,100 +160,6 @@ def __call__(self, request: Any) -> bool: return True -class HttpRequestRegexMatcher(RequestMatcher): - """ - Extended RequestMatcher for HTTP requests that supports matching on: - - HTTP method (case-insensitive) - - URL base (scheme + netloc) optionally - - URL path pattern (a regex applied to the path portion of the URL) - - Query parameters (must be present) - - Headers (header names compared case-insensitively) - """ - - def __init__( - self, - method: Optional[str] = None, - url_base: Optional[str] = None, - url_path_pattern: Optional[str] = None, - params: Optional[Mapping[str, Any]] = None, - headers: Optional[Mapping[str, Any]] = None, - ): - """ - :param method: HTTP method (e.g. "GET", "POST"); compared case-insensitively. - :param url_base: Base URL (scheme://host) that must match. - :param url_path_pattern: A regex pattern that will be applied to the path portion of the URL. - :param params: Dictionary of query parameters that must be present in the request. - :param headers: Dictionary of headers that must be present (header keys are compared case-insensitively). - """ - self._method = method.upper() if method else None - - # Normalize the url_base if provided: remove trailing slash. - self._url_base = url_base.rstrip("/") if url_base else None - - # Compile the URL path pattern if provided. - self._url_path_pattern = re.compile(url_path_pattern) if url_path_pattern else None - - # Normalize query parameters to strings. - self._params = {str(k): str(v) for k, v in (params or {}).items()} - - # Normalize header keys to lowercase. - self._headers = {str(k).lower(): str(v) for k, v in (headers or {}).items()} - - @staticmethod - def _match_dict(obj: Mapping[str, Any], pattern: Mapping[str, Any]) -> bool: - """Check that every key/value in the pattern exists in the object.""" - return pattern.items() <= obj.items() - - def __call__(self, request: Any) -> bool: - """ - :param request: A requests.Request or requests.PreparedRequest instance. - :return: True if the request matches all provided criteria; False otherwise. - """ - # Prepare the request (if needed) and extract the URL details. - if isinstance(request, requests.Request): - prepared_request = request.prepare() - elif isinstance(request, requests.PreparedRequest): - prepared_request = request - else: - return False - - # Check HTTP method. - if self._method is not None and prepared_request.method is not None: - if prepared_request.method.upper() != self._method: - return False - - # Parse the URL. - parsed_url = parse.urlsplit(prepared_request.url) - # Reconstruct the base: scheme://netloc - request_url_base = f"{str(parsed_url.scheme)}://{str(parsed_url.netloc)}" - # The path (without query parameters) - request_path = str(parsed_url.path).rstrip("/") - - # If a base URL is provided, check that it matches. - if self._url_base is not None: - if request_url_base != self._url_base: - return False - - # If a URL path pattern is provided, ensure the path matches the regex. - if self._url_path_pattern is not None: - if not self._url_path_pattern.search(request_path): - return False - - # Check query parameters. - if self._params: - query_params = dict(parse.parse_qsl(str(parsed_url.query))) - if not self._match_dict(query_params, self._params): - return False - - # Check headers (normalize keys to lower-case). - if self._headers: - req_headers = {k.lower(): v for k, v in prepared_request.headers.items()} - if not self._match_dict(req_headers, self._headers): - return False - - return True - - class BaseCallRatePolicy(AbstractCallRatePolicy, abc.ABC): def __init__(self, matchers: list[RequestMatcher]): self._matchers = matchers @@ -495,17 +399,24 @@ def update_from_response(self, request: Any, response: Any) -> None: """ -@dataclass class APIBudget(AbstractAPIBudget): - """ - Default APIBudget implementation. - """ + """Default APIBudget implementation""" + + def __init__( + self, policies: list[AbstractCallRatePolicy], maximum_attempts_to_acquire: int = 100000 + ) -> None: + """Constructor + + :param policies: list of policies in this budget + :param maximum_attempts_to_acquire: number of attempts before throwing hit ratelimit exception, we put some big number here + to avoid situations when many threads compete with each other for a few lots over a significant amount of time + """ - policies: list[AbstractCallRatePolicy] - maximum_attempts_to_acquire: int = 100000 + self._policies = policies + self._maximum_attempts_to_acquire = maximum_attempts_to_acquire def get_matching_policy(self, request: Any) -> Optional[AbstractCallRatePolicy]: - for policy in self.policies: + for policy in self._policies: if policy.matches(request): return policy return None @@ -526,7 +437,7 @@ def acquire_call( policy = self.get_matching_policy(request) if policy: self._do_acquire(request=request, policy=policy, block=block, timeout=timeout) - elif self.policies: + elif self._policies: logger.info("no policies matched with requests, allow call by default") def update_from_response(self, request: Any, response: Any) -> None: @@ -549,7 +460,7 @@ def _do_acquire( """ last_exception = None # sometimes we spend all budget before a second attempt, so we have few more here - for attempt in range(1, self.maximum_attempts_to_acquire): + for attempt in range(1, self._maximum_attempts_to_acquire): try: policy.try_acquire(request, weight=1) return @@ -573,18 +484,31 @@ def _do_acquire( if last_exception: logger.info( - "we used all %s attempts to acquire and failed", self.maximum_attempts_to_acquire + "we used all %s attempts to acquire and failed", self._maximum_attempts_to_acquire ) raise last_exception -@dataclass class HttpAPIBudget(APIBudget): """Implementation of AbstractAPIBudget for HTTP""" - ratelimit_reset_header: str = "ratelimit-reset" - ratelimit_remaining_header: str = "ratelimit-remaining" - status_codes_for_ratelimit_hit: Union[tuple[int], list[int]] = (429,) + def __init__( + self, + ratelimit_reset_header: str = "ratelimit-reset", + ratelimit_remaining_header: str = "ratelimit-remaining", + status_codes_for_ratelimit_hit: tuple[int] = (429,), + **kwargs: Any, + ): + """Constructor + + :param ratelimit_reset_header: name of the header that has a timestamp of the next reset of call budget + :param ratelimit_remaining_header: name of the header that has the number of calls left + :param status_codes_for_ratelimit_hit: list of HTTP status codes that signal about rate limit being hit + """ + self._ratelimit_reset_header = ratelimit_reset_header + self._ratelimit_remaining_header = ratelimit_remaining_header + self._status_codes_for_ratelimit_hit = status_codes_for_ratelimit_hit + super().__init__(**kwargs) def update_from_response(self, request: Any, response: Any) -> None: policy = self.get_matching_policy(request) @@ -599,17 +523,17 @@ def update_from_response(self, request: Any, response: Any) -> None: def get_reset_ts_from_response( self, response: requests.Response ) -> Optional[datetime.datetime]: - if response.headers.get(self.ratelimit_reset_header): + if response.headers.get(self._ratelimit_reset_header): return datetime.datetime.fromtimestamp( - int(response.headers[self.ratelimit_reset_header]) + int(response.headers[self._ratelimit_reset_header]) ) return None def get_calls_left_from_response(self, response: requests.Response) -> Optional[int]: - if response.headers.get(self.ratelimit_remaining_header): - return int(response.headers[self.ratelimit_remaining_header]) + if response.headers.get(self._ratelimit_remaining_header): + return int(response.headers[self._ratelimit_remaining_header]) - if response.status_code in self.status_codes_for_ratelimit_hit: + if response.status_code in self._status_codes_for_ratelimit_hit: return 0 return None From 05f4db7b6a3222af20d624439882a080c3014642 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Wed, 12 Feb 2025 15:44:45 +0200 Subject: [PATCH 11/19] Delete code from another branch --- .../test_model_to_component_factory.py | 159 ------------------ .../requesters/test_http_requester.py | 32 ---- unit_tests/sources/streams/test_call_rate.py | 88 ---------- 3 files changed, 279 deletions(-) diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 14e3460e0..32a73f364 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -142,7 +142,6 @@ from airbyte_cdk.sources.declarative.transformations import AddFields, RemoveFields from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource -from airbyte_cdk.sources.streams.call_rate import MovingWindowCallRatePolicy from airbyte_cdk.sources.streams.concurrent.clamping import ( ClampingEndProvider, DayClampingStrategy, @@ -3685,161 +3684,3 @@ def test_create_async_retriever(): assert isinstance(selector, RecordSelector) assert isinstance(extractor, DpathExtractor) assert extractor.field_path == ["data"] - - -def test_api_budget(): - manifest = { - "type": "DeclarativeSource", - "api_budget": { - "type": "HTTPAPIBudget", - "ratelimit_reset_header": "X-RateLimit-Reset", - "ratelimit_remaining_header": "X-RateLimit-Remaining", - "status_codes_for_ratelimit_hit": [429, 503], - "policies": [ - { - "type": "MovingWindowCallRatePolicy", - "rates": [ - { - "type": "Rate", - "limit": 3, - "interval": "PT0.1S", # 0.1 seconds - } - ], - "matchers": [ - { - "type": "HttpRequestRegexMatcher", - "method": "GET", - "url_base": "https://api.sendgrid.com", - "url_path_pattern": "/v3/marketing/lists", - } - ], - } - ], - }, - "my_requester": { - "type": "HttpRequester", - "path": "/v3/marketing/lists", - "url_base": "https://api.sendgrid.com", - "http_method": "GET", - "authenticator": { - "type": "BasicHttpAuthenticator", - "username": "admin", - "password": "{{ config['password'] }}", - }, - }, - } - - config = { - "password": "verysecrettoken", - } - - factory = ModelToComponentFactory() - if "api_budget" in manifest: - factory.set_api_budget(manifest["api_budget"], config) - - from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - HttpRequester as HttpRequesterModel, - ) - - requester_definition = manifest["my_requester"] - assert requester_definition["type"] == "HttpRequester" - - http_requester = factory.create_component( - model_type=HttpRequesterModel, - component_definition=requester_definition, - config=config, - name="lists_stream", - decoder=None, - ) - - assert http_requester.api_budget is not None - assert http_requester.api_budget.ratelimit_reset_header == "X-RateLimit-Reset" - assert http_requester.api_budget.status_codes_for_ratelimit_hit == [429, 503] - assert len(http_requester.api_budget.policies) == 1 - - # The single policy is a MovingWindowCallRatePolicy - policy = http_requester.api_budget.policies[0] - assert isinstance(policy, MovingWindowCallRatePolicy) - assert policy._bucket.rates[0].limit == 3 - # The 0.1s from 'PT0.1S' is stored in ms by PyRateLimiter internally - # but here just check that the limit and interval exist - assert policy._bucket.rates[0].interval == 100 # 100 ms - - -def test_api_budget_fixed_window_policy(): - manifest = { - "type": "DeclarativeSource", - # Root-level api_budget referencing a FixedWindowCallRatePolicy - "api_budget": { - "type": "APIBudget", - "maximum_attempts_to_acquire": 9999, - "policies": [ - { - "type": "FixedWindowCallRatePolicy", - "next_reset_ts": "2025-01-01T00:00:00Z", - "period": "PT1M", # 1 minute - "call_limit": 10, - "matchers": [ - { - "type": "HttpRequestRegexMatcher", - "method": "GET", - "url_base": "https://example.org", - "url_path_pattern": "/v2/data", - } - ], - } - ], - }, - # We'll define a single HttpRequester that references that base - "my_requester": { - "type": "HttpRequester", - "path": "/v2/data", - "url_base": "https://example.org", - "http_method": "GET", - "authenticator": {"type": "NoAuth"}, - }, - } - - config = {} - - factory = ModelToComponentFactory() - if "api_budget" in manifest: - factory.set_api_budget(manifest["api_budget"], config) - - from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - HttpRequester as HttpRequesterModel, - ) - - requester_definition = manifest["my_requester"] - assert requester_definition["type"] == "HttpRequester" - http_requester = factory.create_component( - model_type=HttpRequesterModel, - component_definition=requester_definition, - config=config, - name="my_stream", - decoder=None, - ) - - assert http_requester.api_budget is not None - assert http_requester.api_budget.maximum_attempts_to_acquire == 9999 - assert len(http_requester.api_budget.policies) == 1 - - from airbyte_cdk.sources.streams.call_rate import FixedWindowCallRatePolicy - - policy = http_requester.api_budget.policies[0] - assert isinstance(policy, FixedWindowCallRatePolicy) - assert policy._call_limit == 10 - # The period is "PT1M" => 60 seconds - assert policy._offset.total_seconds() == 60 - - expected_reset_dt = datetime(2025, 1, 1, 0, 0, 0, tzinfo=timezone.utc) - assert policy._next_reset_ts == expected_reset_dt - - assert len(policy._matchers) == 1 - matcher = policy._matchers[0] - from airbyte_cdk.sources.streams.call_rate import HttpRequestRegexMatcher - - assert isinstance(matcher, HttpRequestRegexMatcher) - assert matcher._method == "GET" - assert matcher._url_base == "https://example.org" - assert matcher._url_path_pattern.pattern == "/v2/data" diff --git a/unit_tests/sources/declarative/requesters/test_http_requester.py b/unit_tests/sources/declarative/requesters/test_http_requester.py index c5d5c218d..f02ec206b 100644 --- a/unit_tests/sources/declarative/requesters/test_http_requester.py +++ b/unit_tests/sources/declarative/requesters/test_http_requester.py @@ -2,7 +2,6 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # -from datetime import timedelta from typing import Any, Mapping, Optional from unittest import mock from unittest.mock import MagicMock @@ -10,7 +9,6 @@ import pytest as pytest import requests -import requests.sessions from requests import PreparedRequest from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator @@ -29,12 +27,6 @@ InterpolatedRequestOptionsProvider, ) from airbyte_cdk.sources.message import MessageRepository -from airbyte_cdk.sources.streams.call_rate import ( - AbstractAPIBudget, - HttpAPIBudget, - MovingWindowCallRatePolicy, - Rate, -) from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction from airbyte_cdk.sources.streams.http.exceptions import ( RequestBodyException, @@ -53,7 +45,6 @@ def factory( request_options_provider: Optional[InterpolatedRequestOptionsProvider] = None, authenticator: Optional[DeclarativeAuthenticator] = None, error_handler: Optional[ErrorHandler] = None, - api_budget: Optional[HttpAPIBudget] = None, config: Optional[Config] = None, parameters: Mapping[str, Any] = None, disable_retries: bool = False, @@ -70,7 +61,6 @@ def factory( http_method=http_method, request_options_provider=request_options_provider, error_handler=error_handler, - api_budget=api_budget, disable_retries=disable_retries, message_repository=message_repository or MagicMock(), use_cache=use_cache, @@ -944,25 +934,3 @@ def test_backoff_strategy_from_manifest_is_respected(http_requester_factory: Any http_requester._http_client._request_attempt_count.get(request_mock) == http_requester._http_client._max_retries + 1 ) - - -def test_http_requester_with_mock_apibudget(http_requester_factory, monkeypatch): - mock_budget = MagicMock(spec=HttpAPIBudget) - - requester = http_requester_factory( - url_base="https://example.com", - path="test", - api_budget=mock_budget, - ) - - dummy_response = requests.Response() - dummy_response.status_code = 200 - send_mock = MagicMock(return_value=dummy_response) - monkeypatch.setattr(requests.Session, "send", send_mock) - - response = requester.send_request() - - assert send_mock.call_count == 1 - assert response.status_code == 200 - - assert mock_budget.acquire_call.call_count == 1 diff --git a/unit_tests/sources/streams/test_call_rate.py b/unit_tests/sources/streams/test_call_rate.py index 853e2997e..16bce68e3 100644 --- a/unit_tests/sources/streams/test_call_rate.py +++ b/unit_tests/sources/streams/test_call_rate.py @@ -17,7 +17,6 @@ CallRateLimitHit, FixedWindowCallRatePolicy, HttpRequestMatcher, - HttpRequestRegexMatcher, MovingWindowCallRatePolicy, Rate, UnlimitedCallRatePolicy, @@ -358,90 +357,3 @@ def test_with_cache(self, mocker, requests_mock): assert next(records) == {"data": "some_data"} assert MovingWindowCallRatePolicy.try_acquire.call_count == 1 - - -class TestHttpRequestRegexMatcher: - """ - Tests for the new regex-based logic: - - Case-insensitive HTTP method matching - - Optional url_base (scheme://netloc) - - Regex-based path matching - - Query params (must be present) - - Headers (case-insensitive keys) - """ - - def test_case_insensitive_method(self): - matcher = HttpRequestRegexMatcher(method="GET") - - req_ok = Request("get", "https://example.com/test/path") - req_wrong = Request("POST", "https://example.com/test/path") - - assert matcher(req_ok) - assert not matcher(req_wrong) - - def test_url_base(self): - matcher = HttpRequestRegexMatcher(url_base="https://example.com") - - req_ok = Request("GET", "https://example.com/test/path?foo=bar") - req_wrong = Request("GET", "https://another.com/test/path?foo=bar") - - assert matcher(req_ok) - assert not matcher(req_wrong) - - def test_url_path_pattern(self): - matcher = HttpRequestRegexMatcher(url_path_pattern=r"/test/") - - req_ok = Request("GET", "https://example.com/test/something") - req_wrong = Request("GET", "https://example.com/other/something") - - assert matcher(req_ok) - assert not matcher(req_wrong) - - def test_query_params(self): - matcher = HttpRequestRegexMatcher(params={"foo": "bar"}) - - req_ok = Request("GET", "https://example.com/api?foo=bar&extra=123") - req_missing = Request("GET", "https://example.com/api?not_foo=bar") - - assert matcher(req_ok) - assert not matcher(req_missing) - - def test_headers_case_insensitive(self): - matcher = HttpRequestRegexMatcher(headers={"X-Custom-Header": "abc"}) - - req_ok = Request( - "GET", - "https://example.com/api?foo=bar", - headers={"x-custom-header": "abc", "other": "123"}, - ) - req_wrong = Request("GET", "https://example.com/api", headers={"x-custom-header": "wrong"}) - - assert matcher(req_ok) - assert not matcher(req_wrong) - - def test_combined_criteria(self): - matcher = HttpRequestRegexMatcher( - method="GET", - url_base="https://example.com", - url_path_pattern=r"/test/", - params={"foo": "bar"}, - headers={"X-Test": "123"}, - ) - - req_ok = Request("GET", "https://example.com/test/me?foo=bar", headers={"x-test": "123"}) - req_bad_base = Request( - "GET", "https://other.com/test/me?foo=bar", headers={"x-test": "123"} - ) - req_bad_path = Request("GET", "https://example.com/nope?foo=bar", headers={"x-test": "123"}) - req_bad_param = Request( - "GET", "https://example.com/test/me?extra=xyz", headers={"x-test": "123"} - ) - req_bad_header = Request( - "GET", "https://example.com/test/me?foo=bar", headers={"some-other-header": "xyz"} - ) - - assert matcher(req_ok) - assert not matcher(req_bad_base) - assert not matcher(req_bad_path) - assert not matcher(req_bad_param) - assert not matcher(req_bad_header) From c0bc64538acfbbcf06f7df8908a3ff248f061089 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Wed, 12 Feb 2025 17:44:45 +0200 Subject: [PATCH 12/19] Fix cursor value from record --- .../declarative/incremental/concurrent_partition_cursor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index fc75ecd90..4dc3a6341 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -363,8 +363,8 @@ def observe(self, record: Record) -> None: "Invalid state as stream slices that are emitted should refer to an existing cursor" ) - record_cursor = self._connector_state_converter.parse_value( - self._cursor_field.extract_value(record) + record_cursor = self._connector_state_converter.output_format( + self._connector_state_converter.parse_value(self._cursor_field.extract_value(record)) ) self._update_global_cursor(record_cursor) if not self._use_global_cursor: From 52b95e33d7782c163447a966754dc5156d7b555c Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Thu, 13 Feb 2025 12:02:37 +0200 Subject: [PATCH 13/19] Add throttling for state emitting in ConcurrentPerPartitionCursor --- .../concurrent_partition_cursor.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index 4dc3a6341..2780218dc 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -5,6 +5,7 @@ import copy import logging import threading +import time from collections import OrderedDict from copy import deepcopy from datetime import timedelta @@ -59,7 +60,7 @@ class ConcurrentPerPartitionCursor(Cursor): """ DEFAULT_MAX_PARTITIONS_NUMBER = 25_000 - SWITCH_TO_GLOBAL_LIMIT = 1000 + SWITCH_TO_GLOBAL_LIMIT = 10_000 _NO_STATE: Mapping[str, Any] = {} _NO_CURSOR_STATE: Mapping[str, Any] = {} _GLOBAL_STATE_KEY = "state" @@ -103,6 +104,8 @@ def __init__( self._number_of_partitions: int = 0 self._use_global_cursor: bool = False self._partition_serializer = PerPartitionKeySerializer() + # Track the last time a state message was emitted + self._last_emission_time: float = 0.0 self._set_initial_state(stream_state) @@ -166,9 +169,12 @@ def ensure_at_least_one_state_emitted(self) -> None: self._global_cursor = self._new_global_cursor self._lookback_window = self._timer.finish() self._parent_state = self._partition_router.get_stream_state() - self._emit_state_message() + self._emit_state_message(throttle=False) - def _emit_state_message(self) -> None: + def _emit_state_message(self, throttle: bool = True) -> None: + current_time = time.time() + if throttle and current_time - self._last_emission_time <= 60: + return self._connector_state_manager.update_state_for_stream( self._stream_name, self._stream_namespace, @@ -178,6 +184,7 @@ def _emit_state_message(self) -> None: self._stream_name, self._stream_namespace ) self._message_repository.emit_message(state_message) + self._last_emission_time = current_time def stream_slices(self) -> Iterable[StreamSlice]: if self._timer.is_running(): @@ -242,7 +249,7 @@ def _ensure_partition_limit(self) -> None: partition_key ) # Remove the oldest partition logger.warning( - f"The maximum number of partitions has been reached. Dropping the oldest finished partition: {oldest_partition}. Over limit: {self._number_of_partitions}." + f"The maximum number of partitions has been reached. Dropping the oldest finished partition: {oldest_partition}. Over limit: {self._number_of_partitions - self.DEFAULT_MAX_PARTITIONS_NUMBER}." ) break else: @@ -251,7 +258,7 @@ def _ensure_partition_limit(self) -> None: 1 ] # Remove the oldest partition logger.warning( - f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._number_of_partitions}." + f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._number_of_partitions - self.DEFAULT_MAX_PARTITIONS_NUMBER}." ) def _set_initial_state(self, stream_state: StreamState) -> None: @@ -372,7 +379,7 @@ def observe(self, record: Record) -> None: self._to_partition_key(record.associated_slice.partition) ].observe(record) - def _update_global_cursor(self, value: Mapping[str, Any]) -> None: + def _update_global_cursor(self, value: Any) -> None: if ( self._new_global_cursor is None or self._new_global_cursor[self.cursor_field.cursor_field_key] < value From 1166a7a2e68e0713fa191804a4cab844cfdf8c95 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Mon, 17 Feb 2025 13:41:05 +0200 Subject: [PATCH 14/19] Fix unit tests --- .../concurrent_partition_cursor.py | 19 ++++-- .../test_concurrent_perpartitioncursor.py | 66 ++++++++++++++++--- 2 files changed, 73 insertions(+), 12 deletions(-) diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index 2780218dc..da12cc05d 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -171,10 +171,21 @@ def ensure_at_least_one_state_emitted(self) -> None: self._parent_state = self._partition_router.get_stream_state() self._emit_state_message(throttle=False) - def _emit_state_message(self, throttle: bool = True) -> None: + def _throttle_state_message(self) -> Optional[float]: + """ + Throttles the state message emission to once every 60 seconds. + """ current_time = time.time() - if throttle and current_time - self._last_emission_time <= 60: - return + if current_time - self._last_emission_time <= 60: + return None + return current_time + + def _emit_state_message(self, throttle: bool = True) -> None: + if throttle: + current_time = self._throttle_state_message() + if current_time is None: + return + self._last_emission_time = current_time self._connector_state_manager.update_state_for_stream( self._stream_name, self._stream_namespace, @@ -184,7 +195,6 @@ def _emit_state_message(self, throttle: bool = True) -> None: self._stream_name, self._stream_namespace ) self._message_repository.emit_message(state_message) - self._last_emission_time = current_time def stream_slices(self) -> Iterable[StreamSlice]: if self._timer.is_running(): @@ -358,6 +368,7 @@ def _set_global_state(self, stream_state: Mapping[str, Any]) -> None: self._new_global_cursor = deepcopy(fixed_global_state) def observe(self, record: Record) -> None: + # ToDo: check number of partitions if not self._use_global_cursor and self.limit_reached(): logger.info( f"Exceeded the 'SWITCH_TO_GLOBAL_LIMIT' of {self.SWITCH_TO_GLOBAL_LIMIT}. " diff --git a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py index ef06676f5..767d24874 100644 --- a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py +++ b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py @@ -3,6 +3,7 @@ from copy import deepcopy from datetime import datetime, timedelta from typing import Any, List, Mapping, MutableMapping, Optional, Union +from unittest.mock import MagicMock, patch from urllib.parse import unquote import pytest @@ -18,6 +19,7 @@ from airbyte_cdk.sources.declarative.concurrent_declarative_source import ( ConcurrentDeclarativeSource, ) +from airbyte_cdk.sources.declarative.incremental import ConcurrentPerPartitionCursor from airbyte_cdk.test.catalog_builder import CatalogBuilder, ConfiguredAirbyteStreamBuilder from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read @@ -1181,14 +1183,18 @@ def test_incremental_parent_state( initial_state, expected_state, ): - run_incremental_parent_state_test( - manifest, - mock_requests, - expected_records, - num_intermediate_states, - initial_state, - [expected_state], - ) + # Patch `_throttle_state_message` so it always returns a float (indicating "no throttle") + with patch.object( + ConcurrentPerPartitionCursor, "_throttle_state_message", return_value=9999999.0 + ): + run_incremental_parent_state_test( + manifest, + mock_requests, + expected_records, + num_intermediate_states, + initial_state, + [expected_state], + ) STATE_MIGRATION_EXPECTED_STATE = { @@ -2967,3 +2973,47 @@ def test_incremental_substream_request_options_provider( expected_records, expected_state, ) + + +def test_state_throttling(mocker): + """ + Verifies that _emit_state_message does not emit a new state if less than 60s + have passed since last emission, and does emit once 60s or more have passed. + """ + cursor = ConcurrentPerPartitionCursor( + cursor_factory=MagicMock(), + partition_router=MagicMock(), + stream_name="test_stream", + stream_namespace=None, + stream_state={}, + message_repository=MagicMock(), + connector_state_manager=MagicMock(), + connector_state_converter=MagicMock(), + cursor_field=MagicMock(), + ) + + mock_connector_manager = cursor._connector_state_manager + mock_repo = cursor._message_repository + + # Set the last emission time to "0" so we can control offset from that + cursor._last_emission_time = 0 + + mock_time = mocker.patch("time.time") + + # First attempt: only 10 seconds passed => NO emission + mock_time.return_value = 10 + cursor._emit_state_message() + mock_connector_manager.update_state_for_stream.assert_not_called() + mock_repo.emit_message.assert_not_called() + + # Second attempt: 30 seconds passed => still NO emission + mock_time.return_value = 30 + cursor._emit_state_message() + mock_connector_manager.update_state_for_stream.assert_not_called() + mock_repo.emit_message.assert_not_called() + + # Advance time: 70 seconds => exceed 60s => MUST emit + mock_time.return_value = 70 + cursor._emit_state_message() + mock_connector_manager.update_state_for_stream.assert_called_once() + mock_repo.emit_message.assert_called_once() From 4a7d9eccb4421c6591835a4ab99e9e4cc22a276c Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Mon, 17 Feb 2025 14:05:10 +0200 Subject: [PATCH 15/19] Move switching to global logic --- .../concurrent_partition_cursor.py | 37 +++++++++---------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index da12cc05d..74d7f8893 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -59,8 +59,8 @@ class ConcurrentPerPartitionCursor(Cursor): CurrentPerPartitionCursor expects the state of the ConcurrentCursor to follow the format {cursor_field: cursor_value}. """ - DEFAULT_MAX_PARTITIONS_NUMBER = 25_000 - SWITCH_TO_GLOBAL_LIMIT = 10_000 + DEFAULT_MAX_PARTITIONS_NUMBER = 200 + SWITCH_TO_GLOBAL_LIMIT = 100 _NO_STATE: Mapping[str, Any] = {} _NO_CURSOR_STATE: Mapping[str, Any] = {} _GLOBAL_STATE_KEY = "state" @@ -145,19 +145,19 @@ def close_partition(self, partition: Partition) -> None: raise ValueError("stream_slice cannot be None") partition_key = self._to_partition_key(stream_slice.partition) - if not self._use_global_cursor: - self._cursor_per_partition[partition_key].close_partition(partition=partition) with self._lock: - self._semaphore_per_partition[partition_key].acquire() - cursor = self._cursor_per_partition[partition_key] - if ( - partition_key in self._finished_partitions - and self._semaphore_per_partition[partition_key]._value == 0 - ): - self._update_global_cursor(cursor.state[self.cursor_field.cursor_field_key]) if not self._use_global_cursor: + self._cursor_per_partition[partition_key].close_partition(partition=partition) + cursor = self._cursor_per_partition[partition_key] + if ( + partition_key in self._finished_partitions + and self._semaphore_per_partition[partition_key]._value == 0 + ): + self._update_global_cursor(cursor.state[self.cursor_field.cursor_field_key]) self._emit_state_message() + self._semaphore_per_partition[partition_key].acquire() + def ensure_at_least_one_state_emitted(self) -> None: """ The platform expect to have at least one state message on successful syncs. Hence, whatever happens, we expect this method to be @@ -246,6 +246,13 @@ def _ensure_partition_limit(self) -> None: - Logs a warning each time a partition is removed, indicating whether it was finished or removed due to being the oldest. """ + if not self._use_global_cursor and self.limit_reached(): + logger.info( + f"Exceeded the 'SWITCH_TO_GLOBAL_LIMIT' of {self.SWITCH_TO_GLOBAL_LIMIT}. " + f"Switching to global cursor for {self._stream_name}." + ) + self._use_global_cursor = True + with self._lock: self._number_of_partitions += 1 while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1: @@ -368,14 +375,6 @@ def _set_global_state(self, stream_state: Mapping[str, Any]) -> None: self._new_global_cursor = deepcopy(fixed_global_state) def observe(self, record: Record) -> None: - # ToDo: check number of partitions - if not self._use_global_cursor and self.limit_reached(): - logger.info( - f"Exceeded the 'SWITCH_TO_GLOBAL_LIMIT' of {self.SWITCH_TO_GLOBAL_LIMIT}. " - f"Switching to global cursor for {self._stream_name}." - ) - self._use_global_cursor = True - if not record.associated_slice: raise ValueError( "Invalid state as stream slices that are emitted should refer to an existing cursor" From 19ad269f8f802f4701cc5b680eec282fd2183a99 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Mon, 17 Feb 2025 14:45:25 +0200 Subject: [PATCH 16/19] Revert test limits --- .../declarative/incremental/concurrent_partition_cursor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index 74d7f8893..ed67e8166 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -59,8 +59,8 @@ class ConcurrentPerPartitionCursor(Cursor): CurrentPerPartitionCursor expects the state of the ConcurrentCursor to follow the format {cursor_field: cursor_value}. """ - DEFAULT_MAX_PARTITIONS_NUMBER = 200 - SWITCH_TO_GLOBAL_LIMIT = 100 + DEFAULT_MAX_PARTITIONS_NUMBER = 25_000 + SWITCH_TO_GLOBAL_LIMIT = 10_000 _NO_STATE: Mapping[str, Any] = {} _NO_CURSOR_STATE: Mapping[str, Any] = {} _GLOBAL_STATE_KEY = "state" From 6498528eab15f1bf261df588201a72922d895634 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Mon, 17 Feb 2025 15:36:26 +0200 Subject: [PATCH 17/19] Fix format --- .../declarative/incremental/concurrent_partition_cursor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index ed67e8166..84d3cb6e2 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -150,8 +150,8 @@ def close_partition(self, partition: Partition) -> None: self._cursor_per_partition[partition_key].close_partition(partition=partition) cursor = self._cursor_per_partition[partition_key] if ( - partition_key in self._finished_partitions - and self._semaphore_per_partition[partition_key]._value == 0 + partition_key in self._finished_partitions + and self._semaphore_per_partition[partition_key]._value == 0 ): self._update_global_cursor(cursor.state[self.cursor_field.cursor_field_key]) self._emit_state_message() From 7b4964edb72b800855114215d710387a529e232c Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Mon, 17 Feb 2025 18:14:19 +0200 Subject: [PATCH 18/19] Move acquiring the semaphore --- .../declarative/incremental/concurrent_partition_cursor.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index 84d3cb6e2..efa5996b3 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -146,6 +146,7 @@ def close_partition(self, partition: Partition) -> None: partition_key = self._to_partition_key(stream_slice.partition) with self._lock: + self._semaphore_per_partition[partition_key].acquire() if not self._use_global_cursor: self._cursor_per_partition[partition_key].close_partition(partition=partition) cursor = self._cursor_per_partition[partition_key] @@ -156,8 +157,6 @@ def close_partition(self, partition: Partition) -> None: self._update_global_cursor(cursor.state[self.cursor_field.cursor_field_key]) self._emit_state_message() - self._semaphore_per_partition[partition_key].acquire() - def ensure_at_least_one_state_emitted(self) -> None: """ The platform expect to have at least one state message on successful syncs. Hence, whatever happens, we expect this method to be From ef8be891afb5f56d8a58d557f0287df8c264d9fb Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Mon, 17 Feb 2025 20:30:55 +0200 Subject: [PATCH 19/19] Fix partitions count --- .../declarative/incremental/concurrent_partition_cursor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index efa5996b3..ddcba0470 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -215,6 +215,7 @@ def _generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[St self._lookback_window if self._global_cursor else 0, ) with self._lock: + self._number_of_partitions += 1 self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor self._semaphore_per_partition[self._to_partition_key(partition.partition)] = ( threading.Semaphore(0) @@ -253,7 +254,6 @@ def _ensure_partition_limit(self) -> None: self._use_global_cursor = True with self._lock: - self._number_of_partitions += 1 while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1: # Try removing finished partitions first for partition_key in list(self._cursor_per_partition.keys()): @@ -334,6 +334,7 @@ def _set_initial_state(self, stream_state: StreamState) -> None: self._lookback_window = int(stream_state.get("lookback_window", 0)) for state in stream_state.get(self._PERPARTITION_STATE_KEY, []): + self._number_of_partitions += 1 self._cursor_per_partition[self._to_partition_key(state["partition"])] = ( self._create_cursor(state["cursor"]) )