37
37
import com .google .common .annotations .VisibleForTesting ;
38
38
import com .google .common .base .Preconditions ;
39
39
import com .google .common .collect .AbstractIterator ;
40
- import com .google .common .collect .ImmutableMap ;
41
40
import com .google .common .collect .Lists ;
42
41
import com .google .common .io .CharSource ;
43
42
import com .google .common .util .concurrent .Uninterruptibles ;
53
52
import com .google .spanner .v1 .Transaction ;
54
53
import com .google .spanner .v1 .TypeCode ;
55
54
import io .grpc .Context ;
56
- import io .opencensus .common .Scope ;
57
- import io .opencensus .trace .AttributeValue ;
58
- import io .opencensus .trace .Span ;
59
- import io .opencensus .trace .Tracer ;
60
- import io .opencensus .trace .Tracing ;
61
55
import java .io .IOException ;
62
56
import java .io .Serializable ;
63
57
import java .math .BigDecimal ;
87
81
88
82
/** Implementation of {@link ResultSet}. */
89
83
abstract class AbstractResultSet <R > extends AbstractStructReader implements ResultSet {
90
- private static final Tracer tracer = Tracing .getTracer ();
91
84
private static final com .google .protobuf .Value NULL_VALUE =
92
85
com .google .protobuf .Value .newBuilder ().setNullValue (NullValue .NULL_VALUE ).build ();
93
86
@@ -1206,7 +1199,8 @@ abstract static class ResumableStreamIterator extends AbstractIterator<PartialRe
1206
1199
private final BackOff backOff ;
1207
1200
private final LinkedList <PartialResultSet > buffer = new LinkedList <>();
1208
1201
private final int maxBufferSize ;
1209
- private final Span span ;
1202
+ private final ISpan span ;
1203
+ private final TraceWrapper tracer ;
1210
1204
private CloseableIterator <PartialResultSet > stream ;
1211
1205
private ByteString resumeToken ;
1212
1206
private boolean finished ;
@@ -1220,12 +1214,14 @@ abstract static class ResumableStreamIterator extends AbstractIterator<PartialRe
1220
1214
protected ResumableStreamIterator (
1221
1215
int maxBufferSize ,
1222
1216
String streamName ,
1223
- Span parent ,
1217
+ ISpan parent ,
1218
+ TraceWrapper tracer ,
1224
1219
RetrySettings streamingRetrySettings ,
1225
1220
Set <Code > retryableCodes ) {
1226
1221
checkArgument (maxBufferSize >= 0 );
1227
1222
this .maxBufferSize = maxBufferSize ;
1228
- this .span = tracer .spanBuilderWithExplicitParent (streamName , parent ).startSpan ();
1223
+ this .tracer = tracer ;
1224
+ this .span = tracer .spanBuilderWithExplicitParent (streamName , parent );
1229
1225
this .streamingRetrySettings = Preconditions .checkNotNull (streamingRetrySettings );
1230
1226
this .retryableCodes = Preconditions .checkNotNull (retryableCodes );
1231
1227
this .backOff = newBackOff ();
@@ -1281,11 +1277,7 @@ private static long nextBackOffMillis(BackOff backoff) throws SpannerException {
1281
1277
}
1282
1278
1283
1279
private void backoffSleep (Context context , long backoffMillis ) throws SpannerException {
1284
- tracer
1285
- .getCurrentSpan ()
1286
- .addAnnotation (
1287
- "Backing off" ,
1288
- ImmutableMap .of ("Delay" , AttributeValue .longAttributeValue (backoffMillis )));
1280
+ tracer .getCurrentSpan ().addAnnotation ("Backing off" , "Delay" , backoffMillis );
1289
1281
final CountDownLatch latch = new CountDownLatch (1 );
1290
1282
final Context .CancellationListener listener =
1291
1283
ignored -> {
@@ -1325,7 +1317,7 @@ public void execute(Runnable command) {
1325
1317
public void close (@ Nullable String message ) {
1326
1318
if (stream != null ) {
1327
1319
stream .close (message );
1328
- span .end (TraceUtil . END_SPAN_OPTIONS );
1320
+ span .end ();
1329
1321
stream = null ;
1330
1322
}
1331
1323
}
@@ -1343,11 +1335,9 @@ protected PartialResultSet computeNext() {
1343
1335
if (stream == null ) {
1344
1336
span .addAnnotation (
1345
1337
"Starting/Resuming stream" ,
1346
- ImmutableMap .of (
1347
- "ResumeToken" ,
1348
- AttributeValue .stringAttributeValue (
1349
- resumeToken == null ? "null" : resumeToken .toStringUtf8 ())));
1350
- try (Scope s = tracer .withSpan (span )) {
1338
+ "ResumeToken" ,
1339
+ resumeToken == null ? "null" : resumeToken .toStringUtf8 ());
1340
+ try (IScope scope = tracer .withSpan (span )) {
1351
1341
// When start a new stream set the Span as current to make the gRPC Span a child of
1352
1342
// this Span.
1353
1343
stream = checkNotNull (startStream (resumeToken ));
@@ -1387,17 +1377,15 @@ protected PartialResultSet computeNext() {
1387
1377
}
1388
1378
} catch (SpannerException spannerException ) {
1389
1379
if (safeToRetry && isRetryable (spannerException )) {
1390
- span .addAnnotation (
1391
- "Stream broken. Safe to retry" ,
1392
- TraceUtil .getExceptionAnnotations (spannerException ));
1380
+ span .addAnnotation ("Stream broken. Safe to retry" , spannerException );
1393
1381
logger .log (Level .FINE , "Retryable exception, will sleep and retry" , spannerException );
1394
1382
// Truncate any items in the buffer before the last retry token.
1395
1383
while (!buffer .isEmpty () && buffer .getLast ().getResumeToken ().isEmpty ()) {
1396
1384
buffer .removeLast ();
1397
1385
}
1398
1386
assert buffer .isEmpty () || buffer .getLast ().getResumeToken ().equals (resumeToken );
1399
1387
stream = null ;
1400
- try (Scope s = tracer .withSpan (span )) {
1388
+ try (IScope s = tracer .withSpan (span )) {
1401
1389
long delay = spannerException .getRetryDelayInMillis ();
1402
1390
if (delay != -1 ) {
1403
1391
backoffSleep (context , delay );
@@ -1408,12 +1396,12 @@ protected PartialResultSet computeNext() {
1408
1396
1409
1397
continue ;
1410
1398
}
1411
- span .addAnnotation ("Stream broken. Not safe to retry" );
1412
- TraceUtil . setWithFailure ( span , spannerException );
1399
+ span .addAnnotation ("Stream broken. Not safe to retry" , spannerException );
1400
+ span . setStatus ( spannerException );
1413
1401
throw spannerException ;
1414
1402
} catch (RuntimeException e ) {
1415
- span .addAnnotation ("Stream broken. Not safe to retry" );
1416
- TraceUtil . setWithFailure ( span , e );
1403
+ span .addAnnotation ("Stream broken. Not safe to retry" , e );
1404
+ span . setStatus ( e );
1417
1405
throw e ;
1418
1406
}
1419
1407
}
0 commit comments