Skip to content

Commit

Permalink
Completed D400 for airflow/listener/* directory (#27731)
Browse files Browse the repository at this point in the history
  • Loading branch information
bdsoha authored Nov 17, 2022
1 parent 325b886 commit 2631877
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 3 deletions.
4 changes: 4 additions & 0 deletions airflow/listeners/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@

def on_task_instance_state_session_flush(session, flush_context):
"""
Flush task instance's state.
Listens for session.flush() events that modify TaskInstance's state, and notify listeners that listen
for that event. Doing it this way enable us to be stateless in the SQLAlchemy event listener.
"""
Expand Down Expand Up @@ -69,13 +71,15 @@ def on_task_instance_state_session_flush(session, flush_context):


def register_task_instance_state_events():
"""Register a task instance state event"""
global _is_listening
if not _is_listening:
event.listen(Session, "after_flush", on_task_instance_state_session_flush)
_is_listening = True


def unregister_task_instance_state_events():
"""Unregister a task instance state event"""
global _is_listening
event.remove(Session, "after_flush", on_task_instance_state_session_flush)
_is_listening = False
7 changes: 4 additions & 3 deletions airflow/listeners/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@


class ListenerManager:
"""Class that manages registration of listeners and provides hook property for calling them"""
"""Manage listener registration and provides hook property for calling them."""

def __init__(self):
from airflow.listeners import spec
Expand All @@ -49,7 +49,7 @@ def has_listeners(self) -> bool:

@property
def hook(self) -> _HookRelay:
"""Returns hook, on which plugin methods specified in spec can be called."""
"""Return hook, on which plugin methods specified in spec can be called."""
return self.pm.hook

def add_listener(self, listener):
Expand All @@ -60,12 +60,13 @@ def add_listener(self, listener):
self.pm.register(listener)

def clear(self):
"""Remove registered plugins"""
"""Remove registered plugins."""
for plugin in self.pm.get_plugins():
self.pm.unregister(plugin)


def get_listener_manager() -> ListenerManager:
"""Get singleton listener manager"""
global _listener_manager
if not _listener_manager:
_listener_manager = ListenerManager()
Expand Down

0 comments on commit 2631877

Please sign in to comment.