|
39 | 39 | some cases customizably) time out.
|
40 | 40 |
|
41 | 41 | Every monitored query will have a SelectionKey associated with it,
|
42 |
| - and that key should be canceled by the monitor code once the query |
43 |
| - has been abandoned or has timed out. |
| 42 | + The key is cancelled during forget, but won't be removed from the |
| 43 | + selector's cancelled set until the next call to select. During that |
| 44 | + time, another query on the same socket/connection could try to |
| 45 | + re-register the cancelled key. This will throw an exception, which |
| 46 | + we suppress and retry until the select loop finally removes the |
| 47 | + cancelled key, and we can re-register the socket. |
44 | 48 |
|
45 | 49 | Every monitored query may also have a postgres pid associated with
|
46 | 50 | it, and whenever it does, that pid should be terminated (in
|
47 |
| - corrdination with the :terminated promise) once the query has been |
| 51 | + coordination with the :terminated promise) once the query has been |
48 | 52 | abandoned or has timed out.
|
49 | 53 |
|
50 | 54 | The terminated promise and :forget value coordinate between the
|
|
56 | 60 | The client socket monitoring depends on access to the jetty query
|
57 | 61 | response which (at least at the moment) provides indirect access to
|
58 | 62 | the java socket channel which can be read to determine whether the
|
59 |
| - client is still connected." |
| 63 | + client is still connected. |
| 64 | +
|
| 65 | + The current implementation is completely incompatible with http |
| 66 | + \"pipelining\", but it looks like that is no longer a realistic |
| 67 | + concern: |
| 68 | + https://daniel.haxx.se/blog/2019/04/06/curl-says-bye-bye-to-pipelining/ |
| 69 | +
|
| 70 | + If that turns out to be an incorrect assumption, then we'll have to |
| 71 | + reevaluate the implementation and/or feasibility of the monitoring. |
| 72 | + That's because so far, the only way we've found to detect a client |
| 73 | + disconnection is to attempt to read a byte. At the moment, that's |
| 74 | + acceptable because the client shouldn't be sending any data during |
| 75 | + the response (which of course wouldn't be true with pipelining, |
| 76 | + where it could be sending additional requests)." |
60 | 77 |
|
61 | 78 | (:require
|
62 | 79 | [clojure.tools.logging :as log]
|
|
331 | 348 | (.join thread))
|
332 | 349 | (not (.isAlive thread))))))
|
333 | 350 |
|
| 351 | +(defn- register-selector |
| 352 | + "Loops until the channel is registered with the selector while |
| 353 | + ignoring canceled key exceptions, which should only occur when a |
| 354 | + client is re-using the channel, and the previous query has called |
| 355 | + forget (and cancelled the key), and the main select loop hasn't |
| 356 | + removed it from the cancelled set yet. Returns the new |
| 357 | + SelectionKey." |
| 358 | + [channel selector ops] |
| 359 | + (or (try |
| 360 | + (.register ^SelectableChannel channel selector ops) |
| 361 | + (catch CancelledKeyException _)) |
| 362 | + (do |
| 363 | + ;; The competing forget should have already called wakeup. |
| 364 | + ;; REVIEW: 10? |
| 365 | + (Thread/sleep 10) |
| 366 | + (recur channel selector ops)))) |
| 367 | + |
334 | 368 | (defn stop-query-at-deadline-or-disconnect
|
335 | 369 | [{:keys [^Selector selector queries ^Thread thread] :as _monitor}
|
336 | 370 | id ^SelectableChannel channel deadline-ns db]
|
337 | 371 | (assert (.isAlive thread))
|
338 | 372 | (assert (instance? SelectableChannel channel))
|
339 |
| - (let [select-key (.register channel selector SelectionKey/OP_READ) |
| 373 | + (let [select-key (register-selector channel selector SelectionKey/OP_READ) |
340 | 374 | info {:query-id id
|
341 | 375 | :selection-key select-key
|
342 | 376 | :deadline-ns deadline-ns
|
|
355 | 389 | (-> prev
|
356 | 390 | (update :selector-keys assoc select-key info)
|
357 | 391 | (update :deadlines conj [[deadline-ns select-key] info]))))
|
358 |
| - (.wakeup selector) ;; so it can recompute deadline |
| 392 | + (.wakeup selector) ;; to recompute deadline and start watching select-key |
359 | 393 | select-key))
|
360 | 394 |
|
361 | 395 | (defn register-pg-pid
|
|
386 | 420 | [{:keys [queries ^Selector selector ^Thread thread] :as _monitor}
|
387 | 421 | ^SelectionKey select-key]
|
388 | 422 | (assert (.isAlive thread))
|
389 |
| - ;; After this some key methods will throw CancelledKeyException |
390 |
| - (.cancel select-key) |
391 | 423 | (let [maybe-await-termination (atom nil)]
|
392 | 424 | (swap! queries
|
393 | 425 | (fn [{:keys [selector-keys] :as state}]
|
|
399 | 431 | (update :selector-keys assoc select-key info)
|
400 | 432 | (update :deadlines assoc [deadline-ns select-key] info)))
|
401 | 433 | state)))
|
| 434 | + (.cancel select-key) |
| 435 | + (.wakeup selector) ;; clear out the cancelled keys (see stop-query-at-...) |
402 | 436 | (if (= ::timeout (some-> @maybe-await-termination (deref 2000 ::timeout)))
|
403 | 437 | ::timeout
|
404 | 438 | true)))
|
0 commit comments