Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Improve tracing by adding attributes #3576

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -960,6 +960,7 @@ ResultSet readInternalWithOptions(
SpannerImpl.READ,
span,
tracer,
tracer.createTableAttributes(table, readOptions),
session.getErrorHandler(),
rpc.getReadRetrySettings(),
rpc.getReadRetryableCodes()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@
import com.google.common.base.Function;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.spanner.v1.BatchWriteResponse;
import io.opentelemetry.api.common.Attributes;
import javax.annotation.Nullable;

class DatabaseClientImpl implements DatabaseClient {
private static final String READ_WRITE_TRANSACTION = "CloudSpanner.ReadWriteTransaction";
private static final String READ_ONLY_TRANSACTION = "CloudSpanner.ReadOnlyTransaction";
private static final String PARTITION_DML_TRANSACTION = "CloudSpanner.PartitionDMLTransaction";
private final TraceWrapper tracer;
private Attributes commonAttributes;
@VisibleForTesting final String clientId;
@VisibleForTesting final SessionPool pool;
@VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient;
Expand All @@ -50,7 +52,8 @@ class DatabaseClientImpl implements DatabaseClient {
/* multiplexedSessionDatabaseClient = */ null,
/* useMultiplexedSessionPartitionedOps= */ false,
tracer,
/* useMultiplexedSessionForRW = */ false);
/* useMultiplexedSessionForRW = */ false,
Attributes.empty());
}

@VisibleForTesting
Expand All @@ -62,7 +65,8 @@ class DatabaseClientImpl implements DatabaseClient {
/* multiplexedSessionDatabaseClient = */ null,
/* useMultiplexedSessionPartitionedOps= */ false,
tracer,
/* useMultiplexedSessionForRW = */ false);
/* useMultiplexedSessionForRW = */ false,
Attributes.empty());
}

