Skip to content

Commit 4088650

Browse files
committed
Add Telemetry/OpenTelemetry support
1 parent f3ff91f commit 4088650

File tree

8 files changed

+357
-52
lines changed

8 files changed

+357
-52
lines changed

lib/upload.ex

+1-1
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ defmodule Upload do
153153
transform_fn,
154154
opts
155155
)
156-
|> repo.transaction()
156+
|> repo.transaction(Keyword.get(opts, :transaction_opts))
157157
|> case do
158158
{:ok, multi_result} ->
159159
{:ok, Map.values(multi_result)}

lib/upload/logger.ex

-8
This file was deleted.

lib/upload/multi.ex

+63-25
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ defmodule Upload.Multi do
77
alias Ecto.Association.NotLoaded
88
alias Ecto.Multi
99
alias Upload.Blob
10-
alias Upload.Logger
1110
alias Upload.Storage
1211

1312
@doc """
@@ -37,11 +36,19 @@ defmodule Upload.Multi do
3736
defp do_upload_blob(%Blob{path: nil} = blob, _opts), do: {:ok, blob}
3837

3938
defp do_upload_blob(%Blob{path: path, key: key} = blob, opts) when is_binary(key) do
40-
Upload.Logger.info("Uploading #{key}")
41-
42-
with :ok <- Storage.upload(path, key),
43-
:ok <- Upload.put_access_control_list(blob, opts[:canned_acl] || :private),
44-
do: {:ok, blob}
39+
metadata = %{key: key, path: path}
40+
41+
:telemetry.span(
42+
[:upload, :storage_upload],
43+
metadata,
44+
fn ->
45+
{with(
46+
:ok <- Storage.upload(path, key),
47+
:ok <- Upload.put_access_control_list(blob, opts[:canned_acl] || :private),
48+
do: {:ok, blob}
49+
), metadata}
50+
end
51+
)
4552
end
4653

4754
@doc """
@@ -219,20 +226,28 @@ defmodule Upload.Multi do
219226
defp do_delete(nil), do: {:ok, nil}
220227

221228
defp do_delete(%Blob{key: key} = blob) when is_binary(key) do
222-
Logger.info("Removing blob #{blob.key}")
223-
224229
with :ok <- remove_variants(blob),
225-
:ok <- Storage.delete(key),
230+
:ok <- storage_delete_with_telemetry(key),
226231
do: {:ok, blob}
227232
end
228233

234+
defp storage_delete_with_telemetry(key) do
235+
metadata = %{key: key}
236+
237+
:telemetry.span(
238+
[:upload, :storage_delete],
239+
metadata,
240+
fn ->
241+
{Storage.delete(key), metadata}
242+
end
243+
)
244+
end
245+
229246
defp remove_variants(blob) do
230247
repo = Upload.Config.repo()
231248
blob = repo.preload(blob, :variants)
232249

233250
Enum.each(blob.variants, fn variant ->
234-
Logger.info("Removing variant #{variant.variant} for key #{blob.key}")
235-
236251
{:ok, _blob_variant} = do_delete(variant)
237252
end)
238253

@@ -373,7 +388,7 @@ defmodule Upload.Multi do
373388
with {:ok, blob_path} <- create_random_file(),
374389
:ok <- download_file(original_blob.key, blob_path),
375390
{:ok, variant_path} <-
376-
call_transform_fn(transform_fn, blob_path, variant, format),
391+
call_transform_fn(original_blob.key, transform_fn, blob_path, variant, format),
377392
:ok <- cleanup(blob_path),
378393
{:ok, blob} <- insert_variant(repo, original_blob, variant, variant_path),
379394
{:ok, _} <- do_upload_blob(blob, opts),
@@ -385,19 +400,28 @@ defmodule Upload.Multi do
385400
end)
386401
end
387402

388-
defp call_transform_fn(transform_fn, blob_path, variant, format) do
389-
case :timer.tc(transform_fn, [blob_path, variant, format]) do
390-
{microseconds, {:ok, path}} ->
391-
Upload.Logger.info(
392-
"Processed image variant '#{variant}' with format '#{format}' in #{microseconds / 1_000}ms"
393-
)
394-
403+
defp call_transform_fn(original_blob_key, transform_fn, blob_path, variant, format) do
404+
metadata = %{
405+
original_blob_key: original_blob_key,
406+
blob_path: blob_path,
407+
variant: variant,
408+
format: format
409+
}
410+
411+
case :telemetry.span(
412+
[:upload, :transform],
413+
metadata,
414+
fn ->
415+
{transform_fn.(blob_path, variant, format), metadata}
416+
end
417+
) do
418+
{:ok, path} ->
395419
{:ok, path}
396420

397-
{_time, {:error, error}} ->
421+
{:error, error} ->
398422
{:error, error}
399423

