Skip to content

Commit

Permalink
Edge worker graceful shutdown on version mismatch (#43462)
Browse files Browse the repository at this point in the history
* React to EdgeWorkerVersionException

* update version number

* import EdgeWorkerVersionException

* edge worker exception is thrown later

* fix missing variable

* Add pytests

* changelog added

* fix static checks

---------

Co-authored-by: Majoros Donat (XC-DX/EET2-Bp) <donat.majoros2@hu.bosch.com>
  • Loading branch information
majorosdonat and Majoros Donat (XC-DX/EET2-Bp) authored Oct 29, 2024
1 parent e485bd6 commit 96aae97
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 7 deletions.
8 changes: 8 additions & 0 deletions providers/src/airflow/providers/edge/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@
Changelog
---------

0.5.0pre0
.........

Misc
~~~~

* ``Edge worker triggers graceful shutdown if worker version and main instance version do not match.``

0.4.0pre0
.........

Expand Down
2 changes: 1 addition & 1 deletion providers/src/airflow/providers/edge/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

__all__ = ["__version__"]

__version__ = "0.4.0pre0"
__version__ = "0.5.0pre0"

if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
"2.10.0"
Expand Down
16 changes: 13 additions & 3 deletions providers/src/airflow/providers/edge/cli/edge_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
from airflow.providers.edge import __version__ as edge_provider_version
from airflow.providers.edge.models.edge_job import EdgeJob
from airflow.providers.edge.models.edge_logs import EdgeLogs
from airflow.providers.edge.models.edge_worker import EdgeWorker, EdgeWorkerState
from airflow.providers.edge.models.edge_worker import EdgeWorker, EdgeWorkerState, EdgeWorkerVersionException
from airflow.utils import cli as cli_utils
from airflow.utils.platform import IS_WINDOWS
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
Expand Down Expand Up @@ -175,6 +175,9 @@ def start(self):
self.last_hb = EdgeWorker.register_worker(
self.hostname, EdgeWorkerState.STARTING, self.queues, self._get_sysinfo()
).last_update
except EdgeWorkerVersionException as e:
logger.info("Version mismatch of Edge worker and Core. Shutting down worker.")
raise SystemExit(str(e))
except AirflowException as e:
if "404:NOT FOUND" in str(e):
raise SystemExit("Error: API endpoint is not ready, please set [edge] api_enabled=True.")
Expand All @@ -186,7 +189,10 @@ def start(self):
self.loop()

logger.info("Quitting worker, signal being offline.")
EdgeWorker.set_state(self.hostname, EdgeWorkerState.OFFLINE, 0, self._get_sysinfo())
try:
EdgeWorker.set_state(self.hostname, EdgeWorkerState.OFFLINE, 0, self._get_sysinfo())
except EdgeWorkerVersionException:
logger.info("Version mismatch of Edge worker and Core. Quitting worker anyway.")
finally:
remove_existing_pidfile(self.pid_file_path)

Expand Down Expand Up @@ -259,7 +265,11 @@ def heartbeat(self) -> None:
else EdgeWorkerState.IDLE
)
sysinfo = self._get_sysinfo()
self.queues = EdgeWorker.set_state(self.hostname, state, len(self.jobs), sysinfo)
try:
self.queues = EdgeWorker.set_state(self.hostname, state, len(self.jobs), sysinfo)
except EdgeWorkerVersionException:
logger.info("Version mismatch of Edge worker and Core. Shutting down worker.")
_EdgeWorkerCli.drain = True

def interruptible_sleep(self):
"""Sleeps but stops sleeping if drain is made."""
Expand Down
2 changes: 1 addition & 1 deletion providers/src/airflow/providers/edge/models/edge_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,6 @@ def set_state(
session: Session = NEW_SESSION,
) -> list[str] | None:
"""Set state of worker and returns the current assigned queues."""
EdgeWorker.assert_version(sysinfo)
query = select(EdgeWorkerModel).where(EdgeWorkerModel.worker_name == worker_name)
worker: EdgeWorkerModel = session.scalar(query)
worker.state = state
Expand All @@ -283,6 +282,7 @@ def set_state(
concurrency=int(sysinfo["concurrency"]),
queues=worker.queues,
)
EdgeWorker.assert_version(sysinfo) # Exception only after worker state is in the DB
return worker.queues

@staticmethod
Expand Down
2 changes: 1 addition & 1 deletion providers/src/airflow/providers/edge/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ source-date-epoch: 1729683247

# note that those versions are maintained by release manager - do not update them manually
versions:
- 0.4.0pre0
- 0.5.0pre0

dependencies:
- apache-airflow>=2.10.0
Expand Down
8 changes: 7 additions & 1 deletion providers/tests/edge/cli/test_edge_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from airflow.exceptions import AirflowException
from airflow.providers.edge.cli.edge_command import _EdgeWorkerCli, _Job, _write_pid_to_pidfile
from airflow.providers.edge.models.edge_job import EdgeJob
from airflow.providers.edge.models.edge_worker import EdgeWorker, EdgeWorkerState
from airflow.providers.edge.models.edge_worker import EdgeWorker, EdgeWorkerState, EdgeWorkerVersionException
from airflow.utils.state import TaskInstanceState

from tests_common.test_utils.config import conf_vars
Expand Down Expand Up @@ -266,6 +266,12 @@ def test_heartbeat(self, mock_set_state, drain, jobs, expected_state, worker_wit
assert "queue1" in (queue_list)
assert "queue2" in (queue_list)

@patch("airflow.providers.edge.models.edge_worker.EdgeWorker.set_state")
def test_version_mismatch(self, mock_set_state, worker_with_job):
    """Check that a version mismatch reported during heartbeat requests a drain.

    ``EdgeWorker.set_state`` is mocked to raise ``EdgeWorkerVersionException``;
    ``heartbeat()`` is expected to swallow it and set the drain flag instead.
    """
    mock_set_state.side_effect = EdgeWorkerVersionException("")
    # heartbeat() sets the flag on the class (``_EdgeWorkerCli.drain = True``), so it is
    # shared between tests. Start from a known False so the assertion cannot pass
    # vacuously, and restore the previous value so later tests are not affected.
    previous_drain = _EdgeWorkerCli.drain
    _EdgeWorkerCli.drain = False
    try:
        worker_with_job.heartbeat()
        assert worker_with_job.drain
    finally:
        _EdgeWorkerCli.drain = previous_drain

@patch("airflow.providers.edge.models.edge_worker.EdgeWorker.register_worker")
def test_start_missing_apiserver(self, mock_register_worker, worker_with_job: _EdgeWorkerCli):
mock_register_worker.side_effect = AirflowException(
Expand Down

0 comments on commit 96aae97

Please sign in to comment.