Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 47a418b

Browse files
committedMar 4, 2025
✨ Module for gathering telemetry data (#169)
1 parent 0df0f60 commit 47a418b

File tree

7 files changed

+976
-9
lines changed

7 files changed

+976
-9
lines changed
 

‎flama/middleware.py

-9
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
from flama.http import Request, Response
1717

1818
__all__ = [
19-
"AuthenticationMiddleware",
2019
"BaseHTTPMiddleware",
2120
"CORSMiddleware",
2221
"ExceptionMiddleware",
@@ -42,14 +41,6 @@ async def __call__(self, scope: "types.Scope", receive: "types.Receive", send: "
4241
SessionMiddleware = None # type: ignore[assignment]
4342

4443

45-
class AuthenticationMiddleware(starlette.middleware.authentication.AuthenticationMiddleware):
46-
def __init__(self, app: "types.App", *args, **kwargs):
47-
super().__init__(app, *args, **kwargs) # type: ignore[arg-type]
48-
49-
async def __call__(self, scope: "types.Scope", receive: "types.Receive", send: "types.Send") -> None: # type: ignore[overrid]
50-
return await super().__call__(scope, receive, send) # type: ignore[assignment]
51-
52-
5344
class BaseHTTPMiddleware(starlette.middleware.base.BaseHTTPMiddleware):
5445
def __init__(self, app: "types.App", *args, **kwargs):
5546
super().__init__(app, *args, **kwargs) # type: ignore[arg-type]

‎flama/telemetry/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
from flama.telemetry.data_structures import * # noqa
2+
from flama.telemetry.middleware import * # noqa

‎flama/telemetry/data_structures.py

+184
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
import dataclasses
2+
import datetime
3+
import logging
4+
import typing as t
5+
from http.cookies import SimpleCookie
6+
7+
from flama import Flama, types
8+
from flama.authentication.types import AccessToken, RefreshToken
9+
from flama.exceptions import HTTPException
10+
from flama.http import Request as HTTPRequest
11+
12+
logger = logging.getLogger(__name__)
13+
14+
__all__ = ["Endpoint", "Authentication", "Request", "Response", "Error", "TelemetryData"]
15+
16+
17+
@dataclasses.dataclass
18+
class Endpoint:
19+
path: str
20+
name: t.Optional[str]
21+
tags: dict[str, t.Any]
22+
23+
@classmethod
24+
async def from_scope(cls, *, scope: types.Scope, receive: types.Receive, send: types.Send) -> "Endpoint":
25+
app: Flama = scope["app"]
26+
route, _ = app.router.resolve_route(scope)
27+
28+
return cls(path=str(route.path), name=route.name, tags=route.tags)
29+
30+
def to_dict(self) -> dict[str, t.Any]:
31+
return {
32+
"path": self.path,
33+
"name": self.name,
34+
"tags": self.tags,
35+
}
36+
37+
38+
@dataclasses.dataclass
39+
class Authentication:
40+
access: t.Optional[AccessToken]
41+
refresh: t.Optional[RefreshToken]
42+
43+
@classmethod
44+
async def from_scope(cls, *, scope: types.Scope, receive: types.Receive, send: types.Send) -> "Authentication":
45+
app: Flama = scope["app"]
46+
context = {"scope": scope, "request": HTTPRequest(scope, receive=receive)}
47+
48+
try:
49+
access = await app.injector.resolve(AccessToken).value(context)
50+
except Exception:
51+
access = None
52+
53+
try:
54+
refresh = await app.injector.resolve(RefreshToken).value(context)
55+
except Exception:
56+
refresh = None
57+
58+
return cls(access=access, refresh=refresh)
59+
60+
def to_dict(self) -> dict[str, t.Any]:
61+
return {
62+
"access": self.access.to_dict() if self.access else None,
63+
"refresh": self.refresh.to_dict() if self.refresh else None,
64+
}
65+
66+
67+
@dataclasses.dataclass
68+
class Request:
69+
headers: dict[str, t.Any]
70+
cookies: dict[str, t.Any]
71+
query_parameters: dict[str, t.Any]
72+
path_parameters: dict[str, t.Any]
73+
body: bytes = b""
74+
timestamp: datetime.datetime = dataclasses.field(
75+
init=False, default_factory=lambda: datetime.datetime.now(datetime.timezone.utc)
76+
)
77+
78+
@classmethod
79+
async def from_scope(cls, *, scope: types.Scope, receive: types.Receive, send: types.Send) -> "Request":
80+
app: Flama = scope["app"]
81+
context = {"scope": scope, "request": HTTPRequest(scope, receive=receive), "route": app.resolve_route(scope)[0]}
82+
83+
headers = dict(await app.injector.resolve(types.Headers).value(context))
84+
cookies = dict(await app.injector.resolve(types.Cookies).value(context))
85+
query = dict(await app.injector.resolve(types.QueryParams).value(context))
86+
path = dict(await app.injector.resolve(types.PathParams).value(context))
87+
88+
return cls(headers=headers, cookies=cookies, query_parameters=query, path_parameters=path)
89+
90+
def to_dict(self) -> dict[str, t.Any]:
91+
return {
92+
"timestamp": self.timestamp.isoformat(),
93+
"headers": self.headers,
94+
"cookies": self.cookies,
95+
"query_parameters": self.query_parameters,
96+
"path_parameters": self.path_parameters,
97+
"body": self.body,
98+
}
99+
100+
101+
@dataclasses.dataclass
102+
class Response:
103+
headers: t.Optional[dict[str, t.Any]]
104+
cookies: t.Optional[dict[str, t.Any]] = dataclasses.field(init=False)
105+
body: bytes = b""
106+
status_code: t.Optional[int] = None
107+
timestamp: datetime.datetime = dataclasses.field(
108+
init=False, default_factory=lambda: datetime.datetime.now(datetime.timezone.utc)
109+
)
110+
111+
def __post_init__(self):
112+
if self.headers:
113+
cookie = SimpleCookie()
114+
cookie.load(self.headers.get("cookie", ""))
115+
else:
116+
cookie = {}
117+
118+
self.cookies = {
119+
str(name): {**{str(k): str(v) for k, v in morsel.items()}, "value": morsel.value}
120+
for name, morsel in cookie.items()
121+
}
122+
123+
def to_dict(self) -> dict[str, t.Any]:
124+
return {
125+
"timestamp": self.timestamp.isoformat(),
126+
"headers": self.headers,
127+
"cookies": self.cookies,
128+
"body": self.body,
129+
"status_code": self.status_code,
130+
}
131+
132+
133+
@dataclasses.dataclass
134+
class Error:
135+
detail: str
136+
status_code: t.Optional[int] = None
137+
timestamp: datetime.datetime = dataclasses.field(
138+
init=False, default_factory=lambda: datetime.datetime.now(datetime.timezone.utc)
139+
)
140+
141+
@classmethod
142+
async def from_exception(cls, *, exception: Exception) -> "Error":
143+
if isinstance(exception, HTTPException):
144+
return cls(status_code=exception.status_code, detail=str(exception.detail))
145+
146+
return cls(detail=str(exception))
147+
148+
def to_dict(self) -> dict[str, t.Any]:
149+
return {
150+
"timestamp": self.timestamp.isoformat(),
151+
"detail": self.detail,
152+
"status_code": self.status_code,
153+
}
154+
155+
156+
@dataclasses.dataclass
157+
class TelemetryData:
158+
type: t.Literal["http", "websocket"]
159+
endpoint: Endpoint
160+
authentication: Authentication
161+
request: Request
162+
response: t.Optional[Response] = None
163+
error: t.Optional[Error] = None
164+
extra: dict[t.Any, t.Any] = dataclasses.field(default_factory=dict)
165+
166+
@classmethod
167+
async def from_scope(cls, *, scope: types.Scope, receive: types.Receive, send: types.Send) -> "TelemetryData":
168+
return cls(
169+
type=scope["type"],
170+
endpoint=await Endpoint.from_scope(scope=scope, receive=receive, send=send),
171+
authentication=await Authentication.from_scope(scope=scope, receive=receive, send=send),
172+
request=await Request.from_scope(scope=scope, receive=receive, send=send),
173+
)
174+
175+
def to_dict(self) -> dict[str, t.Any]:
176+
return {
177+
"type": self.type,
178+
"endpoint": self.endpoint.to_dict(),
179+
"authentication": self.authentication.to_dict(),
180+
"request": self.request.to_dict(),
181+
"response": self.response.to_dict() if self.response else None,
182+
"error": self.error.to_dict() if self.error else None,
183+
"extra": self.extra,
184+
}

‎flama/telemetry/middleware.py

+155
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
import abc
2+
import logging
3+
import typing as t
4+
5+
from flama import Flama, concurrency, types
6+
from flama.telemetry.data_structures import Error, Response, TelemetryData
7+
8+
logger = logging.getLogger(__name__)
9+
10+
__all__ = ["TelemetryMiddleware"]
11+
12+
13+
PROJECT = "vortico-core"
14+
SERVICE = "elektrococo"
15+
TOPIC_ID = "telemetry-bus"
16+
17+
HookFunction = t.Callable[[TelemetryData], t.Union[None, t.Awaitable[None]]]
18+
19+
20+
class Wrapper(abc.ABC):
21+
def __init__(self, app: Flama, data: TelemetryData) -> None:
22+
self.app = app
23+
self.data = data
24+
25+
@classmethod
26+
def build(cls, type: t.Literal["http", "websocket"], app: Flama, data: TelemetryData) -> "Wrapper":
27+
if type == "websocket":
28+
return WebSocketWrapper(app, data)
29+
30+
return HTTPWrapper(app, data)
31+
32+
async def __call__(self, scope: types.Scope, receive: types.Receive, send: types.Send) -> None:
33+
self._scope = scope
34+
self._receive = receive
35+
self._send = send
36+
self._response_body = b""
37+
self._response_headers = None
38+
self._response_status_code = None
39+
40+
try:
41+
await self.app(self._scope, self.receive, self.send)
42+
self.data.response = Response(
43+
headers=self._response_headers, body=self._response_body, status_code=self._response_status_code
44+
)
45+
except Exception as e:
46+
self.data.error = await Error.from_exception(exception=e)
47+
raise
48+
49+
@abc.abstractmethod
50+
async def receive(self) -> types.Message: ...
51+
52+
@abc.abstractmethod
53+
async def send(self, message: types.Message) -> None: ...
54+
55+
56+
class HTTPWrapper(Wrapper):
57+
async def receive(self) -> types.Message:
58+
message = await self._receive()
59+
60+
if message["type"] == "http.request":
61+
self.data.request.body += message.get("body", b"")
62+
63+
return message
64+
65+
async def send(self, message: types.Message) -> None:
66+
if message["type"] == "http.response.start":
67+
self._response_headers = {k.decode(): v.decode() for (k, v) in message.get("headers", [])}
68+
self._response_status_code = message.get("status")
69+
elif message["type"] == "http.response.body":
70+
self._response_body += message.get("body", b"")
71+
72+
await self._send(message)
73+
74+
75+
class WebSocketWrapper(Wrapper):
76+
async def receive(self) -> types.Message:
77+
message = await self._receive()
78+
79+
if message["type"] == "websocket.receive":
80+
self._response_body += message.get("body", b"")
81+
elif message["type"] == "websocket.disconnect":
82+
self._response_status_code = message.get("code", None)
83+
self._response_body = message.get("reason", "").encode()
84+
85+
return message
86+
87+
async def send(self, message: types.Message) -> None:
88+
if message["type"] == "websocket.send":
89+
self.data.request.body += message.get("bytes", message.get("text", "").encode())
90+
elif message["type"] == "websocket.close":
91+
self._response_status_code = message.get("code")
92+
self._response_body = message.get("reason", "").encode()
93+
94+
await self._send(message)
95+
96+
97+
class TelemetryDataCollector:
98+
data: TelemetryData
99+
100+
def __init__(self, app: Flama, scope: types.Scope, receive: types.Receive, send: types.Send) -> None:
101+
self.app = app
102+
self._scope = scope
103+
self._receive = receive
104+
self._send = send
105+
106+
@classmethod
107+
async def build(
108+
cls, app: Flama, scope: types.Scope, receive: types.Receive, send: types.Send
109+
) -> "TelemetryDataCollector":
110+
self = cls(app, scope, receive, send)
111+
self.data = await TelemetryData.from_scope(scope=scope, receive=receive, send=send)
112+
return self
113+
114+
async def __call__(self) -> None:
115+
await Wrapper.build(self._scope["type"], self.app, self.data)(
116+
scope=self._scope, receive=self._receive, send=self._send
117+
)
118+
119+
120+
class TelemetryMiddleware:
121+
def __init__(
122+
self,
123+
app: types.App,
124+
log_level: int = logging.NOTSET,
125+
*,
126+
before: t.Optional[HookFunction] = None,
127+
after: t.Optional[HookFunction] = None,
128+
) -> None:
129+
self.app: Flama = t.cast(Flama, app)
130+
self._log_level = log_level
131+
self._before = before
132+
self._after = after
133+
134+
async def before(self, data: TelemetryData):
135+
if self._before:
136+
await concurrency.run(self._before, data)
137+
138+
async def after(self, data: TelemetryData):
139+
if self._after:
140+
await concurrency.run(self._after, data)
141+
142+
async def __call__(self, scope: types.Scope, receive: types.Receive, send: types.Send) -> None:
143+
if scope["type"] not in ("http", "websocket"):
144+
await self.app(scope, receive, send)
145+
return
146+
147+
collector = await TelemetryDataCollector.build(self.app, scope, receive, send)
148+
149+
await self.before(collector.data)
150+
151+
try:
152+
await collector()
153+
finally:
154+
await self.after(collector.data)
155+
logger.log(self._log_level, "Telemetry: %s", str(collector.data))

‎tests/telemetry/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)
Please sign in to comment.