Skip to content

Commit 545e2ac

Browse files
authored
feat: Enable JobTracker limit configurability (#405)
1 parent f0a57f1 commit 545e2ac

File tree

6 files changed

+33
-5
lines changed

6 files changed

+33
-5
lines changed

airbyte_cdk/sources/declarative/async_job/job_tracker.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,11 @@ class ConcurrentJobLimitReached(Exception):
1717
class JobTracker:
1818
def __init__(self, limit: int):
1919
self._jobs: Set[str] = set()
20-
self._limit = limit
20+
if limit < 1:
21+
LOGGER.warning(
22+
f"The `max_concurrent_async_job_count` property is less than 1: {limit}. Setting to 1. Please update the source manifest to set a valid value."
23+
)
24+
self._limit = 1 if limit < 1 else limit
2125
self._lock = threading.Lock()
2226

2327
def try_to_get_intent(self) -> str:

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

+4
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ properties:
4242
"$ref": "#/definitions/ConcurrencyLevel"
4343
api_budget:
4444
"$ref": "#/definitions/HTTPAPIBudget"
45+
max_concurrent_async_job_count:
46+
title: Maximum Concurrent Asynchronous Jobs
47+
description: Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.
48+
type: integer
4549
metadata:
4650
type: object
4751
description: For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.

airbyte_cdk/sources/declarative/manifest_declarative_source.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,10 @@ def __init__(
9393
self._constructor = (
9494
component_factory
9595
if component_factory
96-
else ModelToComponentFactory(emit_connector_builder_messages)
96+
else ModelToComponentFactory(
97+
emit_connector_builder_messages,
98+
max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
99+
)
97100
)
98101
self._message_repository = self._constructor.get_message_repository()
99102
self._slice_logger: SliceLogger = (

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

+10
Original file line numberDiff line numberDiff line change
@@ -1871,6 +1871,11 @@ class Config:
18711871
spec: Optional[Spec] = None
18721872
concurrency_level: Optional[ConcurrencyLevel] = None
18731873
api_budget: Optional[HTTPAPIBudget] = None
1874+
max_concurrent_async_job_count: Optional[int] = Field(
1875+
None,
1876+
description="Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.",
1877+
title="Maximum Concurrent Asynchronous Jobs",
1878+
)
18741879
metadata: Optional[Dict[str, Any]] = Field(
18751880
None,
18761881
description="For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.",
@@ -1898,6 +1903,11 @@ class Config:
18981903
spec: Optional[Spec] = None
18991904
concurrency_level: Optional[ConcurrencyLevel] = None
19001905
api_budget: Optional[HTTPAPIBudget] = None
1906+
max_concurrent_async_job_count: Optional[int] = Field(
1907+
None,
1908+
description="Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.",
1909+
title="Maximum Concurrent Asynchronous Jobs",
1910+
)
19011911
metadata: Optional[Dict[str, Any]] = Field(
19021912
None,
19031913
description="For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.",

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

+4-3
Original file line numberDiff line numberDiff line change
@@ -503,7 +503,7 @@
503503
IncrementingCountStreamStateConverter,
504504
)
505505
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction
506-
from airbyte_cdk.sources.types import Config
506+
from airbyte_cdk.sources.types import Config, ConnectionDefinition
507507
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
508508

509509
ComponentDefinition = Mapping[str, Any]
@@ -527,6 +527,7 @@ def __init__(
527527
disable_resumable_full_refresh: bool = False,
528528
message_repository: Optional[MessageRepository] = None,
529529
connector_state_manager: Optional[ConnectorStateManager] = None,
530+
max_concurrent_async_job_count: Optional[int] = None,
530531
):
531532
self._init_mappings()
532533
self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice
@@ -540,6 +541,7 @@ def __init__(
540541
)
541542
self._connector_state_manager = connector_state_manager or ConnectorStateManager()
542543
self._api_budget: Optional[Union[APIBudget, HttpAPIBudget]] = None
544+
self._job_tracker: JobTracker = JobTracker(max_concurrent_async_job_count or 1)
543545

544546
def _init_mappings(self) -> None:
545547
self.PYDANTIC_MODEL_TO_CONSTRUCTOR: Mapping[Type[BaseModel], Callable[..., Any]] = {
@@ -2928,8 +2930,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
29282930
job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator(
29292931
job_repository,
29302932
stream_slices,
2931-
JobTracker(1),
2932-
# FIXME eventually make the number of concurrent jobs in the API configurable. Until then, we limit to 1
2933+
self._job_tracker,
29332934
self._message_repository,
29342935
has_bulk_parent=False,
29352936
# FIXME work would need to be done here in order to detect if a stream as a parent stream that is bulk

unit_tests/sources/declarative/async_job/test_job_tracker.py

+6
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,9 @@ def test_given_limit_reached_when_add_job_then_limit_is_still_reached(self) -> N
3939

4040
def _reach_limit(self) -> List[str]:
4141
return [self._tracker.try_to_get_intent() for i in range(_LIMIT)]
42+
43+
44+
@pytest.mark.parametrize("limit", [-1, 0])
45+
def test_given_limit_is_less_than_1_when_init_then_set_to_1(limit: int):
46+
tracker = JobTracker(limit)
47+
assert tracker._limit == 1

0 commit comments

Comments
 (0)