Skip to content

Commit 910c7cc

Browse files
authored
Retry ConnectException, add retry logging (#6614)
1 parent e2936d4 commit 910c7cc

File tree

4 files changed

+180
-16
lines changed

4 files changed

+180
-16
lines changed

exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java

+48-4
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
package io.opentelemetry.exporter.sender.jdk.internal;
77

8+
import static java.util.stream.Collectors.joining;
9+
810
import io.opentelemetry.exporter.internal.compression.Compressor;
911
import io.opentelemetry.exporter.internal.http.HttpSender;
1012
import io.opentelemetry.exporter.internal.marshal.Marshaler;
@@ -25,6 +27,7 @@
2527
import java.util.List;
2628
import java.util.Map;
2729
import java.util.Set;
30+
import java.util.StringJoiner;
2831
import java.util.concurrent.CompletableFuture;
2932
import java.util.concurrent.ConcurrentLinkedQueue;
3033
import java.util.concurrent.ExecutorService;
@@ -33,6 +36,8 @@
3336
import java.util.concurrent.TimeUnit;
3437
import java.util.function.Consumer;
3538
import java.util.function.Supplier;
39+
import java.util.logging.Level;
40+
import java.util.logging.Logger;
3641
import javax.annotation.Nullable;
3742
import javax.net.ssl.SSLContext;
3843
import javax.net.ssl.SSLException;
@@ -52,6 +57,8 @@ public final class JdkHttpSender implements HttpSender {
5257
private static final ThreadLocal<ByteBufferPool> threadLocalByteBufPool =
5358
ThreadLocal.withInitial(ByteBufferPool::new);
5459

60+
private static final Logger logger = Logger.getLogger(JdkHttpSender.class.getName());
61+
5562
private final ExecutorService executorService = Executors.newFixedThreadPool(5);
5663
private final HttpClient client;
5764
private final URI uri;
@@ -211,11 +218,37 @@ HttpResponse<byte[]> sendInternal(Marshaler marshaler) throws IOException {
211218
exception = e;
212219
}
213220

214-
if (httpResponse != null && !retryableStatusCodes.contains(httpResponse.statusCode())) {
215-
return httpResponse;
221+
if (httpResponse != null) {
222+
boolean retryable = retryableStatusCodes.contains(httpResponse.statusCode());
223+
if (logger.isLoggable(Level.FINER)) {
224+
logger.log(
225+
Level.FINER,
226+
"Attempt "
227+
+ attempt
228+
+ " returned "
229+
+ (retryable ? "retryable" : "non-retryable")
230+
+ " response: "
231+
+ responseStringRepresentation(httpResponse));
232+
}
233+
if (!retryable) {
234+
return httpResponse;
235+
}
216236
}
217-
if (exception != null && !isRetryableException(exception)) {
218-
throw exception;
237+
if (exception != null) {
238+
boolean retryable = isRetryableException(exception);
239+
if (logger.isLoggable(Level.FINER)) {
240+
logger.log(
241+
Level.FINER,
242+
"Attempt "
243+
+ attempt
244+
+ " failed with "
245+
+ (retryable ? "retryable" : "non-retryable")
246+
+ " exception",
247+
exception);
248+
}
249+
if (!retryable) {
250+
throw exception;
251+
}
219252
}
220253
} while (attempt < retryPolicy.getMaxAttempts());
221254

@@ -225,6 +258,17 @@ HttpResponse<byte[]> sendInternal(Marshaler marshaler) throws IOException {
225258
throw exception;
226259
}
227260

261+
private static String responseStringRepresentation(HttpResponse<?> response) {
262+
StringJoiner joiner = new StringJoiner(",", "HttpResponse{", "}");
263+
joiner.add("code=" + response.statusCode());
264+
joiner.add(
265+
"headers="
266+
+ response.headers().map().entrySet().stream()
267+
.map(entry -> entry.getKey() + "=" + String.join(",", entry.getValue()))
268+
.collect(joining(",", "[", "]")));
269+
return joiner.toString();
270+
}
271+
228272
private void write(Marshaler marshaler, OutputStream os) throws IOException {
229273
if (exportAsJson) {
230274
marshaler.writeJsonTo(os);

exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java

+42-2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import io.opentelemetry.exporter.internal.marshal.Serializer;
1919
import io.opentelemetry.sdk.common.export.RetryPolicy;
2020
import java.io.IOException;
21+
import java.net.ConnectException;
22+
import java.net.ServerSocket;
2123
import java.net.http.HttpClient;
2224
import java.net.http.HttpConnectTimeoutException;
2325
import java.time.Duration;
@@ -53,8 +55,8 @@ void setup() throws IOException, InterruptedException {
5355
sender =
5456
new JdkHttpSender(
5557
mockHttpClient,
56-
"http://10.255.255.1", // Connecting to a non-routable IP address to trigger connection
57-
// timeout
58+
// Connecting to a non-routable IP address to trigger connection timeout
59+
"http://10.255.255.1",
5860
null,
5961
false,
6062
"text/plain",
@@ -74,6 +76,44 @@ void sendInternal_RetryableConnectTimeoutException() throws IOException, Interru
7476
verify(mockHttpClient, times(2)).send(any(), any());
7577
}
7678

79+
@Test
80+
void sendInternal_RetryableConnectException() throws IOException, InterruptedException {
81+
sender =
82+
new JdkHttpSender(
83+
mockHttpClient,
84+
// Connecting to localhost on an unused port address to trigger
85+
// java.net.ConnectException (or java.net.http.HttpConnectTimeoutException on linux java
86+
// 11+)
87+
"http://localhost:" + freePort(),
88+
null,
89+
false,
90+
"text/plain",
91+
Duration.ofSeconds(10).toNanos(),
92+
Collections::emptyMap,
93+
RetryPolicy.builder()
94+
.setMaxAttempts(2)
95+
.setInitialBackoff(Duration.ofMillis(1))
96+
.build());
97+
98+
assertThatThrownBy(() -> sender.sendInternal(new NoOpMarshaler()))
99+
.satisfies(
100+
e ->
101+
assertThat(
102+
(e instanceof ConnectException)
103+
|| (e instanceof HttpConnectTimeoutException))
104+
.isTrue());
105+
106+
verify(mockHttpClient, times(2)).send(any(), any());
107+
}
108+
109+
private static int freePort() {
110+
try (ServerSocket socket = new ServerSocket(0)) {
111+
return socket.getLocalPort();
112+
} catch (IOException e) {
113+
throw new RuntimeException(e);
114+
}
115+
}
116+
77117
@Test
78118
void sendInternal_RetryableIoException() throws IOException, InterruptedException {
79119
doThrow(new IOException("error!")).when(mockHttpClient).send(any(), any());

exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/RetryInterceptor.java

+60-10
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,19 @@
55

66
package io.opentelemetry.exporter.sender.okhttp.internal;
77

8+
import static java.util.stream.Collectors.joining;
9+
810
import io.opentelemetry.sdk.common.export.RetryPolicy;
911
import java.io.IOException;
12+
import java.net.ConnectException;
1013
import java.net.SocketTimeoutException;
1114
import java.util.Locale;
15+
import java.util.StringJoiner;
1216
import java.util.concurrent.ThreadLocalRandom;
1317
import java.util.concurrent.TimeUnit;
1418
import java.util.function.Function;
19+
import java.util.logging.Level;
20+
import java.util.logging.Logger;
1521
import okhttp3.Interceptor;
1622
import okhttp3.Response;
1723

@@ -23,6 +29,8 @@
2329
*/
2430
public final class RetryInterceptor implements Interceptor {
2531

32+
private static final Logger logger = Logger.getLogger(RetryInterceptor.class.getName());
33+
2634
private final RetryPolicy retryPolicy;
2735
private final Function<Response, Boolean> isRetryable;
2836
private final Function<IOException, Boolean> isRetryableException;
@@ -84,12 +92,39 @@ public Response intercept(Chain chain) throws IOException {
8492
} catch (IOException e) {
8593
exception = e;
8694
}
87-
if (response != null && !Boolean.TRUE.equals(isRetryable.apply(response))) {
88-
return response;
95+
if (response != null) {
96+
boolean retryable = Boolean.TRUE.equals(isRetryable.apply(response));
97+
if (logger.isLoggable(Level.FINER)) {
98+
logger.log(
99+
Level.FINER,
100+
"Attempt "
101+
+ attempt
102+
+ " returned "
103+
+ (retryable ? "retryable" : "non-retryable")
104+
+ " response: "
105+
+ responseStringRepresentation(response));
106+
}
107+
if (!retryable) {
108+
return response;
109+
}
89110
}
90-
if (exception != null && !Boolean.TRUE.equals(isRetryableException.apply(exception))) {
91-
throw exception;
111+
if (exception != null) {
112+
boolean retryable = Boolean.TRUE.equals(isRetryableException.apply(exception));
113+
if (logger.isLoggable(Level.FINER)) {
114+
logger.log(
115+
Level.FINER,
116+
"Attempt "
117+
+ attempt
118+
+ " failed with "
119+
+ (retryable ? "retryable" : "non-retryable")
120+
+ " exception",
121+
exception);
122+
}
123+
if (!retryable) {
124+
throw exception;
125+
}
92126
}
127+
93128
} while (attempt < retryPolicy.getMaxAttempts());
94129

95130
if (response != null) {
@@ -98,15 +133,30 @@ public Response intercept(Chain chain) throws IOException {
98133
throw exception;
99134
}
100135

136+
private static String responseStringRepresentation(Response response) {
137+
StringJoiner joiner = new StringJoiner(",", "Response{", "}");
138+
joiner.add("code=" + response.code());
139+
joiner.add(
140+
"headers="
141+
+ response.headers().toMultimap().entrySet().stream()
142+
.map(entry -> entry.getKey() + "=" + String.join(",", entry.getValue()))
143+
.collect(joining(",", "[", "]")));
144+
return joiner.toString();
145+
}
146+
101147
// Visible for testing
102148
static boolean isRetryableException(IOException e) {
103-
if (!(e instanceof SocketTimeoutException)) {
104-
return false;
149+
if (e instanceof SocketTimeoutException) {
150+
String message = e.getMessage();
151+
// Connect timeouts can produce SocketTimeoutExceptions with no message, or with "connect
152+
// timed out"
153+
return message == null || message.toLowerCase(Locale.ROOT).contains("connect timed out");
154+
} else if (e instanceof ConnectException) {
155+
// Exceptions resemble: java.net.ConnectException: Failed to connect to
156+
// localhost/[0:0:0:0:0:0:0:1]:62611
157+
return true;
105158
}
106-
String message = e.getMessage();
107-
// Connect timeouts can produce SocketTimeoutExceptions with no message, or with "connect timed
108-
// out"
109-
return message == null || message.toLowerCase(Locale.ROOT).contains("connect timed out");
159+
return false;
110160
}
111161

112162
// Visible for testing

exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/RetryInterceptorTest.java

+30
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import com.linecorp.armeria.testing.junit5.server.mock.MockWebServerExtension;
2424
import io.opentelemetry.sdk.common.export.RetryPolicy;
2525
import java.io.IOException;
26+
import java.net.ConnectException;
27+
import java.net.ServerSocket;
2628
import java.net.SocketTimeoutException;
2729
import java.time.Duration;
2830
import java.util.concurrent.TimeUnit;
@@ -157,6 +159,34 @@ void connectTimeout() throws Exception {
157159
verify(sleeper, times(4)).sleep(anyLong());
158160
}
159161

162+
@Test
163+
void connectException() throws Exception {
164+
client = connectTimeoutClient();
165+
when(random.get(anyLong())).thenReturn(1L);
166+
doNothing().when(sleeper).sleep(anyLong());
167+
168+
// Connecting to localhost on an unused port address to trigger java.net.ConnectException
169+
int openPort = freePort();
170+
assertThatThrownBy(
171+
() ->
172+
client
173+
.newCall(new Request.Builder().url("http://localhost:" + openPort).build())
174+
.execute())
175+
.isInstanceOf(ConnectException.class);
176+
177+
verify(isRetryableException, times(5)).apply(any());
178+
// Should retry maxAttempts, and sleep maxAttempts - 1 times
179+
verify(sleeper, times(4)).sleep(anyLong());
180+
}
181+
182+
private static int freePort() {
183+
try (ServerSocket socket = new ServerSocket(0)) {
184+
return socket.getLocalPort();
185+
} catch (IOException e) {
186+
throw new RuntimeException(e);
187+
}
188+
}
189+
160190
@Test
161191
void nonRetryableException() throws InterruptedException {
162192
client = connectTimeoutClient();

0 commit comments

Comments
 (0)