Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Enable JobTracker limit configurability #405

Merged
merged 14 commits into from
Mar 12, 2025
2 changes: 1 addition & 1 deletion airbyte_cdk/sources/declarative/async_job/job_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class ConcurrentJobLimitReached(Exception):
class JobTracker:
def __init__(self, limit: int):
self._jobs: Set[str] = set()
self._limit = limit
self._limit = 1 if limit < 1 else limit
self._lock = threading.Lock()

def try_to_get_intent(self) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ properties:
"$ref": "#/definitions/ConcurrencyLevel"
api_budget:
"$ref": "#/definitions/HTTPAPIBudget"
max_concurrent_job_count:
title: Maximum Concurrent Async Jobs
description: Maximum number of concurrent async jobs to run. This is often set by the API's maximum number of concurrent jobs on the account level.
type: integer
metadata:
type: object
description: For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ def __init__(
self._constructor = (
component_factory
if component_factory
else ModelToComponentFactory(emit_connector_builder_messages)
else ModelToComponentFactory(
emit_connector_builder_messages, source_config=source_config
)
)
self._message_repository = self._constructor.get_message_repository()
self._slice_logger: SliceLogger = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1871,6 +1871,11 @@ class Config:
spec: Optional[Spec] = None
concurrency_level: Optional[ConcurrencyLevel] = None
api_budget: Optional[HTTPAPIBudget] = None
max_concurrent_job_count: Optional[int] = Field(
None,
description="Maximum number of concurrent async jobs to run. This is often set by the API's maximum number of concurrent jobs on the account level.",
title="Maximum Concurrent Async Jobs",
)
metadata: Optional[Dict[str, Any]] = Field(
None,
description="For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.",
Expand Down Expand Up @@ -1898,6 +1903,11 @@ class Config:
spec: Optional[Spec] = None
concurrency_level: Optional[ConcurrencyLevel] = None
api_budget: Optional[HTTPAPIBudget] = None
max_concurrent_job_count: Optional[int] = Field(
None,
description="Maximum number of concurrent async jobs to run. This is often set by the API's maximum number of concurrent jobs on the account level.",
title="Maximum Concurrent Async Jobs",
)
metadata: Optional[Dict[str, Any]] = Field(
None,
description="For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@
IncrementingCountStreamStateConverter,
)
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction
from airbyte_cdk.sources.types import Config
from airbyte_cdk.sources.types import Config, ConnectionDefinition
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer

ComponentDefinition = Mapping[str, Any]
Expand All @@ -527,6 +527,7 @@ def __init__(
disable_resumable_full_refresh: bool = False,
message_repository: Optional[MessageRepository] = None,
connector_state_manager: Optional[ConnectorStateManager] = None,
source_config: Optional[ConnectionDefinition] = None,
):
self._init_mappings()
self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice
Expand All @@ -540,6 +541,11 @@ def __init__(
)
self._connector_state_manager = connector_state_manager or ConnectorStateManager()
self._api_budget: Optional[Union[APIBudget, HttpAPIBudget]] = None
self._job_tracker: Optional[JobTracker] = (
self._create_async_job_tracker(source_config=source_config)
if source_config
else None
)

def _init_mappings(self) -> None:
self.PYDANTIC_MODEL_TO_CONSTRUCTOR: Mapping[Type[BaseModel], Callable[..., Any]] = {
Expand Down Expand Up @@ -2924,12 +2930,13 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
download_target_extractor=download_target_extractor,
)

self._job_tracker = JobTracker(1) if not self._job_tracker else self._job_tracker

async_job_partition_router = AsyncJobPartitionRouter(
job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator(
job_repository,
stream_slices,
JobTracker(1),
# FIXME eventually make the number of concurrent jobs in the API configurable. Until then, we limit to 1
self._job_tracker,
self._message_repository,
has_bulk_parent=False,
# FIXME work would need to be done here in order to detect if a stream as a parent stream that is bulk
Expand Down Expand Up @@ -3218,3 +3225,13 @@ def set_api_budget(self, component_definition: ComponentDefinition, config: Conf
self._api_budget = self.create_component(
model_type=HTTPAPIBudgetModel, component_definition=component_definition, config=config
)

def _create_async_job_tracker(
self, source_config: ConnectionDefinition
) -> Optional[JobTracker]:
"""
Sets up job tracking for async jobs based on limit specified in the source config.
"""
if job_count := source_config.get("max_concurrent_job_count"):
return JobTracker(job_count)
return None
Loading