33
33
import com .google .cloud .Timestamp ;
34
34
import com .google .cloud .spanner .Type .StructField ;
35
35
import com .google .cloud .spanner .spi .v1 .SpannerRpc ;
36
+ import com .google .cloud .spanner .tracing .IScope ;
37
+ import com .google .cloud .spanner .tracing .ISpan ;
38
+ import com .google .cloud .spanner .tracing .TraceWrapper ;
36
39
import com .google .cloud .spanner .v1 .stub .SpannerStubSettings ;
37
40
import com .google .common .annotations .VisibleForTesting ;
38
41
import com .google .common .base .Preconditions ;
39
42
import com .google .common .collect .AbstractIterator ;
40
- import com .google .common .collect .ImmutableMap ;
41
43
import com .google .common .collect .Lists ;
42
44
import com .google .common .util .concurrent .Uninterruptibles ;
43
45
import com .google .protobuf .ByteString ;
50
52
import com .google .spanner .v1 .Transaction ;
51
53
import com .google .spanner .v1 .TypeCode ;
52
54
import io .grpc .Context ;
53
- import io .opencensus .common .Scope ;
54
- import io .opencensus .trace .AttributeValue ;
55
- import io .opencensus .trace .Span ;
56
- import io .opencensus .trace .Tracer ;
57
55
import io .opencensus .trace .Tracing ;
58
- import io .opentelemetry .api .common .Attributes ;
59
56
import java .io .IOException ;
60
57
import java .io .Serializable ;
61
58
import java .math .BigDecimal ;
83
80
84
81
/** Implementation of {@link ResultSet}. */
85
82
abstract class AbstractResultSet <R > extends AbstractStructReader implements ResultSet {
86
- private static final Tracer tracer = Tracing .getTracer ();
83
+ private static final TraceWrapper tracer = new TraceWrapper ( Tracing .getTracer () );
87
84
private static final com .google .protobuf .Value NULL_VALUE =
88
85
com .google .protobuf .Value .newBuilder ().setNullValue (NullValue .NULL_VALUE ).build ();
89
86
@@ -1094,8 +1091,7 @@ abstract static class ResumableStreamIterator extends AbstractIterator<PartialRe
1094
1091
private final BackOff backOff ;
1095
1092
private final LinkedList <PartialResultSet > buffer = new LinkedList <>();
1096
1093
private final int maxBufferSize ;
1097
- private final Span span ;
1098
- private final io .opentelemetry .api .trace .Span openTelemetrySpan ;
1094
+ private final ISpan span ;
1099
1095
private CloseableIterator <PartialResultSet > stream ;
1100
1096
private ByteString resumeToken ;
1101
1097
private boolean finished ;
@@ -1109,16 +1105,12 @@ abstract static class ResumableStreamIterator extends AbstractIterator<PartialRe
1109
1105
protected ResumableStreamIterator (
1110
1106
int maxBufferSize ,
1111
1107
String streamName ,
1112
- Span parent ,
1113
- io .opentelemetry .api .trace .Span openTelemetryParent ,
1108
+ ISpan parent ,
1114
1109
RetrySettings streamingRetrySettings ,
1115
1110
Set <Code > retryableCodes ) {
1116
1111
checkArgument (maxBufferSize >= 0 );
1117
1112
this .maxBufferSize = maxBufferSize ;
1118
- this .openTelemetrySpan =
1119
- OpenTelemetryTraceUtil .spanBuilderWithExplicitParent (
1120
- SpannerOptions .getTracer (), streamName , openTelemetryParent );
1121
- this .span = tracer .spanBuilderWithExplicitParent (streamName , parent ).startSpan ();
1113
+ this .span = tracer .spanBuilderWithExplicitParent (streamName , parent );
1122
1114
this .streamingRetrySettings = Preconditions .checkNotNull (streamingRetrySettings );
1123
1115
this .retryableCodes = Preconditions .checkNotNull (retryableCodes );
1124
1116
this .backOff = newBackOff ();
@@ -1174,15 +1166,7 @@ private static long nextBackOffMillis(BackOff backoff) throws SpannerException {
1174
1166
}
1175
1167
1176
1168
private void backoffSleep (Context context , long backoffMillis ) throws SpannerException {
1177
- tracer
1178
- .getCurrentSpan ()
1179
- .addAnnotation (
1180
- "Backing off" ,
1181
- ImmutableMap .of ("Delay" , AttributeValue .longAttributeValue (backoffMillis )));
1182
- OpenTelemetryTraceUtil .addEvent (
1183
- io .opentelemetry .api .trace .Span .fromContext (io .opentelemetry .context .Context .current ()),
1184
- "Backing off" ,
1185
- Attributes .builder ().put ("Delay" , backoffMillis ).build ());
1169
+ tracer .getCurrentSpan ().addAnnotation ("Backing off" , "Delay" , backoffMillis );
1186
1170
final CountDownLatch latch = new CountDownLatch (1 );
1187
1171
final Context .CancellationListener listener =
1188
1172
ignored -> {
@@ -1222,8 +1206,7 @@ public void execute(Runnable command) {
1222
1206
public void close (@ Nullable String message ) {
1223
1207
if (stream != null ) {
1224
1208
stream .close (message );
1225
- span .end (TraceUtil .END_SPAN_OPTIONS );
1226
- OpenTelemetryTraceUtil .endSpan (openTelemetrySpan );
1209
+ span .end ();
1227
1210
stream = null ;
1228
1211
}
1229
1212
}
@@ -1239,20 +1222,12 @@ protected PartialResultSet computeNext() {
1239
1222
while (true ) {
1240
1223
// Eagerly start stream before consuming any buffered items.
1241
1224
if (stream == null ) {
1242
- OpenTelemetryTraceUtil .addEvent (
1243
- openTelemetrySpan ,
1244
- "Starting/Resuming stream" ,
1245
- Attributes .builder ()
1246
- .put ("ResumeToken" , resumeToken == null ? "null" : resumeToken .toStringUtf8 ())
1247
- .build ());
1248
1225
span .addAnnotation (
1249
1226
"Starting/Resuming stream" ,
1250
- ImmutableMap .of (
1251
- "ResumeToken" ,
1252
- AttributeValue .stringAttributeValue (
1253
- resumeToken == null ? "null" : resumeToken .toStringUtf8 ())));
1254
- try (Scope s = tracer .withSpan (span );
1255
- io .opentelemetry .context .Scope ss = openTelemetrySpan .makeCurrent ()) {
1227
+ "ResumeToken" ,
1228
+ resumeToken == null ? "null" : resumeToken .toStringUtf8 ());
1229
+
1230
+ try (IScope sss = tracer .withSpan (span )) {
1256
1231
// When start a new stream set the Span as current to make the gRPC Span a child of
1257
1232
// this Span.
1258
1233
stream = checkNotNull (startStream (resumeToken ));
@@ -1292,22 +1267,16 @@ protected PartialResultSet computeNext() {
1292
1267
}
1293
1268
} catch (SpannerException spannerException ) {
1294
1269
if (safeToRetry && isRetryable (spannerException )) {
1295
- OpenTelemetryTraceUtil .addEvent (
1296
- openTelemetrySpan ,
1297
- "Stream broken. Safe to retry" ,
1298
- OpenTelemetryTraceUtil .getExceptionAnnotations (spannerException ));
1299
- span .addAnnotation (
1300
- "Stream broken. Safe to retry" ,
1301
- TraceUtil .getExceptionAnnotations (spannerException ));
1270
+
1271
+ span .addAnnotation ("Stream broken. Safe to retry" , spannerException );
1302
1272
logger .log (Level .FINE , "Retryable exception, will sleep and retry" , spannerException );
1303
1273
// Truncate any items in the buffer before the last retry token.
1304
1274
while (!buffer .isEmpty () && buffer .getLast ().getResumeToken ().isEmpty ()) {
1305
1275
buffer .removeLast ();
1306
1276
}
1307
1277
assert buffer .isEmpty () || buffer .getLast ().getResumeToken ().equals (resumeToken );
1308
1278
stream = null ;
1309
- try (Scope s = tracer .withSpan (span );
1310
- io .opentelemetry .context .Scope ss = openTelemetrySpan .makeCurrent ()) {
1279
+ try (IScope s = tracer .withSpan (span )) {
1311
1280
long delay = spannerException .getRetryDelayInMillis ();
1312
1281
if (delay != -1 ) {
1313
1282
backoffSleep (context , delay );
@@ -1318,17 +1287,12 @@ protected PartialResultSet computeNext() {
1318
1287
1319
1288
continue ;
1320
1289
}
1321
- span .addAnnotation ("Stream broken. Not safe to retry" );
1322
- TraceUtil .setWithFailure (span , spannerException );
1323
- OpenTelemetryTraceUtil .addEventWithExceptionAndSetFailure (
1324
- openTelemetrySpan , "Stream broken. Safe to retry" , spannerException );
1290
+ span .addAnnotation ("Stream broken. Not safe to retry" , spannerException );
1291
+ span .setStatus (spannerException );
1325
1292
throw spannerException ;
1326
1293
} catch (RuntimeException e ) {
1327
- span .addAnnotation ("Stream broken. Not safe to retry" );
1328
- TraceUtil .setWithFailure (span , e );
1329
- OpenTelemetryTraceUtil .addEventWithExceptionAndSetFailure (
1330
- openTelemetrySpan , "Stream broken. Safe to retry" , e );
1331
-
1294
+ span .addAnnotation ("Stream broken. Not safe to retry" , e );
1295
+ span .setStatus (e );
1332
1296
throw e ;
1333
1297
}
1334
1298
}
0 commit comments