Skip to content

Commit

Permalink
remove the last use of GlobalOpenTelemetry
Browse files Browse the repository at this point in the history
Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com>
  • Loading branch information
atoulme committed Jul 23, 2022
1 parent 3c2f9b2 commit 8cfd882
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 49 deletions.
1 change: 1 addition & 0 deletions ethereum/eth/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ dependencies {
implementation 'org.apache.tuweni:tuweni-bytes'
implementation 'org.apache.tuweni:tuweni-units'
implementation 'org.apache.tuweni:tuweni-rlp'
implementation 'io.opentelemetry:opentelemetry-api'

annotationProcessor "org.immutables:value"
implementation "org.immutables:value-annotations"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,15 @@
import org.hyperledger.besu.ethereum.eth.sync.state.SyncTarget;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.metrics.opentelemetry.OpenTelemetrySystem;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.services.pipeline.Pipeline;
import org.hyperledger.besu.services.pipeline.PipelineBuilder;

import java.util.concurrent.CompletionStage;

import io.opentelemetry.api.trace.TracerProvider;

public class CheckpointSyncDownloadPipelineFactory extends FastSyncDownloadPipelineFactory {

public CheckpointSyncDownloadPipelineFactory(
Expand Down Expand Up @@ -74,6 +77,11 @@ protected Pipeline<Hash> createDownloadCheckPointPipeline(
final CheckpointDownloadBlockStep checkPointDownloadBlockStep =
new CheckpointDownloadBlockStep(protocolSchedule, ethContext, checkpoint, metricsSystem);

TracerProvider tracerProvider = null;
if (metricsSystem instanceof OpenTelemetrySystem) {
tracerProvider = ((OpenTelemetrySystem) metricsSystem).getTracerProvider();
}

return PipelineBuilder.createPipelineFrom(
"fetchCheckpoints",
checkPointSource,
Expand All @@ -84,7 +92,7 @@ protected Pipeline<Hash> createDownloadCheckPointPipeline(
"Number of header process by each chain download pipeline stage",
"step",
"action"),
true,
tracerProvider,
"checkpointSync")
.thenProcessAsyncOrdered("downloadBlock", checkPointDownloadBlockStep::downloadBlock, 1)
.andFinishWith("importBlock", checkPointBlockImportStep);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.hyperledger.besu.ethereum.eth.sync.state.SyncTarget;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.metrics.opentelemetry.OpenTelemetrySystem;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;
Expand All @@ -47,6 +48,8 @@

import java.util.concurrent.CompletionStage;

import io.opentelemetry.api.trace.TracerProvider;

public class FastSyncDownloadPipelineFactory implements DownloadPipelineFactory {
protected final SynchronizerConfiguration syncConfig;
protected final ProtocolSchedule protocolSchedule;
Expand Down Expand Up @@ -143,7 +146,10 @@ public Pipeline<SyncTargetRange> createDownloadPipelineForSyncTarget(final SyncT
attachedValidationPolicy,
ommerValidationPolicy,
ethContext);

TracerProvider tracerProvider = null;
if (metricsSystem instanceof OpenTelemetrySystem) {
tracerProvider = ((OpenTelemetrySystem) metricsSystem).getTracerProvider();
}
return PipelineBuilder.createPipelineFrom(
"fetchCheckpoints",
checkpointRangeSource,
Expand All @@ -154,7 +160,7 @@ public Pipeline<SyncTargetRange> createDownloadPipelineForSyncTarget(final SyncT
"Number of entries process by each chain download pipeline stage",
"step",
"action"),
true,
tracerProvider,
"fastSync")
.thenProcessAsyncOrdered("downloadHeaders", downloadHeadersStep, downloaderParallelism)
.thenFlatMap("validateHeadersJoin", validateHeadersJoinUpStep, singleHeaderBufferSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.hyperledger.besu.ethereum.eth.sync.worldstate.TaskQueueIterator;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloadProcess;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.metrics.opentelemetry.OpenTelemetrySystem;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;
Expand All @@ -35,6 +36,7 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

import io.opentelemetry.api.trace.TracerProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -158,6 +160,11 @@ public FastWorldStateDownloadProcess build() {
checkNotNull(pivotBlockHeader);
checkNotNull(metricsSystem);

TracerProvider tracerProvider = null;
if (metricsSystem instanceof OpenTelemetrySystem) {
tracerProvider = ((OpenTelemetrySystem) metricsSystem).getTracerProvider();
}

// Room for the requests we expect to do in parallel plus some buffer but not unlimited.
final int bufferCapacity = hashCountPerRequest * 2;
final LabelledMetric<Counter> outputCounter =
Expand All @@ -170,7 +177,11 @@ public FastWorldStateDownloadProcess build() {

final Pipeline<Task<NodeDataRequest>> completionPipeline =
PipelineBuilder.<Task<NodeDataRequest>>createPipeline(
"requestDataAvailable", bufferCapacity, outputCounter, true, "node_data_request")
"requestDataAvailable",
bufferCapacity,
outputCounter,
tracerProvider,
"node_data_request")
.andFinishWith(
"requestCompleteTask",
task ->
Expand All @@ -184,7 +195,7 @@ public FastWorldStateDownloadProcess build() {
new TaskQueueIterator<>(downloadState),
bufferCapacity,
outputCounter,
true,
tracerProvider,
"world_state_download")
.thenFlatMapInParallel(
"requestLoadLocalData",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@
import org.hyperledger.besu.ethereum.mainnet.HeaderValidationMode;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.metrics.opentelemetry.OpenTelemetrySystem;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.services.pipeline.Pipeline;
import org.hyperledger.besu.services.pipeline.PipelineBuilder;

import java.util.concurrent.CompletionStage;

import io.opentelemetry.api.trace.TracerProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -109,7 +111,10 @@ public Pipeline<?> createDownloadPipelineForSyncTarget(final SyncTarget target)
final FullImportBlockStep importBlockStep =
new FullImportBlockStep(
protocolSchedule, protocolContext, ethContext, fullSyncTerminationCondition);

TracerProvider tracerProvider = null;
if (metricsSystem instanceof OpenTelemetrySystem) {
tracerProvider = ((OpenTelemetrySystem) metricsSystem).getTracerProvider();
}
return PipelineBuilder.createPipelineFrom(
"fetchCheckpoints",
checkpointRangeSource,
Expand All @@ -120,7 +125,7 @@ public Pipeline<?> createDownloadPipelineForSyncTarget(final SyncTarget target)
"Number of entries process by each chain download pipeline stage",
"step",
"action"),
true,
tracerProvider,
"fullSync")
.thenProcessAsyncOrdered("downloadHeaders", downloadHeadersStep, downloaderParallelism)
.thenFlatMap("validateHeadersJoin", validateHeadersJoinUpStep, singleHeaderBufferSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.hyperledger.besu.ethereum.eth.sync.worldstate.TaskQueueIterator;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloadProcess;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.metrics.opentelemetry.OpenTelemetrySystem;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;
Expand All @@ -38,6 +39,7 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

import io.opentelemetry.api.trace.TracerProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -200,6 +202,11 @@ public SnapWorldStateDownloadProcess build() {
checkNotNull(snapSyncState);
checkNotNull(metricsSystem);

TracerProvider tracerProvider = null;
if (metricsSystem instanceof OpenTelemetrySystem) {
tracerProvider = ((OpenTelemetrySystem) metricsSystem).getTracerProvider();
}

// Room for the requests we expect to do in parallel plus some buffer but not unlimited.
final int bufferCapacity = snapSyncConfiguration.getTrienodeCountPerRequest() * 2;
final LabelledMetric<Counter> outputCounter =
Expand All @@ -212,7 +219,11 @@ public SnapWorldStateDownloadProcess build() {

final Pipeline<Task<SnapDataRequest>> completionPipeline =
PipelineBuilder.<Task<SnapDataRequest>>createPipeline(
"requestDataAvailable", bufferCapacity, outputCounter, true, "node_data_request")
"requestDataAvailable",
bufferCapacity,
outputCounter,
tracerProvider,
"node_data_request")
.andFinishWith(
"requestCompleteTask",
task -> completeTaskStep.markAsCompleteOrFailed(downloadState, task));
Expand All @@ -226,7 +237,7 @@ public SnapWorldStateDownloadProcess build() {
downloadState, () -> downloadState.dequeueAccountRequestBlocking()),
bufferCapacity,
outputCounter,
true,
tracerProvider,
"world_state_download")
.thenProcess(
"checkNewPivotBlock",
Expand All @@ -248,7 +259,7 @@ public SnapWorldStateDownloadProcess build() {
downloadState, () -> downloadState.dequeueStorageRequestBlocking()),
bufferCapacity,
outputCounter,
true,
tracerProvider,
"world_state_download")
.inBatches(snapSyncConfiguration.getStorageCountPerRequest())
.thenProcess(
Expand All @@ -275,7 +286,7 @@ public SnapWorldStateDownloadProcess build() {
downloadState, () -> downloadState.dequeueBigStorageRequestBlocking()),
bufferCapacity,
outputCounter,
true,
tracerProvider,
"world_state_download")
.thenProcess(
"checkNewPivotBlock",
Expand Down Expand Up @@ -303,7 +314,7 @@ public SnapWorldStateDownloadProcess build() {
downloadState, () -> downloadState.dequeueCodeRequestBlocking()),
bufferCapacity,
outputCounter,
true,
tracerProvider,
"code_blocks_download_pipeline")
.inBatches(
snapSyncConfiguration.getBytecodeCountPerRequest() * 2,
Expand Down Expand Up @@ -344,7 +355,7 @@ public SnapWorldStateDownloadProcess build() {
downloadState, () -> downloadState.dequeueTrieNodeRequestBlocking()),
bufferCapacity,
outputCounter,
true,
tracerProvider,
"world_state_download")
.thenFlatMapInParallel(
"requestLoadLocalTrieNodeData",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.TracerProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -42,8 +42,7 @@ public class Pipeline<I> {
private final Collection<Pipe<?>> pipes;
private final CompleterStage<?> completerStage;
private final AtomicBoolean started = new AtomicBoolean(false);
private final Tracer tracer =
GlobalOpenTelemetry.getTracer("org.hyperledger.besu.services.pipeline", "1.0.0");
private final Tracer tracer;

/**
* Flags that the pipeline is being completed so that when we abort we can close the streams
Expand All @@ -55,22 +54,25 @@ public class Pipeline<I> {

private final CompletableFuture<Void> overallFuture = new CompletableFuture<>();
private final String name;
private final boolean tracingEnabled;
private volatile List<Future<?>> futures;

Pipeline(
final Pipe<I> inputPipe,
final String name,
final boolean tracingEnabled,
final TracerProvider tracerProvider,
final Collection<Stage> stages,
final Collection<Pipe<?>> pipes,
final CompleterStage<?> completerStage) {
this.inputPipe = inputPipe;
this.tracingEnabled = tracingEnabled;
this.name = name;
this.stages = stages;
this.pipes = pipes;
this.completerStage = completerStage;
if (tracerProvider != null) {
this.tracer = tracerProvider.get("org.hyperledger.besu.services.pipeline", "1.0.0");
} else {
this.tracer = null;
}
}

/**
Expand Down Expand Up @@ -137,7 +139,7 @@ private Future<?> runWithErrorHandling(final ExecutorService executorService, fi
return executorService.submit(
() -> {
Span taskSpan = null;
if (tracingEnabled) {
if (tracer != null) {
taskSpan =
tracer
.spanBuilder(task.getName())
Expand All @@ -151,7 +153,7 @@ private Future<?> runWithErrorHandling(final ExecutorService executorService, fi
thread.setName(originalName + " (" + task.getName() + ")");
task.run();
} catch (final Throwable t) {
if (tracingEnabled) {
if (tracer != null) {
taskSpan.setStatus(StatusCode.ERROR);
}
LOG.debug("Unhandled exception in pipeline. Aborting.", t);
Expand All @@ -164,7 +166,7 @@ private Future<?> runWithErrorHandling(final ExecutorService executorService, fi
LOG.error("Failed to abort pipeline after error", t2);
}
} finally {
if (tracingEnabled) {
if (tracer != null) {
taskSpan.end();
}
thread.setName(originalName);
Expand Down
Loading

0 comments on commit 8cfd882

Please sign in to comment.