Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use (Async) ExecChainHandler to measure IOExceptions (#3800) #3801

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,10 @@ subprojects {
excludeTags 'docker'
}

retry {
maxFailures = 5
maxRetries = 3
}
// retry {
// maxFailures = 5
// maxRetries = 3
// }
}

task dockerTest(type: Test) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,69 +15,67 @@
*/
package io.micrometer.core.instrument.binder.httpcomponents.hc5;

import io.micrometer.common.util.internal.logging.InternalLogger;
import io.micrometer.common.util.internal.logging.InternalLoggerFactory;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.binder.http.HttpRequestTags;
import io.micrometer.core.instrument.binder.http.Outcome;
import org.apache.hc.client5.http.async.AsyncExecCallback;
import org.apache.hc.client5.http.async.AsyncExecChain;
import org.apache.hc.client5.http.async.AsyncExecChainHandler;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpRequestInterceptor;
import org.apache.hc.core5.http.HttpResponseInterceptor;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.nio.AsyncDataConsumer;
import org.apache.hc.core5.http.nio.AsyncEntityProducer;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.io.IOException;
import java.util.function.Function;

/**
* Provides {@link HttpRequestInterceptor} and {@link HttpResponseInterceptor} for
* instrumenting async Apache HTTP Client 5. Configure the interceptors on an
* {@link org.apache.hc.client5.http.async.HttpAsyncClient}. Usage example: <pre>{@code
* Provides {@link AsyncExecChainHandler} for instrumenting async Apache HTTP Client 5.
* Configure the handler {@link org.apache.hc.client5.http.async.HttpAsyncClient}. Usage
* example: <pre>{@code
* MicrometerHttpClientInterceptor interceptor = new MicrometerHttpClientInterceptor(registry,
* HttpRequest::getRequestUri,
* Tags.empty(),
* true);
*
* CloseableHttpAsyncClient httpAsyncClient = HttpAsyncClients.custom()
* .addRequestInterceptorFirst(interceptor.getRequestInterceptor())
* .addResponseInterceptorLast(interceptor.getResponseInterceptor())
* .addExecInterceptorFirst("custom", interceptor.getExecChainHandler())
* .build();
* }</pre>
*
* @author Jon Schneider
* @author Lars Uffmann
* @since 1.11.0
*/
public class MicrometerHttpClientInterceptor {

private static final String METER_NAME = "httpcomponents.httpclient.request";

private final Map<HttpContext, Timer.ResourceSample> timerByHttpContext = new ConcurrentHashMap<>();
private static final InternalLogger logger = InternalLoggerFactory
.getInstance(MicrometerHttpClientInterceptor.class);

private final HttpRequestInterceptor requestInterceptor;
private static final String METER_NAME = "httpcomponents.httpclient.request";

private final HttpResponseInterceptor responseInterceptor;
private final AsyncExecChainHandler execChainHandler;

/**
* Create a {@code MicrometerHttpClientInterceptor} instance.
* @param meterRegistry meter registry to bind
* @param uriMapper URI mapper to create {@code uri} tag
* @param extraTags extra tags
* @param exportTagsForRoute whether to export tags for route
* @param meterRetries whether to meter retries individually
*
*/
public MicrometerHttpClientInterceptor(MeterRegistry meterRegistry, Function<HttpRequest, String> uriMapper,
Iterable<Tag> extraTags, boolean exportTagsForRoute) {
this.requestInterceptor = (request, entityDetails, context) -> timerByHttpContext.put(context,
Timer.resource(meterRegistry, METER_NAME)
.tags("method", request.getMethod(), "uri", uriMapper.apply(request)));

this.responseInterceptor = (response, entityDetails, context) -> {
timerByHttpContext.remove(context)
.tag("status", Integer.toString(response.getCode()))
.tag("outcome", Outcome.forStatus(response.getCode()).name())
.tags(exportTagsForRoute ? HttpContextUtils.generateTagsForRoute(context) : Tags.empty())
.tags(extraTags)
.close();
};
Iterable<Tag> extraTags, boolean exportTagsForRoute, boolean meterRetries) {
this.execChainHandler = new AsyncMeteringExecChainHandler(meterRegistry, uriMapper, exportTagsForRoute,
extraTags, meterRetries);
}

/**
Expand All @@ -86,18 +84,108 @@ public MicrometerHttpClientInterceptor(MeterRegistry meterRegistry, Function<Htt
* @param meterRegistry meter registry to bind
* @param extraTags extra tags
* @param exportTagsForRoute whether to export tags for route
* @param meterRetries whether to meter retries individually
*/
public MicrometerHttpClientInterceptor(MeterRegistry meterRegistry, Iterable<Tag> extraTags,
boolean exportTagsForRoute) {
this(meterRegistry, new DefaultUriMapper(), extraTags, exportTagsForRoute);
boolean exportTagsForRoute, boolean meterRetries) {
this(meterRegistry, new DefaultUriMapper(), extraTags, exportTagsForRoute, meterRetries);
}

public HttpRequestInterceptor getRequestInterceptor() {
return requestInterceptor;
public AsyncExecChainHandler getExecChainHandler() {
return execChainHandler;
}

public HttpResponseInterceptor getResponseInterceptor() {
return responseInterceptor;
private class AsyncMeteringExecChainHandler implements AsyncExecChainHandler {

private final MeterRegistry meterRegistry;

private final Function<HttpRequest, String> uriMapper;

private final boolean exportTagsForRoute;

private final Iterable<Tag> extraTags;

private final boolean meterRetries;

public AsyncMeteringExecChainHandler(MeterRegistry meterRegistry, Function<HttpRequest, String> uriMapper,
boolean exportTagsForRoute, Iterable<Tag> extraTags, boolean meterRetries) {
this.meterRegistry = meterRegistry;
this.uriMapper = uriMapper;
this.exportTagsForRoute = exportTagsForRoute;
this.extraTags = extraTags;
this.meterRetries = meterRetries;
}

public void meterExecution(HttpRequest request, AsyncEntityProducer entityProducer, AsyncExecChain.Scope scope,
AsyncExecChain chain, AsyncExecCallback asyncExecCallback) throws HttpException, IOException {

final Timer.ResourceSample sample = Timer.resource(meterRegistry, METER_NAME)
.tags("method", request.getMethod(), "uri", uriMapper.apply(request));

logger.trace("Start Sample: {} execCount {}", sample, scope.execCount.get());

chain.proceed(request, entityProducer, scope, new AsyncExecCallback() {
@Override
public AsyncDataConsumer handleResponse(HttpResponse response, EntityDetails entityDetails)
throws HttpException, IOException {
sample.tag("status", Integer.toString(response.getCode()))
.tag("outcome", Outcome.forStatus(response.getCode()).name())
.tag("exception", "None")
.tags(exportTagsForRoute ? HttpContextUtils.generateTagsForRoute(scope.clientContext)
: Tags.empty())
.tags(extraTags)
.close();

logger.trace("handleResponse: {} execCount {}", sample, scope.execCount.get());

return asyncExecCallback.handleResponse(response, entityDetails);
}

@Override
public void handleInformationResponse(HttpResponse response) throws HttpException, IOException {
asyncExecCallback.handleInformationResponse(response);
}

@Override
public void completed() {
logger.trace("completed: {}", sample);

asyncExecCallback.completed();
}

@Override
public void failed(Exception cause) {
sample.tag("status", "IO_ERROR")
.tag("outcome", "UNKNOWN")
.tag("exception", HttpRequestTags.exception(cause).getValue())
.tags(exportTagsForRoute ? HttpContextUtils.generateTagsForRoute(scope.clientContext)
: Tags.empty())
.tags(extraTags)
.close();

logger.trace("failed: {} execCount {}", sample, scope.execCount.get());

asyncExecCallback.failed(cause);
}
});
}

@Override
public void execute(HttpRequest request, AsyncEntityProducer entityProducer, AsyncExecChain.Scope scope,
AsyncExecChain chain, AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
if (meterRetries) {
meterExecution(request, entityProducer, scope, chain, asyncExecCallback);
}
else {
if (scope.execCount.get() == 1) {
meterExecution(request, entityProducer, scope, chain, asyncExecCallback);
}
else {
chain.proceed(request, entityProducer, scope, asyncExecCallback);
}
}
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
import io.micrometer.core.instrument.observation.ObservationOrTimerCompatibleInstrumentation;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import org.apache.hc.client5.http.classic.ExecChain;
import org.apache.hc.client5.http.classic.ExecChainHandler;
import org.apache.hc.core5.http.*;
import org.apache.hc.core5.http.impl.io.HttpRequestExecutor;
import org.apache.hc.core5.http.io.HttpClientConnection;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.util.Timeout;

import java.io.IOException;
Expand All @@ -53,7 +53,7 @@
* @author Tommy Ludwig
* @since 1.11.0
*/
public class MicrometerHttpRequestExecutor extends HttpRequestExecutor {
public class MicrometerHttpRequestExecutor implements ExecChainHandler {

static final String METER_NAME = "httpcomponents.httpclient.request";

Expand All @@ -76,7 +76,6 @@ public class MicrometerHttpRequestExecutor extends HttpRequestExecutor {
private MicrometerHttpRequestExecutor(Timeout waitForContinue, MeterRegistry registry,
Function<HttpRequest, String> uriMapper, Iterable<Tag> extraTags, boolean exportTagsForRoute,
ObservationRegistry observationRegistry, @Nullable ApacheHttpClientObservationConvention convention) {
super(waitForContinue, null, null);
this.registry = Optional.ofNullable(registry)
.orElseThrow(() -> new IllegalArgumentException("registry is required but has been initialized with null"));
this.uriMapper = Optional.ofNullable(uriMapper)
Expand All @@ -99,17 +98,17 @@ public static Builder builder(MeterRegistry registry) {
}

@Override
public ClassicHttpResponse execute(ClassicHttpRequest request, HttpClientConnection conn, HttpContext context)
public ClassicHttpResponse execute(ClassicHttpRequest request, ExecChain.Scope scope, ExecChain chain)
throws IOException, HttpException {
ObservationOrTimerCompatibleInstrumentation<ApacheHttpClientContext> sample = ObservationOrTimerCompatibleInstrumentation
.start(registry, observationRegistry,
() -> new ApacheHttpClientContext(request, context, uriMapper, exportTagsForRoute), convention,
DefaultApacheHttpClientObservationConvention.INSTANCE);
() -> new ApacheHttpClientContext(request, scope.clientContext, uriMapper, exportTagsForRoute),
convention, DefaultApacheHttpClientObservationConvention.INSTANCE);
String statusCodeOrError = "UNKNOWN";
Outcome statusOutcome = Outcome.UNKNOWN;

try {
ClassicHttpResponse response = super.execute(request, conn, context);
ClassicHttpResponse response = chain.proceed(request, scope);
sample.setResponse(response);
statusCodeOrError = DefaultApacheHttpClientObservationConvention.INSTANCE.getStatusValue(response, null);
statusOutcome = DefaultApacheHttpClientObservationConvention.INSTANCE.getStatusOutcome(response);
Expand All @@ -123,12 +122,11 @@ public ClassicHttpResponse execute(ClassicHttpRequest request, HttpClientConnect
finally {
String status = statusCodeOrError;
String outcome = statusOutcome.name();
sample.stop(METER_NAME, "Duration of Apache HttpClient request execution",
() -> Tags
.of("method", DefaultApacheHttpClientObservationConvention.INSTANCE.getMethodString(request),
"uri", uriMapper.apply(request), "status", status, "outcome", outcome)
.and(exportTagsForRoute ? HttpContextUtils.generateTagsForRoute(context) : Tags.empty())
.and(extraTags));
sample.stop(METER_NAME, "Duration of Apache HttpClient request execution", () -> Tags
.of("method", DefaultApacheHttpClientObservationConvention.INSTANCE.getMethodString(request), "uri",
uriMapper.apply(request), "status", status, "outcome", outcome)
.and(exportTagsForRoute ? HttpContextUtils.generateTagsForRoute(scope.clientContext) : Tags.empty())
.and(extraTags));
}
}

Expand Down
Loading