Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix asyncpg breadcrumbs #3685

Merged
merged 2 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 39 additions & 40 deletions sentry_sdk/integrations/asyncpg.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations
import contextlib
from typing import Any, TypeVar, Callable, Awaitable, Iterator
from typing import Any, TypeVar, Callable, Awaitable, Iterator, Optional

import sentry_sdk
from sentry_sdk.consts import OP, SPANDATA
Expand All @@ -21,6 +21,7 @@
except ImportError:
raise DidNotEnable("asyncpg not installed.")


# asyncpg.__version__ is a string containing the semantic version in the form of "<major>.<minor>.<patch>"
asyncpg_version = parse_version(asyncpg.__version__)

Expand Down Expand Up @@ -123,10 +124,13 @@ def _wrap_connection_method(
async def _inner(*args: Any, **kwargs: Any) -> T:
if sentry_sdk.get_client().get_integration(AsyncPGIntegration) is None:
return await f(*args, **kwargs)

query = args[1]
params_list = args[2] if len(args) > 2 else None

with _record(None, query, params_list, executemany=executemany) as span:
_set_db_data(span, args[0])
data = _get_db_data(conn=args[0])
_set_on_span(span, data)
res = await f(*args, **kwargs)

return res
Expand All @@ -146,7 +150,8 @@ def _inner(*args: Any, **kwargs: Any) -> T: # noqa: N807
params_list,
executemany=False,
) as span:
_set_db_data(span, args[0])
data = _get_db_data(conn=args[0])
_set_on_span(span, data)
res = f(*args, **kwargs)
span.set_attribute("db.cursor", _serialize_span_attribute(res))

Expand All @@ -160,41 +165,19 @@ async def _inner(*args: Any, **kwargs: Any) -> T:
if sentry_sdk.get_client().get_integration(AsyncPGIntegration) is None:
return await f(*args, **kwargs)

user = kwargs["params"].user
database = kwargs["params"].database

with sentry_sdk.start_span(
op=OP.DB,
name="connect",
origin=AsyncPGIntegration.origin,
) as span:
span.set_attribute(SPANDATA.DB_SYSTEM, "postgresql")
addr = kwargs.get("addr")
if addr:
try:
span.set_attribute(SPANDATA.SERVER_ADDRESS, addr[0])
span.set_attribute(SPANDATA.SERVER_PORT, addr[1])
except IndexError:
pass

span.set_attribute(SPANDATA.DB_NAME, database)
span.set_attribute(SPANDATA.DB_USER, user)
data = _get_db_data(
addr=kwargs.get("addr"),
database=kwargs["params"].database,
user=kwargs["params"].user,
)
_set_on_span(span, data)

with capture_internal_exceptions():
data = {}
for attr in (
"db.cursor",
"db.params",
"db.paramstyle",
SPANDATA.DB_NAME,
SPANDATA.DB_SYSTEM,
SPANDATA.DB_USER,
SPANDATA.SERVER_ADDRESS,
SPANDATA.SERVER_PORT,
):
if span.get_attribute(attr):
data[attr] = span.get_attribute(attr)

sentry_sdk.add_breadcrumb(
message="connect", category="query", data=data
)
Expand All @@ -206,21 +189,37 @@ async def _inner(*args: Any, **kwargs: Any) -> T:
return _inner


def _set_db_data(span: Span, conn: Any) -> None:
span.set_attribute(SPANDATA.DB_SYSTEM, "postgresql")
def _get_db_data(
conn: Any = None,
addr: Optional[tuple[str]] = None,
database: Optional[str] = None,
user: Optional[str] = None,
) -> dict[str, str]:
if conn is not None:
addr = conn._addr
database = conn._params.database
user = conn._params.user

data = {
SPANDATA.DB_SYSTEM: "postgresql",
}

addr = conn._addr
if addr:
try:
span.set_attribute(SPANDATA.SERVER_ADDRESS, addr[0])
span.set_attribute(SPANDATA.SERVER_PORT, addr[1])
data[SPANDATA.SERVER_ADDRESS] = addr[0]
data[SPANDATA.SERVER_PORT] = addr[1]
except IndexError:
pass

database = conn._params.database
if database:
span.set_attribute(SPANDATA.DB_NAME, database)
data[SPANDATA.DB_NAME] = database

user = conn._params.user
if user:
span.set_attribute(SPANDATA.DB_USER, user)
data[SPANDATA.DB_USER] = user

return data


