Skip to content

Commit c8b4cfc

Browse files
committed
pass only count to ModelToComponentFactory
1 parent eece374 commit c8b4cfc

File tree

2 files changed

+4
-13
lines changed

2 files changed

+4
-13
lines changed

airbyte_cdk/sources/declarative/manifest_declarative_source.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,8 @@ def __init__(
9494
component_factory
9595
if component_factory
9696
else ModelToComponentFactory(
97-
emit_connector_builder_messages, source_config=source_config
97+
emit_connector_builder_messages,
98+
max_concurrent_async_job_count=source_config.get("max_concurrent_job_count"),
9899
)
99100
)
100101
self._message_repository = self._constructor.get_message_repository()

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

+2-12
Original file line numberDiff line numberDiff line change
@@ -527,7 +527,7 @@ def __init__(
527527
disable_resumable_full_refresh: bool = False,
528528
message_repository: Optional[MessageRepository] = None,
529529
connector_state_manager: Optional[ConnectorStateManager] = None,
530-
source_config: Optional[ConnectionDefinition] = None,
530+
max_concurrent_async_job_count: Optional[int] = None,
531531
):
532532
self._init_mappings()
533533
self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice
@@ -541,7 +541,7 @@ def __init__(
541541
)
542542
self._connector_state_manager = connector_state_manager or ConnectorStateManager()
543543
self._api_budget: Optional[Union[APIBudget, HttpAPIBudget]] = None
544-
self._job_tracker: JobTracker = self._create_async_job_tracker(source_config=source_config)
544+
self._job_tracker = JobTracker(max_concurrent_async_job_count or 1)
545545

546546
def _init_mappings(self) -> None:
547547
self.PYDANTIC_MODEL_TO_CONSTRUCTOR: Mapping[Type[BaseModel], Callable[..., Any]] = {
@@ -3219,13 +3219,3 @@ def set_api_budget(self, component_definition: ComponentDefinition, config: Conf
32193219
self._api_budget = self.create_component(
32203220
model_type=HTTPAPIBudgetModel, component_definition=component_definition, config=config
32213221
)
3222-
3223-
def _create_async_job_tracker(
3224-
self, source_config: Optional[ConnectionDefinition]
3225-
) -> JobTracker:
3226-
"""
3227-
Sets up job tracking for async jobs based on limit specified in the source config.
3228-
"""
3229-
if source_config:
3230-
return JobTracker(source_config.get("max_concurrent_job_count", 1))
3231-
return JobTracker(1)

0 commit comments

Comments
 (0)