Skip to content

Commit 240db46

Browse files
author
Michael Ruoss
committed
Monitor calling processes and clean up state upon :DOWN
1 parent 75469db commit 240db46

File tree

5 files changed

+76
-14
lines changed

5 files changed

+76
-14
lines changed

CHANGELOG.md

+6
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## Unreleased
9+
10+
### Added
11+
12+
- `K8s.Client.Mint.HTTPAdapter` - Monitor caller and cleanup state upon `:DOWN`.
13+
814
## [2.0.0-rc.3] - 2023-01-01
915

1016
### Fixed

lib/k8s/client/mint/http_adapter.ex

+59-11
Original file line numberDiff line numberDiff line change
@@ -240,15 +240,15 @@ defmodule K8s.Client.Mint.HTTPAdapter do
240240

241241
@impl true
242242
def handle_call({:request, method, path, headers, body}, from, state) do
243-
make_request(state, method, path, headers, body, type: :sync, caller: from)
243+
make_request(state, method, path, headers, body, from, type: :sync, caller: from)
244244
end
245245

246-
def handle_call({:stream, method, path, headers, body}, _from, state) do
247-
make_request(state, method, path, headers, body, type: :stream)
246+
def handle_call({:stream, method, path, headers, body}, from, state) do
247+
make_request(state, method, path, headers, body, from, type: :stream)
248248
end
249249

250-
def handle_call({:stream_to, method, path, headers, body, stream_to}, _from, state) do
251-
make_request(state, method, path, headers, body, type: :stream_to, stream_to: stream_to)
250+
def handle_call({:stream_to, method, path, headers, body, stream_to}, from, state) do
251+
make_request(state, method, path, headers, body, from, type: :stream_to, stream_to: stream_to)
252252
end
253253

254254
def handle_call({:websocket_request, path, headers}, from, state) do
@@ -326,13 +326,39 @@ defmodule K8s.Client.Mint.HTTPAdapter do
326326
end
327327
end
328328

329+
def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
330+
state =
331+
state.requests
332+
|> Map.filter(fn {_request_ref, request} -> request.caller_ref == ref end)
333+
|> Map.keys()
334+
|> Enum.reduce_while(state, fn
335+
request_ref, state ->
336+
case pop_in(state.requests[request_ref]) do
337+
{%HTTPRequest{}, state} ->
338+
conn = Mint.HTTP2.cancel_request(state.conn, request_ref) |> elem(1)
339+
{:cont, struct!(state, conn: conn)}
340+
341+
{_, state} ->
342+
{:halt, {:stop, state}}
343+
end
344+
end)
345+
346+
case state do
347+
{:stop, state} ->
348+
{:stop, :normal, state}
349+
350+
state ->
351+
{:noreply, state}
352+
end
353+
end
354+
329355
@impl true
330356
def terminate(_reason, state) do
331357
state = flush_buffer(state)
332358

