Skip to content

Commit 78fbb32

Browse files
committed
🚧 WIP telemetry
1 parent 14a7eb7 commit 78fbb32

File tree

6 files changed

+494
-0
lines changed

6 files changed

+494
-0
lines changed

flama/telemetry/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
from flama.telemetry.data_structures import * # noqa
2+
from flama.telemetry.middleware import * # noqa

flama/telemetry/data_structures.py

+171
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
import dataclasses
2+
import datetime
3+
import logging
4+
import typing as t
5+
from http.cookies import SimpleCookie
6+
7+
from flama import Flama, types
8+
from flama.authentication.types import AccessToken, RefreshToken
9+
from flama.exceptions import HTTPException
10+
from flama.http import Request as HTTPRequest
11+
12+
logger = logging.getLogger(__name__)
13+
14+
__all__ = ["Endpoint", "Authentication", "Request", "Response", "Error", "TelemetryData"]
15+
16+
17+
@dataclasses.dataclass
18+
class Endpoint:
19+
path: str
20+
name: t.Optional[str]
21+
tags: dict[str, t.Any]
22+
23+
@classmethod
24+
async def from_scope(cls, *, scope: types.Scope, receive: types.Receive, send: types.Send) -> "Endpoint":
25+
app: Flama = scope["app"]
26+
27+
route, _ = app.router.resolve_route(scope)
28+
29+
return cls(path=str(route.path), name=route.name, tags=route.tags)
30+
31+
def to_dict(self) -> dict[str, t.Any]:
32+
return {"path": self.path, "name": self.name, "tags": self.tags}
33+
34+
35+
@dataclasses.dataclass
36+
class Authentication:
37+
access: t.Optional[AccessToken]
38+
refresh: t.Optional[RefreshToken]
39+
40+
@classmethod
41+
async def from_scope(cls, *, scope: types.Scope, receive: types.Receive, send: types.Send) -> "Authentication":
42+
app: Flama = scope["app"]
43+
context = {"scope": scope, "request": HTTPRequest(scope, receive=receive)}
44+
45+
try:
46+
access = await app.injector.resolve(AccessToken).value(context)
47+
except Exception:
48+
access = None
49+
50+
try:
51+
refresh = await app.injector.resolve(RefreshToken).value(context)
52+
except Exception:
53+
refresh = None
54+
55+
return cls(access=access, refresh=refresh)
56+
57+
def to_dict(self) -> dict[str, t.Any]:
58+
return {"access": self.access, "refresh": self.refresh}
59+
60+
61+
@dataclasses.dataclass
62+
class Request:
63+
headers: dict[str, t.Any]
64+
cookies: dict[str, t.Any]
65+
query_parameters: dict[str, t.Any]
66+
path_parameters: dict[str, t.Any]
67+
body: bytes = b""
68+
timestamp: datetime.datetime = dataclasses.field(
69+
init=False, default_factory=lambda: datetime.datetime.now(datetime.timezone.utc)
70+
)
71+
72+
@classmethod
73+
async def from_scope(cls, *, scope: types.Scope, receive: types.Receive, send: types.Send) -> "Request":
74+
app: Flama = scope["app"]
75+
context = {"scope": scope, "request": HTTPRequest(scope, receive=receive)}
76+
77+
headers = dict(await app.injector.resolve(types.Headers).value(context))
78+
cookies = dict(await app.injector.resolve(types.Cookies).value(context))
79+
query = dict(await app.injector.resolve(types.QueryParams).value(context))
80+
path = dict(await app.injector.resolve(types.PathParams).value(context))
81+
82+
return cls(headers=headers, cookies=cookies, query_parameters=query, path_parameters=path)
83+
84+
def to_dict(self) -> dict[str, t.Any]:
85+
return {
86+
"timestamp": self.timestamp.isoformat(),
87+
"headers": self.headers,
88+
"cookies": self.headers,
89+
"query_parameters": self.query_parameters,
90+
"path_parameters": self.path_parameters,
91+
"body": self.body,
92+
}
93+
94+
95+
@dataclasses.dataclass
96+
class Response:
97+
headers: t.Optional[dict[str, t.Any]]
98+
cookies: t.Optional[dict[str, t.Any]] = dataclasses.field(init=False)
99+
body: bytes = b""
100+
status_code: t.Optional[int] = None
101+
timestamp: datetime.datetime = dataclasses.field(
102+
init=False, default_factory=lambda: datetime.datetime.now(datetime.timezone.utc)
103+
)
104+
105+
def __post_init__(self):
106+
if self.headers:
107+
cookie = SimpleCookie()
108+
cookie.load(self.headers.get("cookie", ""))
109+
self.cookies = {
110+
str(name): {**{str(k): str(v) for k, v in morsel.items()}, "value": morsel.value}
111+
for name, morsel in cookie.items()
112+
}
113+
114+
def to_dict(self) -> dict[str, t.Any]:
115+
return {
116+
"timestamp": self.timestamp.isoformat(),
117+
"headers": self.headers,
118+
"cookies": self.headers,
119+
"body": self.body,
120+
"status_code": self.status_code,
121+
}
122+
123+
124+
@dataclasses.dataclass
125+
class Error:
126+
detail: str
127+
status_code: t.Optional[int] = None
128+
timestamp: datetime.datetime = dataclasses.field(
129+
init=False, default_factory=lambda: datetime.datetime.now(datetime.timezone.utc)
130+
)
131+
132+
@classmethod
133+
async def from_exception(cls, *, exception: Exception) -> "Error":
134+
if isinstance(exception, HTTPException):
135+
return cls(status_code=exception.status_code, detail=str(exception.detail))
136+
137+
return cls(detail=str(exception))
138+
139+
def to_dict(self) -> dict[str, t.Any]:
140+
return {"timestamp": self.timestamp.isoformat(), "detail": self.detail, "status_code": self.status_code}
141+
142+
143+
@dataclasses.dataclass
144+
class TelemetryData:
145+
type: t.Literal["http", "websocket"]
146+
endpoint: Endpoint
147+
authentication: Authentication
148+
request: Request
149+
response: t.Optional[Response] = None
150+
error: t.Optional[Error] = None
151+
extra: dict[t.Any, t.Any] = dataclasses.field(default_factory=dict)
152+
153+
@classmethod
154+
async def from_scope(cls, *, scope: types.Scope, receive: types.Receive, send: types.Send) -> "TelemetryData":
155+
return cls(
156+
type=scope["type"],
157+
endpoint=await Endpoint.from_scope(scope=scope, receive=receive, send=send),
158+
authentication=await Authentication.from_scope(scope=scope, receive=receive, send=send),
159+
request=await Request.from_scope(scope=scope, receive=receive, send=send),
160+
)
161+
162+
def to_dict(self) -> dict[str, t.Any]:
163+
return {
164+
"type": self.type,
165+
"endpoint": self.endpoint.to_dict(),
166+
"authentication": self.authentication.to_dict(),
167+
"request": self.request.to_dict(),
168+
"response": self.response.to_dict() if self.response else None,
169+
"error": self.error.to_dict() if self.error else None,
170+
"extra": self.extra,
171+
}

