Skip to content

Commit

Permalink
Make sure we don't initialize the OpenTelemetry global singleton by m…
Browse files Browse the repository at this point in the history
…istake

Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com>
  • Loading branch information
atoulme committed Jul 8, 2022
1 parent dddc289 commit 04c7921
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public void setUp() throws Exception {
MetricsConfiguration configuration =
MetricsConfiguration.builder()
.protocol(MetricsProtocol.OPENTELEMETRY)
.pushEnabled(true)
.enabled(true)
.port(0)
.hostsAllowlist(singletonList("*"))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.hyperledger.besu.ethereum.api.tls.TlsClientAuthConfiguration;
import org.hyperledger.besu.ethereum.api.tls.TlsConfiguration;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.metrics.opentelemetry.OpenTelemetrySystem;
import org.hyperledger.besu.nat.NatMethod;
import org.hyperledger.besu.nat.NatService;
import org.hyperledger.besu.nat.core.domain.NatServiceType;
Expand All @@ -55,12 +56,13 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.TracerProvider;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapPropagator;
Expand Down Expand Up @@ -124,6 +126,7 @@ public String get(final @Nullable HttpServerRequest carrier, final String key) {
private final NatService natService;
private final Path dataDir;
private final LabelledMetric<OperationTimer> requestTimer;
private TracerProvider tracerProvider;
private Tracer tracer;
private final int maxActiveConnections;
private final AtomicInteger activeConnectionsCount = new AtomicInteger();
Expand Down Expand Up @@ -200,6 +203,9 @@ public JsonRpcHttpService(
this.livenessService = livenessService;
this.readinessService = readinessService;
this.maxActiveConnections = config.getMaxActiveConnections();
if (metricsSystem instanceof OpenTelemetrySystem) {
this.tracerProvider = ((OpenTelemetrySystem) metricsSystem).getTracerProvider();
}
}

private void validateConfig(final JsonRpcConfiguration config) {
Expand All @@ -214,8 +220,11 @@ private void validateConfig(final JsonRpcConfiguration config) {
public CompletableFuture<?> start() {
LOG.info("Starting JSON-RPC service on {}:{}", config.getHost(), config.getPort());
LOG.debug("max number of active connections {}", maxActiveConnections);
this.tracer = GlobalOpenTelemetry.getTracer("org.hyperledger.besu.jsonrpc", "1.0.0");

if (this.tracerProvider != null) {
this.tracer = tracerProvider.get("org.hyperledger.besu.jsonrpc", "1.0.0");
} else {
this.tracer = OpenTelemetry.noop().getTracer("org.hyperledger.besu.jsonrpc", "1.0.0");
}
final CompletableFuture<?> resultFuture = new CompletableFuture<>();
try {
// Create the HTTP server and a router object.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.hyperledger.besu.ethereum.api.tls.TlsConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.metrics.opentelemetry.OpenTelemetrySystem;
import org.hyperledger.besu.nat.NatMethod;
import org.hyperledger.besu.nat.NatService;
import org.hyperledger.besu.nat.core.domain.NatServiceType;
Expand All @@ -61,12 +62,13 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.TracerProvider;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapPropagator;
Expand Down Expand Up @@ -134,6 +136,7 @@ public String get(final @Nullable HttpServerRequest carrier, final String key) {
private final NatService natService;
private final Path dataDir;
private final LabelledMetric<OperationTimer> requestTimer;
private TracerProvider tracerProvider;
private Tracer tracer;
private final int maxActiveConnections;
private final AtomicInteger activeConnectionsCount = new AtomicInteger();
Expand Down Expand Up @@ -182,6 +185,9 @@ public JsonRpcService(
"Time taken to process a JSON-RPC request",
"methodName");
JsonRpcProcessor jsonRpcProcessor = new BaseJsonRpcProcessor();
if (metricsSystem instanceof OpenTelemetrySystem) {
this.tracerProvider = ((OpenTelemetrySystem) metricsSystem).getTracerProvider();
}

this.socketConfiguration =
maybeSockets.isPresent() ? maybeSockets.get() : WebSocketConfiguration.createDefault();
Expand Down Expand Up @@ -214,8 +220,11 @@ public JsonRpcService(
public CompletableFuture<Void> start() {
LOG.info("Starting JSON-RPC service on {}:{}", config.getHost(), config.getPort());
LOG.debug("max number of active connections {}", maxActiveConnections);
this.tracer = GlobalOpenTelemetry.getTracer("org.hyperledger.besu.jsonrpc", "1.0.0");

if (this.tracerProvider != null) {
this.tracer = tracerProvider.get("org.hyperledger.besu.jsonrpc", "1.0.0");
} else {
this.tracer = OpenTelemetry.noop().getTracer("org.hyperledger.besu.jsonrpc", "1.0.0");
}
final CompletableFuture<Void> resultFuture = new CompletableFuture<>();
try {
// Create the HTTP server and a router object.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.DoubleSupplier;
import java.util.stream.Stream;

Expand All @@ -45,8 +47,10 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.trace.TracerProvider;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.data.DoublePointData;
import io.opentelemetry.sdk.metrics.data.HistogramPointData;
Expand All @@ -56,6 +60,7 @@
import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.metrics.data.SummaryPointData;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -81,6 +86,7 @@ public class OpenTelemetrySystem implements ObservableMetricsSystem {
new ConcurrentHashMap<>();
private final SdkMeterProvider sdkMeterProvider;
private final DebugMetricReader debugMetricReader;
private final SdkTracerProvider sdkTracerProvider;

public OpenTelemetrySystem(
final Set<MetricCategory> enabledCategories,
Expand All @@ -106,6 +112,7 @@ public OpenTelemetrySystem(
.build();
OpenTelemetrySdk sdk = autoSdk.getOpenTelemetrySdk();
this.sdkMeterProvider = sdk.getSdkMeterProvider();
this.sdkTracerProvider = sdk.getSdkTracerProvider();
}

@Override
Expand Down Expand Up @@ -343,4 +350,16 @@ private void collectGC() {
}
});
}

/** Shuts down the OpenTelemetry exporters, blocking until they have completed orderly. */
public void shutdown() {
final CompletableResultCode result =
CompletableResultCode.ofAll(
Arrays.asList(this.sdkMeterProvider.shutdown(), this.sdkTracerProvider.shutdown()));
result.join(5000, TimeUnit.SECONDS);
}

public TracerProvider getTracerProvider() {
return sdkTracerProvider;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import java.util.stream.Stream;

import com.google.common.collect.ImmutableSet;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class OpenTelemetryMetricsSystemTest {
Expand All @@ -51,8 +53,7 @@ public class OpenTelemetryMetricsSystemTest {
.thenComparing(Observation::getMetricName)
.thenComparing((o1, o2) -> o1.getLabels().equals(o2.getLabels()) ? 0 : 1);

private final ObservableMetricsSystem metricsSystem =
new OpenTelemetrySystem(DEFAULT_METRIC_CATEGORIES, true, "job", false);
private OpenTelemetrySystem metricsSystem = null;

private List<Observation> getObservation(final ObservableMetricsSystem metricsSystem)
throws InterruptedException {
Expand All @@ -67,6 +68,16 @@ private List<Observation> getObservation(final ObservableMetricsSystem metricsSy
return null;
}

@Before
public void setUp() {
metricsSystem = new OpenTelemetrySystem(DEFAULT_METRIC_CATEGORIES, true, "job", false);
}

@After
public void tearDown() {
metricsSystem.shutdown();
}

@Test
public void shouldCreateObservationFromCounter() throws InterruptedException {
final Counter counter = metricsSystem.createCounter(PEERS, "connected", "Some help string");
Expand Down Expand Up @@ -167,15 +178,21 @@ public void shouldCreateObservationsFromTimerWithLabels() {

@Test
public void shouldNotCreateObservationsFromTimerWhenTimersDisabled() {
final ObservableMetricsSystem metricsSystem =
new OpenTelemetrySystem(DEFAULT_METRIC_CATEGORIES, false, "job", false);
final LabelledMetric<OperationTimer> timer =
metricsSystem.createLabelledTimer(RPC, "request", "Some help", "methodName");

//noinspection EmptyTryBlock
try (final OperationTimer.TimingContext ignored = timer.labels("method").startTimer()) {}

assertThat(metricsSystem.streamObservations()).isEmpty();
OpenTelemetrySystem metricsSystem = null;
try {
metricsSystem = new OpenTelemetrySystem(DEFAULT_METRIC_CATEGORIES, false, "job", false);
final LabelledMetric<OperationTimer> timer =
metricsSystem.createLabelledTimer(RPC, "request", "Some help", "methodName");

//noinspection EmptyTryBlock
try (final OperationTimer.TimingContext ignored = timer.labels("method").startTimer()) {}

assertThat(metricsSystem.streamObservations()).isEmpty();
} finally {
if (metricsSystem != null) {
metricsSystem.shutdown();
}
}
}

@Test
Expand All @@ -186,17 +203,24 @@ public void shouldCreateObservationFromGauge() {
.enabled(true)
.protocol(OPENTELEMETRY)
.build();
final OpenTelemetrySystem localMetricSystem =
new OpenTelemetrySystem(
metricsConfiguration.getMetricCategories(),
metricsConfiguration.isTimersEnabled(),
metricsConfiguration.getPrometheusJob(),
false);
localMetricSystem.initDefaults();
localMetricSystem.createGauge(RPC, "myValue", "Help", () -> 7.0);

assertThat(localMetricSystem.streamObservations())
.containsExactlyInAnyOrder(new Observation(RPC, "myValue", 7.0, emptyList()));
OpenTelemetrySystem localMetricSystem = null;
try {
localMetricSystem =
new OpenTelemetrySystem(
metricsConfiguration.getMetricCategories(),
metricsConfiguration.isTimersEnabled(),
metricsConfiguration.getPrometheusJob(),
false);
localMetricSystem.initDefaults();
localMetricSystem.createGauge(RPC, "myValue", "Help", () -> 7.0);

assertThat(localMetricSystem.streamObservations())
.containsExactlyInAnyOrder(new Observation(RPC, "myValue", 7.0, emptyList()));
} finally {
if (localMetricSystem != null) {
localMetricSystem.shutdown();
}
}
}

@Test
Expand All @@ -222,30 +246,38 @@ public void shouldOnlyObserveEnabledMetrics() throws InterruptedException {
.enabled(true)
.protocol(OPENTELEMETRY)
.build();
final OpenTelemetrySystem localMetricSystem =
new OpenTelemetrySystem(
metricsConfiguration.getMetricCategories(),
metricsConfiguration.isTimersEnabled(),
metricsConfiguration.getPrometheusJob(),
false);
localMetricSystem.initDefaults();

// do a category we are not watching
final LabelledMetric<Counter> counterN =
localMetricSystem.createLabelledCounter(NETWORK, "ABC", "Not that kind of network", "show");
assertThat(counterN).isSameAs(NoOpMetricsSystem.NO_OP_LABELLED_1_COUNTER);

counterN.labels("show").inc();
assertThat(localMetricSystem.streamObservations()).isEmpty();

// do a category we are watching
final LabelledMetric<Counter> counterR =
localMetricSystem.createLabelledCounter(RPC, "name", "Not useful", "method");
assertThat(counterR).isNotSameAs(NoOpMetricsSystem.NO_OP_LABELLED_1_COUNTER);

counterR.labels("op").inc();
assertThat(getObservation(localMetricSystem))
.containsExactly(new Observation(RPC, "name", (long) 1, singletonList("op")));
OpenTelemetrySystem localMetricSystem = null;
try {
localMetricSystem =
new OpenTelemetrySystem(
metricsConfiguration.getMetricCategories(),
metricsConfiguration.isTimersEnabled(),
metricsConfiguration.getPrometheusJob(),
false);
localMetricSystem.initDefaults();

// do a category we are not watching
final LabelledMetric<Counter> counterN =
localMetricSystem.createLabelledCounter(
NETWORK, "ABC", "Not that kind of network", "show");
assertThat(counterN).isSameAs(NoOpMetricsSystem.NO_OP_LABELLED_1_COUNTER);

counterN.labels("show").inc();
assertThat(localMetricSystem.streamObservations()).isEmpty();

// do a category we are watching
final LabelledMetric<Counter> counterR =
localMetricSystem.createLabelledCounter(RPC, "name", "Not useful", "method");
assertThat(counterR).isNotSameAs(NoOpMetricsSystem.NO_OP_LABELLED_1_COUNTER);

counterR.labels("op").inc();
assertThat(getObservation(localMetricSystem))
.containsExactly(new Observation(RPC, "name", (long) 1, singletonList("op")));
} finally {
if (localMetricSystem != null) {
localMetricSystem.shutdown();
}
}
}

@Test
Expand All @@ -269,8 +301,15 @@ public void returnsPrometheusMetricsWhenEnabled() {
.pushEnabled(false)
.protocol(OPENTELEMETRY)
.build();
final MetricsSystem localMetricSystem = MetricsSystemFactory.create(metricsConfiguration);

assertThat(localMetricSystem).isInstanceOf(OpenTelemetrySystem.class);
OpenTelemetrySystem localMetricSystem = null;
try {
localMetricSystem = (OpenTelemetrySystem) MetricsSystemFactory.create(metricsConfiguration);

assertThat(localMetricSystem).isInstanceOf(OpenTelemetrySystem.class);
} finally {
if (localMetricSystem != null) {
localMetricSystem.shutdown();
}
}
}
}

0 comments on commit 04c7921

Please sign in to comment.