26
26
import com .google .common .base .Function ;
27
27
import com .google .common .util .concurrent .ListenableFuture ;
28
28
import com .google .spanner .v1 .BatchWriteResponse ;
29
+ import io .opentelemetry .api .common .Attributes ;
29
30
import javax .annotation .Nullable ;
30
31
31
32
class DatabaseClientImpl implements DatabaseClient {
32
33
private static final String READ_WRITE_TRANSACTION = "CloudSpanner.ReadWriteTransaction" ;
33
34
private static final String READ_ONLY_TRANSACTION = "CloudSpanner.ReadOnlyTransaction" ;
34
35
private static final String PARTITION_DML_TRANSACTION = "CloudSpanner.PartitionDMLTransaction" ;
35
36
private final TraceWrapper tracer ;
37
+ private Attributes commonAttributes ;
36
38
@ VisibleForTesting final String clientId ;
37
39
@ VisibleForTesting final SessionPool pool ;
38
40
@ VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient ;
@@ -50,7 +52,8 @@ class DatabaseClientImpl implements DatabaseClient {
50
52
/* multiplexedSessionDatabaseClient = */ null ,
51
53
/* useMultiplexedSessionPartitionedOps= */ false ,
52
54
tracer ,
53
- /* useMultiplexedSessionForRW = */ false );
55
+ /* useMultiplexedSessionForRW = */ false ,
56
+ Attributes .empty ());
54
57
}
55
58
56
59
@ VisibleForTesting
@@ -62,7 +65,8 @@ class DatabaseClientImpl implements DatabaseClient {
62
65
/* multiplexedSessionDatabaseClient = */ null ,
63
66
/* useMultiplexedSessionPartitionedOps= */ false ,
64
67
tracer ,
65
- /* useMultiplexedSessionForRW = */ false );
68
+ /* useMultiplexedSessionForRW = */ false ,
69
+ Attributes .empty ());
66
70
}
67
71
68
72
DatabaseClientImpl (
@@ -72,14 +76,16 @@ class DatabaseClientImpl implements DatabaseClient {
72
76
@ Nullable MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient ,
73
77
boolean useMultiplexedSessionPartitionedOps ,
74
78
TraceWrapper tracer ,
75
- boolean useMultiplexedSessionForRW ) {
79
+ boolean useMultiplexedSessionForRW ,
80
+ Attributes commonAttributes ) {
76
81
this .clientId = clientId ;
77
82
this .pool = pool ;
78
83
this .useMultiplexedSessionBlindWrite = useMultiplexedSessionBlindWrite ;
79
84
this .multiplexedSessionDatabaseClient = multiplexedSessionDatabaseClient ;
80
85
this .useMultiplexedSessionPartitionedOps = useMultiplexedSessionPartitionedOps ;
81
86
this .tracer = tracer ;
82
87
this .useMultiplexedSessionForRW = useMultiplexedSessionForRW ;
88
+ this .commonAttributes = commonAttributes ;
83
89
}
84
90
85
91
@ VisibleForTesting
@@ -138,7 +144,7 @@ public Timestamp write(final Iterable<Mutation> mutations) throws SpannerExcepti
138
144
public CommitResponse writeWithOptions (
139
145
final Iterable <Mutation > mutations , final TransactionOption ... options )
140
146
throws SpannerException {
141
- ISpan span = tracer .spanBuilder (READ_WRITE_TRANSACTION , options );
147
+ ISpan span = tracer .spanBuilder (READ_WRITE_TRANSACTION , commonAttributes , options );
142
148
try (IScope s = tracer .withSpan (span )) {
143
149
if (canUseMultiplexedSessionsForRW () && getMultiplexedSessionDatabaseClient () != null ) {
144
150
return getMultiplexedSessionDatabaseClient ().writeWithOptions (mutations , options );
@@ -161,7 +167,7 @@ public Timestamp writeAtLeastOnce(final Iterable<Mutation> mutations) throws Spa
161
167
public CommitResponse writeAtLeastOnceWithOptions (
162
168
final Iterable <Mutation > mutations , final TransactionOption ... options )
163
169
throws SpannerException {
164
- ISpan span = tracer .spanBuilder (READ_WRITE_TRANSACTION , options );
170
+ ISpan span = tracer .spanBuilder (READ_WRITE_TRANSACTION , commonAttributes , options );
165
171
try (IScope s = tracer .withSpan (span )) {
166
172
if (useMultiplexedSessionBlindWrite && getMultiplexedSessionDatabaseClient () != null ) {
167
173
return getMultiplexedSessionDatabaseClient ()
@@ -181,7 +187,7 @@ public CommitResponse writeAtLeastOnceWithOptions(
181
187
public ServerStream <BatchWriteResponse > batchWriteAtLeastOnce (
182
188
final Iterable <MutationGroup > mutationGroups , final TransactionOption ... options )
183
189
throws SpannerException {
184
- ISpan span = tracer .spanBuilder (READ_WRITE_TRANSACTION , options );
190
+ ISpan span = tracer .spanBuilder (READ_WRITE_TRANSACTION , commonAttributes , options );
185
191
try (IScope s = tracer .withSpan (span )) {
186
192
return runWithSessionRetry (session -> session .batchWriteAtLeastOnce (mutationGroups , options ));
187
193
} catch (RuntimeException e ) {
@@ -194,7 +200,7 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
194
200
195
201
@ Override
196
202
public ReadContext singleUse () {
197
- ISpan span = tracer .spanBuilder (READ_ONLY_TRANSACTION );
203
+ ISpan span = tracer .spanBuilder (READ_ONLY_TRANSACTION , commonAttributes );
198
204
try (IScope s = tracer .withSpan (span )) {
199
205
return getMultiplexedSession ().singleUse ();
200
206
} catch (RuntimeException e ) {
@@ -206,7 +212,7 @@ public ReadContext singleUse() {
206
212
207
213
@ Override
208
214
public ReadContext singleUse (TimestampBound bound ) {
209
- ISpan span = tracer .spanBuilder (READ_ONLY_TRANSACTION );
215
+ ISpan span = tracer .spanBuilder (READ_ONLY_TRANSACTION , commonAttributes );
210
216
try (IScope s = tracer .withSpan (span )) {
211
217
return getMultiplexedSession ().singleUse (bound );
212
218
} catch (RuntimeException e ) {
@@ -218,7 +224,7 @@ public ReadContext singleUse(TimestampBound bound) {
218
224
219
225
@ Override
220
226
public ReadOnlyTransaction singleUseReadOnlyTransaction () {
221
- ISpan span = tracer .spanBuilder (READ_ONLY_TRANSACTION );
227
+ ISpan span = tracer .spanBuilder (READ_ONLY_TRANSACTION , commonAttributes );
222
228
try (IScope s = tracer .withSpan (span )) {
223
229
return getMultiplexedSession ().singleUseReadOnlyTransaction ();
224
230
} catch (RuntimeException e ) {
@@ -230,7 +236,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction() {
230
236
231
237
@ Override
232
238
public ReadOnlyTransaction singleUseReadOnlyTransaction (TimestampBound bound ) {
233
- ISpan span = tracer .spanBuilder (READ_ONLY_TRANSACTION );
239
+ ISpan span = tracer .spanBuilder (READ_ONLY_TRANSACTION , commonAttributes );
234
240
try (IScope s = tracer .withSpan (span )) {
235
241
return getMultiplexedSession ().singleUseReadOnlyTransaction (bound );
236
242
} catch (RuntimeException e ) {
@@ -242,7 +248,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
242
248
243
249
@ Override
244
250
public ReadOnlyTransaction readOnlyTransaction () {
245
- ISpan span = tracer .spanBuilder (READ_ONLY_TRANSACTION );
251
+ ISpan span = tracer .spanBuilder (READ_ONLY_TRANSACTION , commonAttributes );
246
252
try (IScope s = tracer .withSpan (span )) {
247
253
return getMultiplexedSession ().readOnlyTransaction ();
248
254
} catch (RuntimeException e ) {
@@ -254,7 +260,7 @@ public ReadOnlyTransaction readOnlyTransaction() {
254
260
255
261
@ Override
256
262
public ReadOnlyTransaction readOnlyTransaction (TimestampBound bound ) {
257
- ISpan span = tracer .spanBuilder (READ_ONLY_TRANSACTION );
263
+ ISpan span = tracer .spanBuilder (READ_ONLY_TRANSACTION , commonAttributes );
258
264
try (IScope s = tracer .withSpan (span )) {
259
265
return getMultiplexedSession ().readOnlyTransaction (bound );
260
266
} catch (RuntimeException e ) {
@@ -266,7 +272,7 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
266
272
267
273
@ Override
268
274
public TransactionRunner readWriteTransaction (TransactionOption ... options ) {
269
- ISpan span = tracer .spanBuilder (READ_WRITE_TRANSACTION , options );
275
+ ISpan span = tracer .spanBuilder (READ_WRITE_TRANSACTION , commonAttributes , options );
270
276
try (IScope s = tracer .withSpan (span )) {
271
277
return getMultiplexedSessionForRW ().readWriteTransaction (options );
272
278
} catch (RuntimeException e ) {
@@ -278,7 +284,7 @@ public TransactionRunner readWriteTransaction(TransactionOption... options) {
278
284
279
285
@ Override
280
286
public TransactionManager transactionManager (TransactionOption ... options ) {
281
- ISpan span = tracer .spanBuilder (READ_WRITE_TRANSACTION , options );
287
+ ISpan span = tracer .spanBuilder (READ_WRITE_TRANSACTION , commonAttributes , options );
282
288
try (IScope s = tracer .withSpan (span )) {
283
289
return getMultiplexedSessionForRW ().transactionManager (options );
284
290
} catch (RuntimeException e ) {
@@ -290,7 +296,7 @@ public TransactionManager transactionManager(TransactionOption... options) {
290
296
291
297
@ Override
292
298
public AsyncRunner runAsync (TransactionOption ... options ) {
293
- ISpan span = tracer .spanBuilder (READ_WRITE_TRANSACTION , options );
299
+ ISpan span = tracer .spanBuilder (READ_WRITE_TRANSACTION , commonAttributes , options );
294
300
try (IScope s = tracer .withSpan (span )) {
295
301
return getMultiplexedSessionForRW ().runAsync (options );
296
302
} catch (RuntimeException e ) {
@@ -302,7 +308,7 @@ public AsyncRunner runAsync(TransactionOption... options) {
302
308
303
309
@ Override
304
310
public AsyncTransactionManager transactionManagerAsync (TransactionOption ... options ) {
305
- ISpan span = tracer .spanBuilder (READ_WRITE_TRANSACTION , options );
311
+ ISpan span = tracer .spanBuilder (READ_WRITE_TRANSACTION , commonAttributes , options );
306
312
try (IScope s = tracer .withSpan (span )) {
307
313
return getMultiplexedSessionForRW ().transactionManagerAsync (options );
308
314
} catch (RuntimeException e ) {
@@ -322,7 +328,7 @@ public long executePartitionedUpdate(final Statement stmt, final UpdateOption...
322
328
323
329
private long executePartitionedUpdateWithPooledSession (
324
330
final Statement stmt , final UpdateOption ... options ) {
325
- ISpan span = tracer .spanBuilder (PARTITION_DML_TRANSACTION );
331
+ ISpan span = tracer .spanBuilder (PARTITION_DML_TRANSACTION , commonAttributes );
326
332
try (IScope s = tracer .withSpan (span )) {
327
333
return runWithSessionRetry (session -> session .executePartitionedUpdate (stmt , options ));
328
334
} catch (RuntimeException e ) {
0 commit comments