flama/telemetry/middleware.py

+155
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
import abc
2+
import logging
3+
import typing as t
4+
5+
from flama import Flama, concurrency, types
6+
from flama.telemetry.data_structures import Error, Response, TelemetryData
7+
8+
logger = logging.getLogger(__name__)
9+
10+
__all__ = ["TelemetryMiddleware"]
11+
12+
13+
PROJECT = "vortico-core"
14+
SERVICE = "elektrococo"
15+
TOPIC_ID = "telemetry-bus"
16+
17+
HookFunction = t.Callable[[TelemetryData], t.Union[None, t.Awaitable[None]]]
18+
19+
20+
class Wrapper(abc.ABC):
21+
def __init__(self, app: Flama, data: TelemetryData) -> None:
22+
self.app = app
23+
self.data = data
24+
25+
@classmethod
26+
def build(cls, type: t.Literal["http", "websocket"], app: Flama, data: TelemetryData) -> "Wrapper":
27+
if type == "websocket":
28+
return WebSocketWrapper(app, data)
29+
30+
return HTTPWrapper(app, data)
31+
32+
async def __call__(self, scope: types.Scope, receive: types.Receive, send: types.Send) -> None:
33+
self._scope = scope
34+
self._receive = receive
35+
self._send = send
36+
self._response_body = b""
37+
self._response_headers = None
38+
self._response_status_code = None
39+
40+
try:
41+
await self.app(self._scope, self.receive, self.send)
42+
self.data.response = Response(headers=self._response_headers, status_code=self._response_status_code)
43+
except Exception as e:
44+
self.data.error = await Error.from_exception(exception=e)
45+
raise
46+
47+
@abc.abstractmethod
48+
async def receive(self) -> types.Message:
49+
...
50+
51+
@abc.abstractmethod
52+
async def send(self, message: types.Message) -> None:
53+
...
54+
55+
56+
class HTTPWrapper(Wrapper):
57+
async def receive(self) -> types.Message:
58+
message = await self._receive()
59+
60+
if message["type"] == "http.request":
61+
self.data.request.body += message.get("body", b"")
62+
63+
return message
64+
65+
async def send(self, message: types.Message) -> None:
66+
if message["type"] == "http.response.start":
67+
self._response_headers = message.get("headers", [])
68+
self._response_status_code = message.get("status")
69+
elif message["type"] == "http.response.body":
70+
self._response_body += message.get("body", b"")
71+
72+
await self._send(message)
73+
74+
75+
class WebSocketWrapper(Wrapper):
76+
async def receive(self) -> types.Message:
77+
message = await self._receive()
78+
79+
if message["type"] == "websocket.receive":
80+
self._response_body += message.get("body", b"")
81+
elif message["type"] == "websocket.disconnect":
82+
self._response_status_code = message.get("code", None)
83+
self._response_body = message.get("reason", "").encode()
84+
85+
return message
86+
87+
async def send(self, message: types.Message) -> None:
88+
if message["type"] == "websocket.send":
89+
self.data.request.body += message.get("bytes", message.get("text", "").encode())
90+
elif message["type"] == "websocket.close":
91+
self._response_status_code = message.get("code")
92+
self._response_body = message.get("reason", "").encode()
93+
94+
await self._send(message)
95+
96+
97+
class TelemetryDataCollector:
98+
data: TelemetryData
99+
100+
def __init__(self, app: Flama, scope: types.Scope, receive: types.Receive, send: types.Send) -> None:
101+
self.app = app
102+
self._scope = scope
103+
self._receive = receive
104+
self._send = send
105+
106+
@classmethod
107+
async def build(
108+
cls, app: Flama, scope: types.Scope, receive: types.Receive, send: types.Send
109+
) -> "TelemetryDataCollector":
110+
self = cls(app, scope, receive, send)
111+
self.data = await TelemetryData.from_scope(scope=scope, receive=receive, send=send)
112+
return self
113+
114+
async def __call__(self) -> None:
115+
await Wrapper.build(self._scope["type"], self.app, self.data)(
116+
scope=self._scope, receive=self._receive, send=self._send
117+
)
118+
119+
120+
class TelemetryMiddleware:
121+
def __init__(
122+
self,
123+
app: types.App,
124+
log_level: int = logging.INFO,
125+
*,
126+
before: t.Optional[HookFunction] = None,
127+
after: t.Optional[HookFunction] = None,
128+
) -> None:
129+
self.app: Flama = t.cast(Flama, app)
130+
self._log_level = log_level
131+
self._before = before
132+
self._after = after
133+
134+
async def before(self, data: TelemetryData):
135+
if self._before:
136+
await concurrency.run(self._before, data)
137+
138+
async def after(self, data: TelemetryData):
139+
if self._after:
140+
await concurrency.run(self._after, data)
141+
142+
async def __call__(self, scope: types.Scope, receive: types.Receive, send: types.Send) -> None:
143+
if scope["type"] not in ("http", "websocket"):
144+
await self.app(scope, receive, send)
145+
return
146+
147+
collector = await TelemetryDataCollector.build(self.app, scope, receive, send)
148+
149+
await self.before(collector.data)
150+
151+
try:
152+
await collector()
153+
finally:
154+
await self.after(collector.data)
155+
logger.log(self._log_level, "Telemetry: %s", str(collector.data))