DatabaseClientImpl(
Expand All @@ -72,14 +76,16 @@ class DatabaseClientImpl implements DatabaseClient {
@Nullable MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient,
boolean useMultiplexedSessionPartitionedOps,
TraceWrapper tracer,
boolean useMultiplexedSessionForRW) {
boolean useMultiplexedSessionForRW,
Attributes commonAttributes) {
this.clientId = clientId;
this.pool = pool;
this.useMultiplexedSessionBlindWrite = useMultiplexedSessionBlindWrite;
this.multiplexedSessionDatabaseClient = multiplexedSessionDatabaseClient;
this.useMultiplexedSessionPartitionedOps = useMultiplexedSessionPartitionedOps;
this.tracer = tracer;
this.useMultiplexedSessionForRW = useMultiplexedSessionForRW;
this.commonAttributes = commonAttributes;
}

@VisibleForTesting
Expand Down Expand Up @@ -138,7 +144,7 @@ public Timestamp write(final Iterable<Mutation> mutations) throws SpannerExcepti
public CommitResponse writeWithOptions(
final Iterable<Mutation> mutations, final TransactionOption... options)
throws SpannerException {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
try (IScope s = tracer.withSpan(span)) {
if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) {
return getMultiplexedSessionDatabaseClient().writeWithOptions(mutations, options);
Expand All @@ -161,7 +167,7 @@ public Timestamp writeAtLeastOnce(final Iterable<Mutation> mutations) throws Spa
public CommitResponse writeAtLeastOnceWithOptions(
final Iterable<Mutation> mutations, final TransactionOption... options)
throws SpannerException {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
try (IScope s = tracer.withSpan(span)) {
if (useMultiplexedSessionBlindWrite && getMultiplexedSessionDatabaseClient() != null) {
return getMultiplexedSessionDatabaseClient()
Expand All @@ -181,7 +187,7 @@ public CommitResponse writeAtLeastOnceWithOptions(
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
final Iterable<MutationGroup> mutationGroups, final TransactionOption... options)
throws SpannerException {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
try (IScope s = tracer.withSpan(span)) {
return runWithSessionRetry(session -> session.batchWriteAtLeastOnce(mutationGroups, options));
} catch (RuntimeException e) {
Expand All @@ -194,7 +200,7 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(

@Override
public ReadContext singleUse() {
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION);
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, commonAttributes);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSession().singleUse();
} catch (RuntimeException e) {
Expand All @@ -206,7 +212,7 @@ public ReadContext singleUse() {

@Override
public ReadContext singleUse(TimestampBound bound) {
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION);
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, commonAttributes);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSession().singleUse(bound);
} catch (RuntimeException e) {
Expand All @@ -218,7 +224,7 @@ public ReadContext singleUse(TimestampBound bound) {

@Override
public ReadOnlyTransaction singleUseReadOnlyTransaction() {
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION);
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, commonAttributes);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSession().singleUseReadOnlyTransaction();
} catch (RuntimeException e) {
Expand All @@ -230,7 +236,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction() {

@Override
public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION);
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, commonAttributes);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSession().singleUseReadOnlyTransaction(bound);
} catch (RuntimeException e) {
Expand All @@ -242,7 +248,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {

@Override
public ReadOnlyTransaction readOnlyTransaction() {
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION);
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, commonAttributes);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSession().readOnlyTransaction();
} catch (RuntimeException e) {
Expand All @@ -254,7 +260,7 @@ public ReadOnlyTransaction readOnlyTransaction() {

@Override
public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION);
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, commonAttributes);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSession().readOnlyTransaction(bound);
} catch (RuntimeException e) {
Expand All @@ -266,7 +272,7 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {

@Override
public TransactionRunner readWriteTransaction(TransactionOption... options) {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSessionForRW().readWriteTransaction(options);
} catch (RuntimeException e) {
Expand All @@ -278,7 +284,7 @@ public TransactionRunner readWriteTransaction(TransactionOption... options) {

@Override
public TransactionManager transactionManager(TransactionOption... options) {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSessionForRW().transactionManager(options);
} catch (RuntimeException e) {
Expand All @@ -290,7 +296,7 @@ public TransactionManager transactionManager(TransactionOption... options) {

@Override
public AsyncRunner runAsync(TransactionOption... options) {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSessionForRW().runAsync(options);
} catch (RuntimeException e) {
Expand All @@ -302,7 +308,7 @@ public AsyncRunner runAsync(TransactionOption... options) {

@Override
public AsyncTransactionManager transactionManagerAsync(TransactionOption... options) {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSessionForRW().transactionManagerAsync(options);
} catch (RuntimeException e) {
Expand All @@ -322,7 +328,7 @@ public long executePartitionedUpdate(final Statement stmt, final UpdateOption...

private long executePartitionedUpdateWithPooledSession(
final Statement stmt, final UpdateOption... options) {
ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION);
ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION, commonAttributes);
try (IScope s = tracer.withSpan(span)) {
return runWithSessionRetry(session -> session.executePartitionedUpdate(stmt, options));
} catch (RuntimeException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import io.opentelemetry.api.common.Attributes;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -125,7 +126,8 @@ private BatchCreateSessionsRunnable(
public void run() {
List<SessionImpl> sessions;
int remainingSessionsToCreate = sessionCount;
ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.BATCH_CREATE_SESSIONS);
ISpan span =
spanner.getTracer().spanBuilder(SpannerImpl.BATCH_CREATE_SESSIONS, commonAttributes);
try (IScope s = spanner.getTracer().withSpan(span)) {
spanner
.getTracer()
Expand Down Expand Up @@ -170,6 +172,7 @@ interface SessionConsumer {
private final ExecutorFactory<ScheduledExecutorService> executorFactory;
private final ScheduledExecutorService executor;
private final DatabaseId db;
private final Attributes commonAttributes;

@GuardedBy("this")
private volatile long sessionChannelCounter;
Expand All @@ -182,6 +185,7 @@ interface SessionConsumer {
this.db = db;
this.executorFactory = executorFactory;
this.executor = executorFactory.get();
this.commonAttributes = spanner.getTracer().createCommonAttributes(db);
}

@Override
Expand All @@ -205,7 +209,7 @@ SessionImpl createSession() {
synchronized (this) {
options = optionMap(SessionOption.channelHint(sessionChannelCounter++));
}
ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_SESSION);
ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_SESSION, this.commonAttributes);
try (IScope s = spanner.getTracer().withSpan(span)) {
com.google.spanner.v1.Session session =
spanner
Expand Down Expand Up @@ -250,7 +254,10 @@ void createMultiplexedSession(SessionConsumer consumer) {
* GRPC channel. In case of an error during the gRPC calls, an exception will be thrown.
*/
SessionImpl createMultiplexedSession() {
ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_MULTIPLEXED_SESSION);
ISpan span =
spanner
.getTracer()
.spanBuilder(SpannerImpl.CREATE_MULTIPLEXED_SESSION, this.commonAttributes);
try (IScope s = spanner.getTracer().withSpan(span)) {
com.google.spanner.v1.Session session =
spanner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,8 @@ public DatabaseClient getDatabaseClient(DatabaseId db) {
getOptions().getSessionPoolOptions().getUseMultiplexedSessionBlindWrite(),
multiplexedSessionDatabaseClient,
getOptions().getSessionPoolOptions().getUseMultiplexedSessionPartitionedOps(),
useMultiplexedSessionForRW);
useMultiplexedSessionForRW,
this.tracer.createCommonAttributes(db));
dbClients.put(db, dbClient);
return dbClient;
}
Expand All @@ -329,15 +330,17 @@ DatabaseClientImpl createDatabaseClient(
boolean useMultiplexedSessionBlindWrite,
@Nullable MultiplexedSessionDatabaseClient multiplexedSessionClient,
boolean useMultiplexedSessionPartitionedOps,
boolean useMultiplexedSessionForRW) {
boolean useMultiplexedSessionForRW,
Attributes commonAttributes) {
return new DatabaseClientImpl(
clientId,
pool,
useMultiplexedSessionBlindWrite,
multiplexedSessionClient,
useMultiplexedSessionPartitionedOps,
tracer,
useMultiplexedSessionForRW);
useMultiplexedSessionForRW,
commonAttributes);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,14 @@ class TraceWrapper {
AttributeKey.stringKey("transaction.tag");
private static final AttributeKey<String> STATEMENT_TAG_KEY =
AttributeKey.stringKey("statement.tag");
private static final AttributeKey<String> INSTANCE_NAME_KEY =
AttributeKey.stringKey("instance.name");
private static final AttributeKey<String> DB_NAME_KEY = AttributeKey.stringKey("db.name");
private static final AttributeKey<String> DB_STATEMENT_KEY =
AttributeKey.stringKey("db.statement");
private static final AttributeKey<List<String>> DB_STATEMENT_ARRAY_KEY =
AttributeKey.stringArrayKey("db.statement");
private static final AttributeKey<String> DB_TABLE_NAME_KEY = AttributeKey.stringKey("db.table");
private static final AttributeKey<String> THREAD_NAME_KEY = AttributeKey.stringKey("thread.name");

private final Tracer openCensusTracer;
Expand All @@ -61,8 +65,8 @@ ISpan spanBuilder(String spanName) {
return spanBuilder(spanName, Attributes.empty());
}

ISpan spanBuilder(String spanName, TransactionOption... options) {
return spanBuilder(spanName, createTransactionAttributes(options));
ISpan spanBuilder(String spanName, Attributes commonAttributes, TransactionOption... options) {
return spanBuilder(spanName, createTransactionAttributes(commonAttributes, options));
}

ISpan spanBuilder(String spanName, Attributes attributes) {
Expand Down Expand Up @@ -137,18 +141,20 @@ IScope withSpan(ISpan span) {
}
}

Attributes createTransactionAttributes(TransactionOption... options) {
Attributes createTransactionAttributes(
Attributes commonAttributes, TransactionOption... options) {
AttributesBuilder builder = commonAttributes.toBuilder();
if (options != null && options.length > 0) {
Optional<TagOption> tagOption =
Arrays.stream(options)
.filter(option -> option instanceof TagOption)
.map(option -> (TagOption) option)
.findAny();
if (tagOption.isPresent()) {
return Attributes.of(TRANSACTION_TAG_KEY, tagOption.get().getTag());
builder.put(TRANSACTION_TAG_KEY, tagOption.get().getTag());
}
}
return Attributes.empty();
return builder.build();
}

Attributes createStatementAttributes(Statement statement, Options options) {
Expand Down Expand Up @@ -185,6 +191,22 @@ Attributes createStatementBatchAttributes(Iterable<Statement> statements, Option
return Attributes.empty();
}

Attributes createTableAttributes(String tableName, Options options) {
AttributesBuilder builder = Attributes.builder();
builder.put(DB_TABLE_NAME_KEY, tableName);
if (options != null && options.hasTag()) {
builder.put(STATEMENT_TAG_KEY, options.tag());
}
return builder.build();
}

Attributes createCommonAttributes(DatabaseId db) {
AttributesBuilder builder = Attributes.builder();
builder.put(DB_NAME_KEY, db.getDatabase());
builder.put(INSTANCE_NAME_KEY, db.getInstanceId().getInstance());
return builder.build();
}

private static String getTraceThreadName() {
return MoreObjects.firstNonNull(
Context.current().get(OpenTelemetryContextKeys.THREAD_NAME_KEY),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.cloud.spanner.SessionPool.PooledSessionFuture;
import com.google.cloud.spanner.SessionPool.SessionFutureWrapper;
import com.google.cloud.spanner.testing.RemoteSpannerHelper;
import io.opentelemetry.api.common.Attributes;

/**
* Subclass of {@link IntegrationTestEnv} that allows the user to specify when the underlying
Expand Down Expand Up @@ -52,7 +53,8 @@ DatabaseClientImpl createDatabaseClient(
boolean useMultiplexedSessionBlindWriteIgnore,
MultiplexedSessionDatabaseClient ignore,
boolean useMultiplexedSessionPartitionedOpsIgnore,
boolean useMultiplexedSessionForRWIgnore) {
boolean useMultiplexedSessionForRWIgnore,
Attributes attributes) {
return new DatabaseClientWithClosedSessionImpl(clientId, pool, tracer);
}
}
Expand Down
Loading
Loading