Skip to content

Commit c80cad1

Browse files
authored
Refactor the Celery Beat integration (#3105)
1 parent 4e2af01 commit c80cad1

File tree

4 files changed

+224
-134
lines changed

4 files changed

+224
-134
lines changed

sentry_sdk/integrations/celery/__init__.py

+8-9
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,9 @@ def __init__(
7070
self.monitor_beat_tasks = monitor_beat_tasks
7171
self.exclude_beat_tasks = exclude_beat_tasks
7272

73-
if monitor_beat_tasks:
74-
_patch_beat_apply_entry()
75-
_patch_redbeat_maybe_due()
76-
_setup_celery_beat_signals()
73+
_patch_beat_apply_entry()
74+
_patch_redbeat_maybe_due()
75+
_setup_celery_beat_signals()
7776

7877
@staticmethod
7978
def setup_once():
@@ -167,11 +166,11 @@ def _update_celery_task_headers(original_headers, span, monitor_beat_tasks):
167166
"""
168167
updated_headers = original_headers.copy()
169168
with capture_internal_exceptions():
170-
headers = {}
171-
if span is not None:
172-
headers = dict(
173-
Scope.get_current_scope().iter_trace_propagation_headers(span=span)
174-
)
169+
# if span is None (when the task was started by Celery Beat)
170+
# this will return the trace headers from the scope.
171+
headers = dict(
172+
Scope.get_isolation_scope().iter_trace_propagation_headers(span=span)
173+
)
175174

176175
if monitor_beat_tasks:
177176
headers.update(

sentry_sdk/integrations/celery/beat.py

+71-95
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from functools import wraps
12
import sentry_sdk
23
from sentry_sdk.crons import capture_checkin, MonitorStatus
34
from sentry_sdk.integrations import DidNotEnable
@@ -113,133 +114,108 @@ def _get_monitor_config(celery_schedule, app, monitor_name):
113114
return monitor_config
114115

115116

116-
def _patch_beat_apply_entry():
117-
# type: () -> None
117+
def _apply_crons_data_to_schedule_entry(scheduler, schedule_entry, integration):
118+
# type: (Any, Any, sentry_sdk.integrations.celery.CeleryIntegration) -> None
118119
"""
119-
Makes sure that the Sentry Crons information is set in the Celery Beat task's
120-
headers so that is is monitored with Sentry Crons.
121-
122-
This is only called by Celery Beat. After apply_entry is called
123-
Celery will call apply_async to put the task in the queue.
120+
Add Sentry Crons information to the schedule_entry headers.
124121
"""
125-
from sentry_sdk.integrations.celery import CeleryIntegration
126-
127-
original_apply_entry = Scheduler.apply_entry
128-
129-
def sentry_apply_entry(*args, **kwargs):
130-
# type: (*Any, **Any) -> None
131-
scheduler, schedule_entry = args
132-
app = scheduler.app
133-
134-
celery_schedule = schedule_entry.schedule
135-
monitor_name = schedule_entry.name
136-
137-
integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
138-
if integration is None:
139-
return original_apply_entry(*args, **kwargs)
122+
if not integration.monitor_beat_tasks:
123+
return
140124

141-
if match_regex_list(monitor_name, integration.exclude_beat_tasks):
142-
return original_apply_entry(*args, **kwargs)
125+
monitor_name = schedule_entry.name
143126

144-
# Tasks started by Celery Beat start a new Trace
145-
scope = Scope.get_isolation_scope()
146-
scope.set_new_propagation_context()
147-
scope._name = "celery-beat"
127+
task_should_be_excluded = match_regex_list(
128+
monitor_name, integration.exclude_beat_tasks
129+
)
130+
if task_should_be_excluded:
131+
return
148132

149-
monitor_config = _get_monitor_config(celery_schedule, app, monitor_name)
133+
celery_schedule = schedule_entry.schedule
134+
app = scheduler.app
150135

151-
is_supported_schedule = bool(monitor_config)
152-
if is_supported_schedule:
153-
headers = schedule_entry.options.pop("headers", {})
154-
headers.update(
155-
{
156-
"sentry-monitor-slug": monitor_name,
157-
"sentry-monitor-config": monitor_config,
158-
}
159-
)
136+
monitor_config = _get_monitor_config(celery_schedule, app, monitor_name)
160137

161-
check_in_id = capture_checkin(
162-
monitor_slug=monitor_name,
163-
monitor_config=monitor_config,
164-
status=MonitorStatus.IN_PROGRESS,
165-
)
166-
headers.update({"sentry-monitor-check-in-id": check_in_id})
138+
is_supported_schedule = bool(monitor_config)
139+
if not is_supported_schedule:
140+
return
167141

168-
# Set the Sentry configuration in the options of the ScheduleEntry.
169-
# Those will be picked up in `apply_async` and added to the headers.
170-
schedule_entry.options["headers"] = headers
142+
headers = schedule_entry.options.pop("headers", {})
143+
headers.update(
144+
{
145+
"sentry-monitor-slug": monitor_name,
146+
"sentry-monitor-config": monitor_config,
147+
}
148+
)
171149

172-
return original_apply_entry(*args, **kwargs)
150+
check_in_id = capture_checkin(
151+
monitor_slug=monitor_name,
152+
monitor_config=monitor_config,
153+
status=MonitorStatus.IN_PROGRESS,
154+
)
155+
headers.update({"sentry-monitor-check-in-id": check_in_id})
173156

174-
Scheduler.apply_entry = sentry_apply_entry
157+
# Set the Sentry configuration in the options of the ScheduleEntry.
158+
# Those will be picked up in `apply_async` and added to the headers.
159+
schedule_entry.options["headers"] = headers
175160

176161

177-
def _patch_redbeat_maybe_due():
178-
# type: () -> None
179-
180-
if RedBeatScheduler is None:
181-
return
182-
162+
def _wrap_beat_scheduler(f):
163+
# type: (Callable[..., Any]) -> Callable[..., Any]
164+
"""
165+
Makes sure that:
166+
- a new Sentry trace is started for each task started by Celery Beat and
167+
it is propagated to the task.
168+
- the Sentry Crons information is set in the Celery Beat task's
169+
headers so that it is monitored with Sentry Crons.
170+
171+
After the patched function is called,
172+
Celery Beat will call apply_async to put the task in the queue.
173+
"""
183174
from sentry_sdk.integrations.celery import CeleryIntegration
184175

185-
original_maybe_due = RedBeatScheduler.maybe_due
186-
187-
def sentry_maybe_due(*args, **kwargs):
176+
@wraps(f)
177+
def sentry_patched_scheduler(*args, **kwargs):
188178
# type: (*Any, **Any) -> None
189-
scheduler, schedule_entry = args
190-
app = scheduler.app
191-
192-
celery_schedule = schedule_entry.schedule
193-
monitor_name = schedule_entry.name
194-
195179
integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
196180
if integration is None:
197-
return original_maybe_due(*args, **kwargs)
198-
199-
task_should_be_excluded = match_regex_list(
200-
monitor_name, integration.exclude_beat_tasks
201-
)
202-
if task_should_be_excluded:
203-
return original_maybe_due(*args, **kwargs)
181+
return f(*args, **kwargs)
204182

205183
# Tasks started by Celery Beat start a new Trace
206184
scope = Scope.get_isolation_scope()
207185
scope.set_new_propagation_context()
208186
scope._name = "celery-beat"
209187

210-
monitor_config = _get_monitor_config(celery_schedule, app, monitor_name)
188+
scheduler, schedule_entry = args
189+
_apply_crons_data_to_schedule_entry(scheduler, schedule_entry, integration)
190+
191+
return f(*args, **kwargs)
211192

212-
is_supported_schedule = bool(monitor_config)
213-
if is_supported_schedule:
214-
headers = schedule_entry.options.pop("headers", {})
215-
headers.update(
216-
{
217-
"sentry-monitor-slug": monitor_name,
218-
"sentry-monitor-config": monitor_config,
219-
}
220-
)
193+
return sentry_patched_scheduler
221194

222-
check_in_id = capture_checkin(
223-
monitor_slug=monitor_name,
224-
monitor_config=monitor_config,
225-
status=MonitorStatus.IN_PROGRESS,
226-
)
227-
headers.update({"sentry-monitor-check-in-id": check_in_id})
228195

229-
# Set the Sentry configuration in the options of the ScheduleEntry.
230-
# Those will be picked up in `apply_async` and added to the headers.
231-
schedule_entry.options["headers"] = headers
196+
def _patch_beat_apply_entry():
197+
# type: () -> None
198+
Scheduler.apply_entry = _wrap_beat_scheduler(Scheduler.apply_entry)
232199

233-
return original_maybe_due(*args, **kwargs)
234200

235-
RedBeatScheduler.maybe_due = sentry_maybe_due
201+
def _patch_redbeat_maybe_due():
202+
# type: () -> None
203+
if RedBeatScheduler is None:
204+
return
205+
206+
RedBeatScheduler.maybe_due = _wrap_beat_scheduler(RedBeatScheduler.maybe_due)
236207

237208

238209
def _setup_celery_beat_signals():
239210
# type: () -> None
240-
task_success.connect(crons_task_success)
241-
task_failure.connect(crons_task_failure)
242-
task_retry.connect(crons_task_retry)
211+
from sentry_sdk.integrations.celery import CeleryIntegration
212+
213+
integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
214+
215+
if integration is not None and integration.monitor_beat_tasks:
216+
task_success.connect(crons_task_success)
217+
task_failure.connect(crons_task_failure)
218+
task_retry.connect(crons_task_retry)
243219

244220

245221
def crons_task_success(sender, **kwargs):

sentry_sdk/scope.py

+4-3
Original file line numberDiff line numberDiff line change
@@ -604,9 +604,10 @@ def iter_headers(self):
604604
def iter_trace_propagation_headers(self, *args, **kwargs):
605605
# type: (Any, Any) -> Generator[Tuple[str, str], None, None]
606606
"""
607-
Return HTTP headers which allow propagation of trace data. Data taken
608-
from the span representing the request, if available, or the current
609-
span on the scope if not.
607+
Return HTTP headers which allow propagation of trace data.
608+
609+
If a span is given, the trace data will be taken from the span.
610+
If no span is given, the trace data is taken from the scope.
610611
"""
611612
client = Scope.get_client()
612613
if not client.options.get("propagate_traces"):

0 commit comments

Comments
 (0)