@@ -541,11 +541,7 @@ def __init__(
541
541
)
542
542
self ._connector_state_manager = connector_state_manager or ConnectorStateManager ()
543
543
self ._api_budget : Optional [Union [APIBudget , HttpAPIBudget ]] = None
544
- self ._job_tracker : Optional [JobTracker ] = (
545
- self ._create_async_job_tracker (source_config = source_config )
546
- if source_config
547
- else None
548
- )
544
+ self ._job_tracker : JobTracker = self ._create_async_job_tracker (source_config = source_config )
549
545
550
546
def _init_mappings (self ) -> None :
551
547
self .PYDANTIC_MODEL_TO_CONSTRUCTOR : Mapping [Type [BaseModel ], Callable [..., Any ]] = {
@@ -2930,8 +2926,6 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
2930
2926
download_target_extractor = download_target_extractor ,
2931
2927
)
2932
2928
2933
- self ._job_tracker = JobTracker (1 ) if not self ._job_tracker else self ._job_tracker
2934
-
2935
2929
async_job_partition_router = AsyncJobPartitionRouter (
2936
2930
job_orchestrator_factory = lambda stream_slices : AsyncJobOrchestrator (
2937
2931
job_repository ,
@@ -3227,11 +3221,11 @@ def set_api_budget(self, component_definition: ComponentDefinition, config: Conf
3227
3221
)
3228
3222
3229
3223
def _create_async_job_tracker (
3230
- self , source_config : ConnectionDefinition
3231
- ) -> Optional [ JobTracker ] :
3224
+ self , source_config : Optional [ ConnectionDefinition ]
3225
+ ) -> JobTracker :
3232
3226
"""
3233
3227
Sets up job tracking for async jobs based on limit specified in the source config.
3234
3228
"""
3235
- if job_count := source_config . get ( "max_concurrent_job_count" ) :
3236
- return JobTracker (job_count )
3237
- return None
3229
+ if source_config :
3230
+ return JobTracker (source_config . get ( "max_concurrent_job_count" , 1 ) )
3231
+ return JobTracker ( 1 )
0 commit comments