400-
{_time, unexpected} ->
424+
unexpected ->
401425
raise "Expected upload transform function to return {:ok, path} or {:error, error}, got: #{inspect(unexpected)}"
402426
end
403427
end
@@ -448,9 +472,23 @@ defmodule Upload.Multi do
448472
end
449473

450474
defp download_file(key, path) do
451-
case Upload.Storage.download(key, path) do
452-
:ok -> :ok
453-
{:error, reason} -> {:error, %Upload.DownloadError{reason: reason, key: key, path: path}}
454-
end
475+
metadata = %{key: key, path: path}
476+
477+
:telemetry.span(
478+
[:upload, :storage_download],
479+
metadata,
480+
fn ->
481+
result =
482+
case Upload.Storage.download(key, path) do
483+
:ok ->
484+
:ok
485+
486+
{:error, reason} ->
487+
{:error, %Upload.DownloadError{reason: reason, key: key, path: path}}
488+
end
489+
490+
{result, metadata}
491+
end
492+
)
455493
end
456494
end

lib/upload/opentelemetry.ex

+161
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
defmodule Upload.OpenTelemetry do
2+
require Logger
3+
4+
@tracer_id __MODULE__
5+
6+
def attach do
7+
:telemetry.attach_many(
8+
"upload-opentelemetry",
9+
[
10+
[:upload, :transform, :start],
11+
[:upload, :transform, :stop],
12+
[:upload, :transform, :exception],
13+
[:upload, :storage_upload, :start],
14+
[:upload, :storage_upload, :stop],
15+
[:upload, :storage_upload, :exception],
16+
[:upload, :storage_download, :start],
17+
[:upload, :storage_download, :stop],
18+
[:upload, :storage_download, :exception],
19+
[:upload, :analyze, :start],
20+
[:upload, :analyze, :stop],
21+
[:upload, :analyze, :exception],
22+
[:upload, :stat, :start],
23+
[:upload, :stat, :stop],
24+
[:upload, :stat, :exception]
25+
],
26+
&__MODULE__.handle_event/4,
27+
%{}
28+
)
29+
end
30+
31+
def handle_event(
32+
[_, _, :start] = event,
33+
_measurements,
34+
metadata,
35+
%{}
36+
) do
37+
OpentelemetryTelemetry.start_telemetry_span(
38+
@tracer_id,
39+
span_name(event),
40+
metadata,
41+
%{}
42+
)
43+
44+
add_start_attributes(event, metadata)
45+
46+
:ok
47+
end
48+
49+
def handle_event(
50+
[_, _, :stop] = event,
51+
_measurements,
52+
metadata,
53+
%{}
54+
) do
55+
add_stop_attributes(event, metadata)
56+
57+
OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, metadata)
58+
OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata)
59+
60+
:ok
61+
end
62+
63+
def handle_event(
64+
[_, _, :exception],
65+
%{duration: _duration},
66+
%{kind: kind, reason: reason, stacktrace: stacktrace} = metadata,
67+
_
68+
) do
69+
ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, metadata)
70+
status = OpenTelemetry.status(:error, to_string(reason))
71+
exception = Exception.normalize(kind, reason, stacktrace)
72+
73+
OpenTelemetry.Span.record_exception(ctx, exception, stacktrace, [])
74+
OpenTelemetry.Tracer.set_status(status)
75+
OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata)
76+
:ok
77+
end
78+
79+
defp span_name([:upload, :transform, :start]), do: "Upload.transform"
80+
defp span_name([:upload, :storage_upload, :start]), do: "Upload.storage_upload"
81+
defp span_name([:upload, :storage_download, :start]), do: "Upload.storage_download"
82+
defp span_name([:upload, :storage_delete, :start]), do: "Upload.storage_delete"
83+
defp span_name([:upload, :analyze, :start]), do: "Upload.analyze"
84+
defp span_name([:upload, :stat, :start]), do: "Upload.stat"
85+
86+
defp add_start_attributes([:upload, :transform, :start], %{
87+
original_blob_key: original_blob_key,
88+
blob_path: blob_path,
89+
variant: variant,
90+
format: format
91+
}) do
92+
OpenTelemetry.Span.set_attributes(OpenTelemetry.Tracer.current_span_ctx(),
93+
"upload.transform.original_blob_key": original_blob_key,
94+
"upload.transform.blob_path": blob_path,
95+
"upload.transform.variant": variant,
96+
"upload.transform.format": format
97+
)
98+
end
99+
100+
defp add_start_attributes([:upload, :storage_upload, :start], %{key: key, path: path}) do
101+
OpenTelemetry.Span.set_attributes(OpenTelemetry.Tracer.current_span_ctx(),
102+
"upload.storage_upload.key": key,
103+
"upload.storage_upload.path": path
104+
)
105+
end
106+
107+
defp add_start_attributes([:upload, :storage_download, :start], %{key: key, path: path}) do
108+
OpenTelemetry.Span.set_attributes(OpenTelemetry.Tracer.current_span_ctx(),
109+
"upload.storage_download.key": key,
110+
"upload.storage_download.path": path
111+
)
112+
end
113+
114+
defp add_start_attributes([:upload, :storage_delete, :start], %{key: key}) do
115+
OpenTelemetry.Span.set_attributes(OpenTelemetry.Tracer.current_span_ctx(),
116+
"upload.storage_delete.key": key
117+
)
118+
end
119+
120+
defp add_start_attributes([:upload, :analyze, :start], %{
121+
analyzer: analyzer,
122+
path: path,
123+
content_type: content_type
124+
}) do
125+
OpenTelemetry.Span.set_attributes(OpenTelemetry.Tracer.current_span_ctx(),
126+
"upload.analyze.analyzer": analyzer,
127+
"upload.analyze.path": path,
128+
"upload.analyze.content_type": content_type
129+
)
130+
end
131+
132+
defp add_start_attributes(_, _), do: :ok
133+
134+
defp add_stop_attributes([:upload, :stat, :stop], %{
135+
stat: %{
136+
path: path,
137+
filename: filename,
138+
checksum: checksum,
139+
byte_size: byte_size,
140+
content_type: content_type,
141+
metadata: metadata
142+
}
143+
}) do
144+
OpenTelemetry.Span.set_attributes(OpenTelemetry.Tracer.current_span_ctx(),
145+
"upload.stat.path": path,
146+
"upload.stat.filename": filename,
147+
"upload.stat.checksum": checksum,
148+
"upload.stat.byte_size": byte_size,
149+
"upload.stat.content_type": content_type
150+
)
151+
152+
OpenTelemetry.Span.set_attributes(
153+
OpenTelemetry.Tracer.current_span_ctx(),
154+
Enum.map(metadata, fn {key, value} ->
155+
{"upload.stat.metadata." <> to_string(key), to_string(value)}
156+
end)
157+
)
158+
end
159+
160+
defp add_stop_attributes(_, _), do: :ok
161+
end

