-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathjob_tracker.py
75 lines (62 loc) · 2.29 KB
/
job_tracker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
import logging
import threading
import uuid
from typing import Set
from airbyte_cdk.logger import lazy_log
# Module-level logger (named "airbyte") used for the JobTracker debug messages below.
LOGGER = logging.getLogger("airbyte")
class ConcurrentJobLimitReached(Exception):
    """Raised by ``JobTracker.try_to_get_intent`` when no more jobs can be allocated."""
class JobTracker:
    """Track the intents and job ids currently allocated, up to a fixed limit.

    A slot is first reserved with :meth:`try_to_get_intent`, which returns a
    placeholder "intent" id. The placeholder is later swapped for the real job
    id with :meth:`add_job`, and the slot is freed with :meth:`remove_job`.
    Every access to the underlying set is guarded by ``self._lock``.
    """

    def __init__(self, limit: int):
        """Create a tracker allowing at most ``limit`` entries.

        Args:
            limit: Maximum number of tracked intents/jobs. Values below 1 are
                replaced by 1.
        """
        self._jobs: Set[str] = set()
        self._limit = 1 if limit < 1 else limit
        self._lock = threading.Lock()

    def try_to_get_intent(self) -> str:
        """Reserve a slot and return the generated intent id.

        Returns:
            A new id of the form ``intent_<uuid4>`` that has been added to the
            tracked set.

        Raises:
            ConcurrentJobLimitReached: If the number of tracked entries has
                already reached the limit.
        """
        lazy_log(
            LOGGER,
            logging.DEBUG,
            lambda: f"JobTracker - Trying to acquire lock by thread {threading.get_native_id()}...",
        )
        with self._lock:
            if self._has_reached_limit():
                raise ConcurrentJobLimitReached(
                    "Can't allocate more jobs right now: limit already reached"
                )
            intent = f"intent_{uuid.uuid4()}"
            lazy_log(
                LOGGER,
                logging.DEBUG,
                lambda: f"JobTracker - Thread {threading.get_native_id()} has acquired {intent}!",
            )
            self._jobs.add(intent)
            return intent

    def add_job(self, intent_or_job_id: str, job_id: str) -> None:
        """Replace a tracked intent (or job id) by ``job_id``.

        Args:
            intent_or_job_id: An id that is currently tracked.
            job_id: The id that should take its place. If it equals
                ``intent_or_job_id`` nothing is changed.

        Raises:
            ValueError: If ``intent_or_job_id`` is not currently tracked.
        """
        # The membership check, the error message (which formats the set) and
        # the swap all happen under the lock. Checking outside the lock would
        # let a concurrent remove_job() slip in between the check and the
        # removal, turning the documented ValueError into a KeyError, and
        # formatting the set while another thread mutates it is unsafe.
        with self._lock:
            if intent_or_job_id not in self._jobs:
                raise ValueError(
                    f"Can't add job: Unknown intent or job id, known values are {self._jobs}"
                )

            if intent_or_job_id == job_id:
                # Nothing to do here as the ID to replace is the same
                return

            lazy_log(
                LOGGER,
                logging.DEBUG,
                lambda: f"JobTracker - Thread {threading.get_native_id()} replacing job {intent_or_job_id} by {job_id}!",
            )
            self._jobs.add(job_id)
            self._jobs.remove(intent_or_job_id)

    def remove_job(self, job_id: str) -> None:
        """Stop tracking ``job_id``.

        If the job is not allocated as a running job, this method does nothing and it won't raise.
        """
        lazy_log(
            LOGGER,
            logging.DEBUG,
            lambda: f"JobTracker - Thread {threading.get_native_id()} removing job {job_id}",
        )
        with self._lock:
            self._jobs.discard(job_id)

    def _has_reached_limit(self) -> bool:
        """Return True when the tracked set is full. Does not take the lock itself."""
        return len(self._jobs) >= self._limit