diff --git a/exporter/zipkinexporter/zipkin.go b/exporter/zipkinexporter/zipkin.go index 13ec68c1db0..4151a1f6a07 100644 --- a/exporter/zipkinexporter/zipkin.go +++ b/exporter/zipkinexporter/zipkin.go @@ -123,7 +123,7 @@ func (ze *zipkinExporter) PushTraceData(_ context.Context, td consumerdata.Trace body, err := ze.serializer.Serialize(tbatch) if err != nil { - return len(td.Spans), err + return len(td.Spans), consumererror.Permanent(err) } req, err := http.NewRequest("POST", ze.url, bytes.NewReader(body)) diff --git a/receiver/zipkinreceiver/trace_receiver.go b/receiver/zipkinreceiver/trace_receiver.go index c979e389b51..a8ca378c1cf 100644 --- a/receiver/zipkinreceiver/trace_receiver.go +++ b/receiver/zipkinreceiver/trace_receiver.go @@ -390,6 +390,7 @@ func zipkinSpanToTraceSpan(zs *zipkinmodel.SpanModel) (*tracepb.Span, *commonpb. } node := nodeFromZipkinEndpoints(zs, pbs) + zipkin.SetTimestampsIfUnset(pbs) return pbs, node } diff --git a/receiver/zipkinreceiver/trace_receiver_test.go b/receiver/zipkinreceiver/trace_receiver_test.go index 0d0ea681058..775e37324f3 100644 --- a/receiver/zipkinreceiver/trace_receiver_test.go +++ b/receiver/zipkinreceiver/trace_receiver_test.go @@ -427,7 +427,8 @@ func TestSpanKindTranslation(t *testing.T) { TraceID: zipkinmodel.TraceID{Low: 123}, ID: 456, }, - Kind: tt.zipkinKind, + Kind: tt.zipkinKind, + Timestamp: time.Now(), } ocSpan, _ := zipkinSpanToTraceSpan(zs) assert.EqualValues(t, tt.ocKind, ocSpan.Kind) diff --git a/translator/trace/zipkin/attributekeys.go b/translator/trace/zipkin/attributekeys.go index 5c13932b834..4124231089c 100644 --- a/translator/trace/zipkin/attributekeys.go +++ b/translator/trace/zipkin/attributekeys.go @@ -25,4 +25,5 @@ const ( RemoteEndpointIPv6 = "zipkin.remoteEndpoint.ipv6" RemoteEndpointPort = "zipkin.remoteEndpoint.port" RemoteEndpointServiceName = "zipkin.remoteEndpoint.serviceName" + StartTimeAbsent = "otel.zipkin.absentField.startTime" ) diff --git a/translator/trace/zipkin/protospan_to_zipkinv1.go b/translator/trace/zipkin/protospan_to_zipkinv1.go index fcd6548aae5..5f6da87bb24 100644 --- a/translator/trace/zipkin/protospan_to_zipkinv1.go +++ b/translator/trace/zipkin/protospan_to_zipkinv1.go @@ -17,6 +17,7 @@ package zipkin import ( "net" "strconv" + "time" commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" @@ -243,7 +244,13 @@ func OCSpanProtoToZipkin( if spanKindFromAttributes { redundantKeys[tracetranslator.TagSpanKind] = true } + startTime := internal.TimestampToTime(s.StartTime) + if _, ok := attrMap[StartTimeAbsent]; ok { + redundantKeys[StartTimeAbsent] = true + startTime = time.Time{} + } + z := &zipkinmodel.SpanModel{ SpanContext: zipkinmodel.SpanContext{ TraceID: convertTraceID(s.TraceId), diff --git a/translator/trace/zipkin/zipkinv1_to_protospan.go b/translator/trace/zipkin/zipkinv1_to_protospan.go index 570cd53afbd..ddb43d0b8c5 100644 --- a/translator/trace/zipkin/zipkinv1_to_protospan.go +++ b/translator/trace/zipkin/zipkinv1_to_protospan.go @@ -19,6 +19,7 @@ import ( "fmt" "math" "strconv" + "time" commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" @@ -27,6 +28,7 @@ import ( "github.com/pkg/errors" "go.opentelemetry.io/collector/consumer/consumerdata" + "go.opentelemetry.io/collector/internal" tracetranslator "go.opentelemetry.io/collector/translator/trace" ) @@ -175,6 +177,7 @@ func zipkinV1ToOCSpan(zSpan *zipkinV1Span) (*tracepb.Span, *annotationParseResul } setSpanKind(ocSpan, parsedAnnotations.Kind, parsedAnnotations.ExtendedKind) + SetTimestampsIfUnset(ocSpan) return ocSpan, parsedAnnotations, nil } @@ -481,3 +484,25 @@ func (ep *endpoint) createAttributeMap() map[string]string { } return attributeMap } + +func SetTimestampsIfUnset(span *tracepb.Span) { + // zipkin allows timestamp to be unset, but opentelemetry-collector expects it to have a value. + // If this is unset, the conversion from open census to the internal trace format breaks + // what should be an identity transformation oc -> internal -> oc + if span.StartTime == nil { + now := internal.TimeToTimestamp(time.Now()) + span.StartTime = now + span.EndTime = now + + if span.Attributes == nil { + span.Attributes = &tracepb.Span_Attributes{} + } + if span.Attributes.AttributeMap == nil { + span.Attributes.AttributeMap = make(map[string]*tracepb.AttributeValue, 1) + } + span.Attributes.AttributeMap[StartTimeAbsent] = &tracepb.AttributeValue{ + Value: &tracepb.AttributeValue_BoolValue{ + BoolValue: true, + }} + } +} diff --git a/translator/trace/zipkin/zipkinv1_to_protospan_test.go b/translator/trace/zipkin/zipkinv1_to_protospan_test.go index 019cd698985..8e877bf4bda 100644 --- a/translator/trace/zipkin/zipkinv1_to_protospan_test.go +++ b/translator/trace/zipkin/zipkinv1_to_protospan_test.go @@ -21,6 +21,7 @@ import ( "sort" "strconv" "testing" + "time" commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" @@ -30,6 +31,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/consumer/consumerdata" + "go.opentelemetry.io/collector/internal" tracetranslator "go.opentelemetry.io/collector/translator/trace" ) @@ -247,14 +249,15 @@ func sortTraceByNodeName(trace []consumerdata.TraceData) { func TestZipkinAnnotationsToOCStatus(t *testing.T) { type test struct { + name string haveTags []*binaryAnnotation wantAttributes *tracepb.Span_Attributes wantStatus *tracepb.Status } cases := []test{ - // only status.code tag { + name: "only status.code tag", haveTags: []*binaryAnnotation{{ Key: "status.code", Value: "13", @@ -264,8 +267,9 @@ func TestZipkinAnnotationsToOCStatus(t *testing.T) { Code: 13, }, }, - // only status.message tag + { + name: "only status.message tag", haveTags: []*binaryAnnotation{{ Key: "status.message", Value: "Forbidden", @@ -273,8 +277,9 @@ func TestZipkinAnnotationsToOCStatus(t *testing.T) { wantAttributes: nil, wantStatus: nil, }, - // both status.code and status.message + { + name: "both status.code and status.message", haveTags: []*binaryAnnotation{ { Key: "status.code", @@ -292,8 +297,8 @@ func TestZipkinAnnotationsToOCStatus(t *testing.T) { }, }, - // http status.code { + name: "http status.code", haveTags: []*binaryAnnotation{ { Key: "http.status_code", @@ -324,8 +329,8 @@ func TestZipkinAnnotationsToOCStatus(t *testing.T) { }, }, - // http and oc { + name: "http and oc", haveTags: []*binaryAnnotation{ { Key: "http.status_code", @@ -364,8 +369,8 @@ func TestZipkinAnnotationsToOCStatus(t *testing.T) { }, }, - // http and only oc code { + name: "http and only oc code", haveTags: []*binaryAnnotation{ { Key: "http.status_code", @@ -398,8 +403,9 @@ func TestZipkinAnnotationsToOCStatus(t *testing.T) { Code: 14, }, }, - // http and only oc message + { + name: "http and only oc message", haveTags: []*binaryAnnotation{ { Key: "http.status_code", @@ -434,8 +440,8 @@ func TestZipkinAnnotationsToOCStatus(t *testing.T) { }, }, - // census tags { + name: "census tags", haveTags: []*binaryAnnotation{ { Key: "census.status_code", @@ -453,8 +459,8 @@ func TestZipkinAnnotationsToOCStatus(t *testing.T) { }, }, - // census tags priority over others { + name: "census tags priority over others", haveTags: []*binaryAnnotation{ { Key: "census.status_code", @@ -506,33 +512,79 @@ func TestZipkinAnnotationsToOCStatus(t *testing.T) { fakeSpanID := "0000000000000001" for i, c := range cases { - zSpans := []*zipkinV1Span{{ - ID: fakeSpanID, - TraceID: fakeTraceID, - BinaryAnnotations: c.haveTags, - }} - zBytes, err := json.Marshal(zSpans) - if err != nil { - t.Errorf("#%d: Unexpected error: %v", i, err) - continue - } - gb, err := V1JSONBatchToOCProto(zBytes) - if err != nil { - t.Errorf("#%d: Unexpected error: %v", i, err) - continue - } - gs := gb[0].Spans[0] + t.Run(c.name, func(t *testing.T) { + zSpans := []*zipkinV1Span{{ + ID: fakeSpanID, + TraceID: fakeTraceID, + BinaryAnnotations: c.haveTags, + Timestamp: 1, + }} + zBytes, err := json.Marshal(zSpans) + if err != nil { + t.Errorf("#%d: Unexpected error: %v", i, err) + return + } + gb, err := V1JSONBatchToOCProto(zBytes) + if err != nil { + t.Errorf("#%d: Unexpected error: %v", i, err) + return + } + gs := gb[0].Spans[0] - if !reflect.DeepEqual(gs.Attributes, c.wantAttributes) { - t.Fatalf("Unsuccessful conversion\nGot:\n\t%v\nWant:\n\t%v", gs.Attributes, c.wantAttributes) - } + if !reflect.DeepEqual(gs.Attributes, c.wantAttributes) { + t.Fatalf("Unsuccessful conversion\nGot:\n\t%v\nWant:\n\t%v", gs.Attributes, c.wantAttributes) + } - if !reflect.DeepEqual(gs.Status, c.wantStatus) { - t.Fatalf("Unsuccessful conversion: %d\nGot:\n\t%v\nWant:\n\t%v", i, gs.Status, c.wantStatus) - } + if !reflect.DeepEqual(gs.Status, c.wantStatus) { + t.Fatalf("Unsuccessful conversion: %d\nGot:\n\t%v\nWant:\n\t%v", i, gs.Status, c.wantStatus) + } + }) } } +func TestSpanWithoutTimestampGetsTag(t *testing.T) { + fakeTraceID := "00000000000000010000000000000002" + fakeSpanID := "0000000000000001" + zSpans := []*zipkinV1Span{ + { + ID: fakeSpanID, + TraceID: fakeTraceID, + Timestamp: 0, // no timestamp field + }, + } + zBytes, err := json.Marshal(zSpans) + if err != nil { + t.Errorf("Unexpected error: %v", err) + return + } + + testStart := time.Now() + + gb, err := V1JSONBatchToOCProto(zBytes) + if err != nil { + t.Errorf("Unexpected error: %v", err) + return + } + + gs := gb[0].Spans[0] + assert.NotNil(t, gs.StartTime) + assert.NotNil(t, gs.EndTime) + + assert.True(t, internal.TimestampToTime(gs.StartTime).Sub(testStart) >= 0) + + wantAttributes := &tracepb.Span_Attributes{ + AttributeMap: map[string]*tracepb.AttributeValue{ + StartTimeAbsent: { + Value: &tracepb.AttributeValue_BoolValue{ + BoolValue: true, + }, + }, + }, + } + + assert.EqualValues(t, gs.Attributes, wantAttributes) +} + func TestJSONHTTPToGRPCStatusCode(t *testing.T) { fakeTraceID := "00000000000000010000000000000002" fakeSpanID := "0000000000000001"