lib/upload/stat.ex

+40-18
Original file line numberDiff line numberDiff line change
@@ -22,23 +22,33 @@ defmodule Upload.Stat do
2222
@doc false
2323
@spec stat(Path.t()) :: {:ok, t()} | {:error, error()}
2424
def stat(path) do
25-
with {:ok, byte_size} <- get_byte_size(path),
26-
{:ok, checksum} <- compute_checksum(path),
27-
{:ok, detected_type} <- detect_type(path),
28-
{:ok, metadata} <- analyze(path, detected_type) do
29-
content_type = get_content_type(path, detected_type)
30-
31-
stat = %__MODULE__{
32-
path: path,
33-
byte_size: byte_size,
34-
checksum: checksum,
35-
metadata: metadata,
36-
content_type: content_type,
37-
filename: Path.basename(path)
38-
}
39-
40-
{:ok, stat}
41-
end
25+
metadata = %{path: path}
26+
27+
:telemetry.span(
28+
[:upload, :stat],
29+
metadata,
30+
fn ->
31+
with {:ok, byte_size} <- get_byte_size(path),
32+
{:ok, checksum} <- compute_checksum(path),
33+
{:ok, detected_type} <- detect_type(path),
34+
{:ok, metadata} <- analyze(path, detected_type) do
35+
content_type = get_content_type(path, detected_type)
36+
37+
stat = %__MODULE__{
38+
path: path,
39+
byte_size: byte_size,
40+
checksum: checksum,
41+
metadata: metadata,
42+
content_type: content_type,
43+
filename: Path.basename(path)
44+
}
45+
46+
{{:ok, stat}, Map.put(metadata, :stat, stat)}
47+
else
48+
{:error, error} -> {{:error, error}, metadata}
49+
end
50+
end
51+
)
4252
end
4353

4454
@doc false
@@ -97,11 +107,23 @@ defmodule Upload.Stat do
97107
analyzers = Upload.Config.analyzers()
98108

99109
Enum.find_value(analyzers, {:ok, nil}, fn analyzer ->
100-
case analyzer.stat(path, content_type) do
110+
case run_analyzer_with_telemetry(analyzer, path, content_type) do
101111
{:ok, nil} -> nil
102112
{:ok, metadata} -> {:ok, metadata}
103113
{:error, reason} when is_struct(reason) -> {:error, reason}
104114
end
105115
end)
106116
end
117+
118+
defp run_analyzer_with_telemetry(analyzer, path, content_type) do
119+
metadata = %{analyzer: analyzer, path: path, content_type: content_type}
120+
121+
:telemetry.span(
122+
[:upload, :analyze],
123+
metadata,
124+
fn ->
125+
{analyzer.stat(path, content_type), metadata}
126+
end
127+
)
128+
end
107129
end

0 commit comments

Comments
 (0)