@@ -65,7 +65,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
65
65
private final ProxyConfiguration .Proxy proxy ;
66
66
private final ClientConnectionFactory connectionFactory ;
67
67
private final HttpField hostField ;
68
- private final TimeoutTask timeout ;
68
+ private final RequestTimeouts requestTimeouts ;
69
69
private ConnectionPool connectionPool ;
70
70
71
71
public HttpDestination (HttpClient client , Origin origin )
@@ -78,7 +78,7 @@ public HttpDestination(HttpClient client, Origin origin)
78
78
this .requestNotifier = new RequestNotifier (client );
79
79
this .responseNotifier = new ResponseNotifier ();
80
80
81
- this .timeout = new TimeoutTask (client .getScheduler ());
81
+ this .requestTimeouts = new RequestTimeouts (client .getScheduler ());
82
82
83
83
ProxyConfiguration proxyConfig = client .getProxyConfiguration ();
84
84
proxy = proxyConfig .match (origin );
@@ -272,7 +272,7 @@ public void send(HttpExchange exchange)
272
272
{
273
273
long expiresAt = request .getTimeoutAt ();
274
274
if (expiresAt != -1 )
275
- timeout .schedule (expiresAt );
275
+ requestTimeouts .schedule (expiresAt );
276
276
277
277
if (!client .isRunning () && exchanges .remove (exchange ))
278
278
{
@@ -425,7 +425,7 @@ public void close()
425
425
if (LOG .isDebugEnabled ())
426
426
LOG .debug ("Closed {}" , this );
427
427
connectionPool .close ();
428
- timeout .destroy ();
428
+ requestTimeouts .destroy ();
429
429
}
430
430
431
431
public void release (Connection connection )
@@ -547,15 +547,15 @@ public String toString()
547
547
}
548
548
549
549
/**
550
- * This class enforces the total timeout for exchanges that are still in the queue.
551
- * The total timeout for exchanges that are not in the destination queue is enforced
552
- * by {@link HttpChannel}.
550
+ * <p>Enforces the total timeout for for exchanges that are still in the queue.</p>
551
+ * <p> The total timeout for exchanges that are not in the destination queue
552
+ * is enforced in {@link HttpChannel} by {@link TimeoutCompleteListener}.</p>
553
553
*/
554
- private class TimeoutTask extends CyclicTimeout
554
+ private class RequestTimeouts extends CyclicTimeout
555
555
{
556
- private final AtomicLong nextTimeout = new AtomicLong (Long .MAX_VALUE );
556
+ private final AtomicLong earliestTimeout = new AtomicLong (Long .MAX_VALUE );
557
557
558
- private TimeoutTask (Scheduler scheduler )
558
+ private RequestTimeouts (Scheduler scheduler )
559
559
{
560
560
super (scheduler );
561
561
}
@@ -564,14 +564,17 @@ private TimeoutTask(Scheduler scheduler)
564
564
public void onTimeoutExpired ()
565
565
{
566
566
if (LOG .isDebugEnabled ())
567
- LOG .debug ("{} timeout expired " , this );
567
+ LOG .debug ("{} timeouts check " , this );
568
568
569
- nextTimeout .set (Long .MAX_VALUE );
570
569
long now = System .nanoTime ();
571
- long nextExpiresAt = Long .MAX_VALUE ;
572
570
573
- // Check all queued exchanges for those that have expired
574
- // and to determine when the next check must be.
571
+ // Reset the earliest timeout so we can expire again.
572
+ // A concurrent call to schedule(long) may lose an earliest
573
+ // value, but the corresponding exchange is already enqueued
574
+ // and will be seen by scanning the exchange queue below.
575
+ earliestTimeout .set (Long .MAX_VALUE );
576
+
577
+ long earliest = Long .MAX_VALUE ;
575
578
for (HttpExchange exchange : exchanges )
576
579
{
577
580
HttpRequest request = exchange .getRequest ();
@@ -580,34 +583,27 @@ public void onTimeoutExpired()
580
583
continue ;
581
584
if (expiresAt <= now )
582
585
request .abort (new TimeoutException ("Total timeout " + request .getTimeout () + " ms elapsed" ));
583
- else if (expiresAt < nextExpiresAt )
584
- nextExpiresAt = expiresAt ;
586
+ else if (expiresAt < earliest )
587
+ earliest = expiresAt ;
585
588
}
586
589
587
- if (nextExpiresAt < Long .MAX_VALUE && client .isRunning ())
588
- schedule (nextExpiresAt );
590
+ if (earliest < Long .MAX_VALUE && client .isRunning ())
591
+ schedule (earliest );
589
592
}
590
593
591
594
private void schedule (long expiresAt )
592
595
{
593
- // Schedule a timeout for the soonest any known exchange can expire.
594
- // If subsequently that exchange is removed from the queue, the
595
- // timeout is not cancelled, instead the entire queue is swept
596
- // for expired exchanges and a new timeout is set.
597
- long timeoutAt = nextTimeout .getAndUpdate (e -> Math .min (e , expiresAt ));
598
- if (timeoutAt != expiresAt )
596
+ // Schedule a timeout for the earliest exchange that may expire.
597
+ // When the timeout expires, scan the exchange queue for the next
598
+ // earliest exchange that may expire, and reschedule a new timeout.
599
+ long earliest = earliestTimeout .getAndUpdate (t -> Math .min (t , expiresAt ));
600
+ if (expiresAt != earliest )
599
601
{
600
- long delay = expiresAt - System .nanoTime ();
601
- if (delay <= 0 )
602
- {
603
- onTimeoutExpired ();
604
- }
605
- else
606
- {
607
- schedule (delay , TimeUnit .NANOSECONDS );
608
- if (LOG .isDebugEnabled ())
609
- LOG .debug ("{} scheduled timeout in {} ms" , this , TimeUnit .NANOSECONDS .toMillis (delay ));
610
- }
602
+ // A new request expires earlier than previous requests, schedule it.
603
+ long delay = Math .max (0 , expiresAt - System .nanoTime ());
604
+ if (LOG .isDebugEnabled ())
605
+ LOG .debug ("{} scheduling timeout in {} ms" , this , TimeUnit .NANOSECONDS .toMillis (delay ));
606
+ schedule (delay , TimeUnit .NANOSECONDS );
611
607
}
612
608
}
613
609
}
0 commit comments