def _set_on_span(span: Span, data: dict[str, Any]):
for key, value in data.items():
span.set_attribute(key, value)
20 changes: 6 additions & 14 deletions tests/integrations/asyncpg/test_asyncpg.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ async def _clean_pg():
async def test_connect(sentry_init, capture_events) -> None:
sentry_init(
integrations=[AsyncPGIntegration()],
traces_sample_rate=1.0,
_experiments={"record_sql_params": True},
)
events = capture_events()
Expand All @@ -97,7 +96,6 @@ async def test_connect(sentry_init, capture_events) -> None:
async def test_execute(sentry_init, capture_events) -> None:
sentry_init(
integrations=[AsyncPGIntegration()],
traces_sample_rate=1.0,
_experiments={"record_sql_params": True},
)
events = capture_events()
Expand Down Expand Up @@ -163,7 +161,6 @@ async def test_execute(sentry_init, capture_events) -> None:
async def test_execute_many(sentry_init, capture_events) -> None:
sentry_init(
integrations=[AsyncPGIntegration()],
traces_sample_rate=1.0,
_experiments={"record_sql_params": True},
)
events = capture_events()
Expand Down Expand Up @@ -202,7 +199,6 @@ async def test_execute_many(sentry_init, capture_events) -> None:
async def test_record_params(sentry_init, capture_events) -> None:
sentry_init(
integrations=[AsyncPGIntegration(record_params=True)],
traces_sample_rate=1.0,
_experiments={"record_sql_params": True},
)
events = capture_events()
Expand Down Expand Up @@ -243,7 +239,6 @@ async def test_record_params(sentry_init, capture_events) -> None:
async def test_cursor(sentry_init, capture_events) -> None:
sentry_init(
integrations=[AsyncPGIntegration()],
traces_sample_rate=1.0,
_experiments={"record_sql_params": True},
)
events = capture_events()
Expand Down Expand Up @@ -308,7 +303,6 @@ async def test_cursor(sentry_init, capture_events) -> None:
async def test_cursor_manual(sentry_init, capture_events) -> None:
sentry_init(
integrations=[AsyncPGIntegration()],
traces_sample_rate=1.0,
_experiments={"record_sql_params": True},
)
events = capture_events()
Expand Down Expand Up @@ -375,7 +369,6 @@ async def test_cursor_manual(sentry_init, capture_events) -> None:
async def test_prepared_stmt(sentry_init, capture_events) -> None:
sentry_init(
integrations=[AsyncPGIntegration()],
traces_sample_rate=1.0,
_experiments={"record_sql_params": True},
)
events = capture_events()
Expand Down Expand Up @@ -425,7 +418,6 @@ async def test_prepared_stmt(sentry_init, capture_events) -> None:
async def test_connection_pool(sentry_init, capture_events) -> None:
sentry_init(
integrations=[AsyncPGIntegration()],
traces_sample_rate=1.0,
_experiments={"record_sql_params": True},
)
events = capture_events()
Expand Down Expand Up @@ -497,7 +489,7 @@ async def test_connection_pool(sentry_init, capture_events) -> None:
async def test_query_source_disabled(sentry_init, capture_events):
sentry_options = {
"integrations": [AsyncPGIntegration()],
"enable_tracing": True,
"traces_sample_rate": 1.0,
"enable_db_query_source": False,
"db_query_source_threshold_ms": 0,
}
Expand Down Expand Up @@ -535,7 +527,7 @@ async def test_query_source_enabled(
):
sentry_options = {
"integrations": [AsyncPGIntegration()],
"enable_tracing": True,
"traces_sample_rate": 1.0,
"db_query_source_threshold_ms": 0,
}
if enable_db_query_source is not None:
Expand Down Expand Up @@ -571,7 +563,7 @@ async def test_query_source_enabled(
async def test_query_source(sentry_init, capture_events):
sentry_init(
integrations=[AsyncPGIntegration()],
enable_tracing=True,
traces_sample_rate=1.0,
enable_db_query_source=True,
db_query_source_threshold_ms=0,
)
Expand Down Expand Up @@ -621,7 +613,7 @@ async def test_query_source_with_module_in_search_path(sentry_init, capture_even
"""
sentry_init(
integrations=[AsyncPGIntegration()],
enable_tracing=True,
traces_sample_rate=1.0,
enable_db_query_source=True,
db_query_source_threshold_ms=0,
)
Expand Down Expand Up @@ -667,7 +659,7 @@ async def test_query_source_with_module_in_search_path(sentry_init, capture_even
async def test_no_query_source_if_duration_too_short(sentry_init, capture_events):
sentry_init(
integrations=[AsyncPGIntegration()],
enable_tracing=True,
traces_sample_rate=1.0,
enable_db_query_source=True,
db_query_source_threshold_ms=100,
)
Expand Down Expand Up @@ -714,7 +706,7 @@ def fake_record_sql_queries(*args, **kwargs):
async def test_query_source_if_duration_over_threshold(sentry_init, capture_events):
sentry_init(
integrations=[AsyncPGIntegration()],
enable_tracing=True,
traces_sample_rate=1.0,
enable_db_query_source=True,
db_query_source_threshold_ms=100,
)
Expand Down
Loading