Skip to content

Commit d818e8f

Browse files
Revert "Refactor the Celery Beat integration (#3105)" (#3144)
This reverts commit c80cad1, which appears to have introduced a regression preventing checkins from being sent when a cron job is finished.
1 parent c2af1b0 commit d818e8f

File tree

4 files changed

+134
-224
lines changed

4 files changed

+134
-224
lines changed

sentry_sdk/integrations/celery/__init__.py

+9-8
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,10 @@ def __init__(
7070
self.monitor_beat_tasks = monitor_beat_tasks
7171
self.exclude_beat_tasks = exclude_beat_tasks
7272

73-
_patch_beat_apply_entry()
74-
_patch_redbeat_maybe_due()
75-
_setup_celery_beat_signals()
73+
if monitor_beat_tasks:
74+
_patch_beat_apply_entry()
75+
_patch_redbeat_maybe_due()
76+
_setup_celery_beat_signals()
7677

7778
@staticmethod
7879
def setup_once():
@@ -166,11 +167,11 @@ def _update_celery_task_headers(original_headers, span, monitor_beat_tasks):
166167
"""
167168
updated_headers = original_headers.copy()
168169
with capture_internal_exceptions():
169-
# if span is None (when the task was started by Celery Beat)
170-
# this will return the trace headers from the scope.
171-
headers = dict(
172-
Scope.get_isolation_scope().iter_trace_propagation_headers(span=span)
173-
)
170+
headers = {}
171+
if span is not None:
172+
headers = dict(
173+
Scope.get_current_scope().iter_trace_propagation_headers(span=span)
174+
)
174175

175176
if monitor_beat_tasks:
176177
headers.update(

sentry_sdk/integrations/celery/beat.py

+95-71
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
from functools import wraps
21
import sentry_sdk
32
from sentry_sdk.crons import capture_checkin, MonitorStatus
43
from sentry_sdk.integrations import DidNotEnable
@@ -114,108 +113,133 @@ def _get_monitor_config(celery_schedule, app, monitor_name):
114113
return monitor_config
115114

116115

117-
def _apply_crons_data_to_schedule_entry(scheduler, schedule_entry, integration):
118-
# type: (Any, Any, sentry_sdk.integrations.celery.CeleryIntegration) -> None
116+
def _patch_beat_apply_entry():
117+
# type: () -> None
119118
"""
120-
Add Sentry Crons information to the schedule_entry headers.
119+
Makes sure that the Sentry Crons information is set in the Celery Beat task's
120+
headers so that is is monitored with Sentry Crons.
121+
122+
This is only called by Celery Beat. After apply_entry is called
123+
Celery will call apply_async to put the task in the queue.
121124
"""
122-
if not integration.monitor_beat_tasks:
123-
return
125+
from sentry_sdk.integrations.celery import CeleryIntegration
124126

125-
monitor_name = schedule_entry.name
127+
original_apply_entry = Scheduler.apply_entry
126128

127-
task_should_be_excluded = match_regex_list(
128-
monitor_name, integration.exclude_beat_tasks
129-
)
130-
if task_should_be_excluded:
131-
return
129+
def sentry_apply_entry(*args, **kwargs):
130+
# type: (*Any, **Any) -> None
131+
scheduler, schedule_entry = args
132+
app = scheduler.app
132133

133-
celery_schedule = schedule_entry.schedule
134-
app = scheduler.app
134+
celery_schedule = schedule_entry.schedule
135+
monitor_name = schedule_entry.name
135136

136-
monitor_config = _get_monitor_config(celery_schedule, app, monitor_name)
137+
integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
138+
if integration is None:
139+
return original_apply_entry(*args, **kwargs)
137140

138-
is_supported_schedule = bool(monitor_config)
139-
if not is_supported_schedule:
140-
return
141+
if match_regex_list(monitor_name, integration.exclude_beat_tasks):
142+
return original_apply_entry(*args, **kwargs)
141143

142-
headers = schedule_entry.options.pop("headers", {})
143-
headers.update(
144-
{
145-
"sentry-monitor-slug": monitor_name,
146-
"sentry-monitor-config": monitor_config,
147-
}
148-
)
144+
# Tasks started by Celery Beat start a new Trace
145+
scope = Scope.get_isolation_scope()
146+
scope.set_new_propagation_context()
147+
scope._name = "celery-beat"
149148

150-
check_in_id = capture_checkin(
151-
monitor_slug=monitor_name,
152-
monitor_config=monitor_config,
153-
status=MonitorStatus.IN_PROGRESS,
154-
)
155-
headers.update({"sentry-monitor-check-in-id": check_in_id})
149+
monitor_config = _get_monitor_config(celery_schedule, app, monitor_name)
156150

157-
# Set the Sentry configuration in the options of the ScheduleEntry.
158-
# Those will be picked up in `apply_async` and added to the headers.
159-
schedule_entry.options["headers"] = headers
151+
is_supported_schedule = bool(monitor_config)
152+
if is_supported_schedule:
153+
headers = schedule_entry.options.pop("headers", {})
154+
headers.update(
155+
{
156+
"sentry-monitor-slug": monitor_name,
157+
"sentry-monitor-config": monitor_config,
158+
}
159+
)
160160

161+
check_in_id = capture_checkin(
162+
monitor_slug=monitor_name,
163+
monitor_config=monitor_config,
164+
status=MonitorStatus.IN_PROGRESS,
165+
)
166+
headers.update({"sentry-monitor-check-in-id": check_in_id})
167+
168+
# Set the Sentry configuration in the options of the ScheduleEntry.
169+
# Those will be picked up in `apply_async` and added to the headers.
170+
schedule_entry.options["headers"] = headers
171+
172+
return original_apply_entry(*args, **kwargs)
173+
174+
Scheduler.apply_entry = sentry_apply_entry
175+
176+
177+
def _patch_redbeat_maybe_due():
178+
# type: () -> None
179+
180+
if RedBeatScheduler is None:
181+
return
161182

162-
def _wrap_beat_scheduler(f):
163-
# type: (Callable[..., Any]) -> Callable[..., Any]
164-
"""
165-
Makes sure that:
166-
- a new Sentry trace is started for each task started by Celery Beat and
167-
it is propagated to the task.
168-
- the Sentry Crons information is set in the Celery Beat task's
169-
headers so that is is monitored with Sentry Crons.
170-
171-
After the patched function is called,
172-
Celery Beat will call apply_async to put the task in the queue.
173-
"""
174183
from sentry_sdk.integrations.celery import CeleryIntegration
175184

176-
@wraps(f)
177-
def sentry_patched_scheduler(*args, **kwargs):
185+
original_maybe_due = RedBeatScheduler.maybe_due
186+
187+
def sentry_maybe_due(*args, **kwargs):
178188
# type: (*Any, **Any) -> None
189+
scheduler, schedule_entry = args
190+
app = scheduler.app
191+
192+
celery_schedule = schedule_entry.schedule
193+
monitor_name = schedule_entry.name
194+
179195
integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
180196
if integration is None:
181-
return f(*args, **kwargs)
197+
return original_maybe_due(*args, **kwargs)
198+
199+
task_should_be_excluded = match_regex_list(
200+
monitor_name, integration.exclude_beat_tasks
201+
)
202+
if task_should_be_excluded:
203+
return original_maybe_due(*args, **kwargs)
182204

183205
# Tasks started by Celery Beat start a new Trace
184206
scope = Scope.get_isolation_scope()
185207
scope.set_new_propagation_context()
186208
scope._name = "celery-beat"
187209

188-
scheduler, schedule_entry = args
189-
_apply_crons_data_to_schedule_entry(scheduler, schedule_entry, integration)
190-
191-
return f(*args, **kwargs)
210+
monitor_config = _get_monitor_config(celery_schedule, app, monitor_name)
192211

193-
return sentry_patched_scheduler
212+
is_supported_schedule = bool(monitor_config)
213+
if is_supported_schedule:
214+
headers = schedule_entry.options.pop("headers", {})
215+
headers.update(
216+
{
217+
"sentry-monitor-slug": monitor_name,
218+
"sentry-monitor-config": monitor_config,
219+
}
220+
)
194221

222+
check_in_id = capture_checkin(
223+
monitor_slug=monitor_name,
224+
monitor_config=monitor_config,
225+
status=MonitorStatus.IN_PROGRESS,
226+
)
227+
headers.update({"sentry-monitor-check-in-id": check_in_id})
195228

196-
def _patch_beat_apply_entry():
197-
# type: () -> None
198-
Scheduler.apply_entry = _wrap_beat_scheduler(Scheduler.apply_entry)
229+
# Set the Sentry configuration in the options of the ScheduleEntry.
230+
# Those will be picked up in `apply_async` and added to the headers.
231+
schedule_entry.options["headers"] = headers
199232

233+
return original_maybe_due(*args, **kwargs)
200234

201-
def _patch_redbeat_maybe_due():
202-
# type: () -> None
203-
if RedBeatScheduler is None:
204-
return
205-
206-
RedBeatScheduler.maybe_due = _wrap_beat_scheduler(RedBeatScheduler.maybe_due)
235+
RedBeatScheduler.maybe_due = sentry_maybe_due
207236

208237

209238
def _setup_celery_beat_signals():
210239
# type: () -> None
211-
from sentry_sdk.integrations.celery import CeleryIntegration
212-
213-
integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
214-
215-
if integration is not None and integration.monitor_beat_tasks:
216-
task_success.connect(crons_task_success)
217-
task_failure.connect(crons_task_failure)
218-
task_retry.connect(crons_task_retry)
240+
task_success.connect(crons_task_success)
241+
task_failure.connect(crons_task_failure)
242+
task_retry.connect(crons_task_retry)
219243

220244

221245
def crons_task_success(sender, **kwargs):

sentry_sdk/scope.py

+3-4
Original file line numberDiff line numberDiff line change
@@ -603,10 +603,9 @@ def iter_headers(self):
603603
def iter_trace_propagation_headers(self, *args, **kwargs):
604604
# type: (Any, Any) -> Generator[Tuple[str, str], None, None]
605605
"""
606-
Return HTTP headers which allow propagation of trace data.
607-
608-
If a span is given, the trace data will taken from the span.
609-
If no span is given, the trace data is taken from the scope.
606+
Return HTTP headers which allow propagation of trace data. Data taken
607+
from the span representing the request, if available, or the current
608+
span on the scope if not.
610609
"""
611610
client = Scope.get_client()
612611
if not client.options.get("propagate_traces"):

0 commit comments

Comments
 (0)