333359
state
334360
|> Map.get(:requests)
335-
|> Enum.filter(fn {_ref, request} -> is_map_key(request, :websocket) end)
361+
|> Enum.filter(fn {_ref, request} -> is_struct(request, WebSocketRequest) end)
336362
|> Enum.each(fn {request_ref, request} ->
337363
{:ok, _websocket, data} = Mint.WebSocket.encode(request.websocket, :close)
338364
Mint.WebSocket.stream_request_body(state.conn, request_ref, data)
@@ -342,15 +368,25 @@ defmodule K8s.Client.Mint.HTTPAdapter do
342368
:ok
343369
end
344370

345-
@spec make_request(t(), binary(), binary(), Mint.Types.headers(), binary(), keyword()) ::
371+
@spec make_request(
372+
t(),
373+
binary(),
374+
binary(),
375+
Mint.Types.headers(),
376+
binary(),
377+
GenServer.from(),
378+
keyword()
379+
) ::
346380
{:noreply, t()} | {:reply, :ok | {:ok, reference()} | {:error, HTTPError.t()}, t()}
347-
defp make_request(state, method, path, headers, body, extra) do
381+
defp make_request(state, method, path, headers, body, caller, extra) do
382+
caller_ref = caller |> elem(0) |> Process.monitor()
383+
348384
case Mint.HTTP.request(state.conn, method, path, headers, body) do
349385
{:ok, conn, request_ref} ->
350386
state =
351387
put_in(
352388
state.requests[request_ref],
353-
HTTPRequest.new(extra)
389+
extra |> Keyword.put(:caller_ref, caller_ref) |> HTTPRequest.new()
354390
)
355391

356392
case extra[:type] do
@@ -366,15 +402,27 @@ defmodule K8s.Client.Mint.HTTPAdapter do
366402
end
367403
end
368404

369-
@spec upgrade_to_websocket(t(), binary(), Mint.Types.headers(), pid(), WebSocketRequest.t()) ::
405+
@spec upgrade_to_websocket(
406+
t(),
407+
binary(),
408+
Mint.Types.headers(),
409+
GenServer.from(),
410+
WebSocketRequest.t()
411+
) ::
370412
{:noreply, t()} | {:reply, {:error, HTTPError.t(), t()}}
371413
defp upgrade_to_websocket(state, path, headers, caller, websocket_request) do
414+
caller_ref = caller |> elem(0) |> Process.monitor()
415+
372416
case Mint.WebSocket.upgrade(:wss, state.conn, path, headers) do
373417
{:ok, conn, request_ref} ->
374418
state =
375419
put_in(
376420
state.requests[request_ref],
377-
UpgradeRequest.new(caller: caller, websocket_request: websocket_request)
421+
UpgradeRequest.new(
422+
caller: caller,
423+
caller_ref: caller_ref,
424+
websocket_request: struct!(websocket_request, caller_ref: caller_ref)
425+
)
378426
)
379427

380428
{:noreply, struct!(state, conn: conn)}

lib/k8s/client/mint/request/http.ex

+7-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ defmodule K8s.Client.Mint.Request.HTTP do
1818

1919
@type t :: %__MODULE__{
2020
caller: pid() | nil,
21+
caller_ref: reference(),
2122
stream_to: pid() | nil,
2223
waiting: pid() | nil,
2324
response: %{},
@@ -26,7 +27,7 @@ defmodule K8s.Client.Mint.Request.HTTP do
2627

2728
@type request :: t() | WebSocketRequest.t() | UpgradeRequest.t()
2829

29-
defstruct [:caller, :stream_to, :waiting, :type, response: %{}]
30+
defstruct [:caller, :caller_ref, :stream_to, :waiting, :type, response: %{}]
3031

3132
@spec new(keyword()) :: t()
3233
def new(fields), do: struct!(__MODULE__, fields)
@@ -97,6 +98,11 @@ defmodule K8s.Client.Mint.Request.HTTP do
9798
def map_response({:done, ref}), do: {:done, ref}
9899
def map_response({type, ref, value}), do: {{type, value}, ref}
99100

101+
@doc """
102+
If there are any parts in the response, they are sent to the process
103+
registered in the `:waiting` field of that request. The response is cleared
104+
thereafter.
105+
"""
100106
@spec flush_buffer(request()) :: request()
101107
def flush_buffer(%{waiting: waiting, response: response} = request)
102108
when not is_nil(waiting) and response != %{} do

lib/k8s/client/mint/request/upgrade.ex

+2-1
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,13 @@ defmodule K8s.Client.Mint.Request.Upgrade do
1212

1313
@type t :: %__MODULE__{
1414
caller: pid() | nil,
15+
caller_ref: reference(),
1516
websocket_request: WebSocketRequest.t(),
1617
response: %{},
1718
type: :sync
1819
}
1920

20-
defstruct [:caller, :websocket_request, response: %{}, type: :sync]
21+
defstruct [:caller, :caller_ref, :websocket_request, response: %{}, type: :sync]
2122

2223
@spec new(keyword()) :: t()
2324
def new(fields), do: struct!(__MODULE__, fields)

lib/k8s/client/mint/request/web_socket.ex

+2-1
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,15 @@ defmodule K8s.Client.Mint.Request.WebSocket do
1515

1616
@type t :: %__MODULE__{
1717
caller: pid() | nil,
18+
caller_ref: reference(),
1819
stream_to: pid() | nil,
1920
waiting: pid() | nil,
2021
websocket: Mint.WebSocket.t() | nil,
2122
response: %{},
2223
type: HTTPRequest.request_types()
2324
}
2425

25-
defstruct [:caller, :stream_to, :websocket, :waiting, :type, response: %{}]
26+
defstruct [:caller, :caller_ref, :stream_to, :websocket, :waiting, :type, response: %{}]
2627

2728
@spec new(keyword()) :: t()
2829
def new(fields \\ []), do: struct!(__MODULE__, fields)

0 commit comments

Comments
 (0)