tests/telemetry/__init__.py

Whitespace-only changes.
+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import datetime
2+
from unittest.mock import MagicMock, patch
3+
4+
import pytest
5+
6+
from flama.telemetry.data_structures import Error, TelemetryData
7+
8+
9+
@pytest.fixture(scope="function", autouse=True)
10+
def add_routes(app):
11+
@app.route("/")
12+
def root():
13+
return {"puppy": "Canna"}
14+
15+
16+
@pytest.fixture(scope="function")
17+
def asgi_scope(app, asgi_scope):
18+
asgi_scope["app"] = app
19+
return asgi_scope
20+
21+
22+
class TestCaseAuthentication:
23+
def test_from_scope(self, asgi_scope, asgi_receive, asgi_send):
24+
...
25+
26+
27+
class TestCaseEndpoint:
28+
def test_from_scope(self, asgi_scope, asgi_receive, asgi_send):
29+
...
30+
31+
32+
class TestCaseRequest:
33+
def test_from_scope(self, asgi_scope, asgi_receive, asgi_send):
34+
...
35+
36+
37+
class TestCaseError:
38+
async def test_from_exception(self):
39+
now = datetime.datetime.now()
40+
with patch("datetime.datetime", MagicMock(now=MagicMock(return_value=now))):
41+
try:
42+
raise ValueError("Foo")
43+
except ValueError as e:
44+
error = await Error.from_exception(exception=e)
45+
46+
assert error.to_dict() == {"detail": "Foo", "status_code": None, "timestamp": now.isoformat()}
47+
48+
49+
class TestCaseTelemetryData:
50+
async def test_from_scope(self, asgi_scope, asgi_receive, asgi_send):
51+
data = await TelemetryData.from_scope(scope=asgi_scope, receive=asgi_receive, send=asgi_send)
52+
53+
assert data.to_dict() == {}

0 commit comments

Comments
 (0)