Skip to content

Commit

Permalink
envoyalsreceiver: add identifier (#37800)
Browse files Browse the repository at this point in the history
Co-authored-by: Evan Bradley <11745660+evan-bradley@users.noreply.github.com>
  • Loading branch information
zirain and evan-bradley authored Feb 24, 2025
1 parent fa6262e commit f644777
Show file tree
Hide file tree
Showing 4 changed files with 236 additions and 13 deletions.
27 changes: 27 additions & 0 deletions .chloggen/envoyals-part2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: envoyalsreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add node and log identifier resource attributes

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [37800]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
36 changes: 24 additions & 12 deletions receiver/envoyalsreceiver/als_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,13 @@ func TestLogs(t *testing.T) {
require.NoError(t, err)
ts := int64(pcommon.NewTimestampFromTime(tm))

nodeID := &corev3.Node{
Id: "test-id",
Cluster: "test-cluster",
}

identifier := &alsv3.StreamAccessLogsMessage_Identifier{
Node: &corev3.Node{
Id: "test-id",
Cluster: "test-cluster",
},
Node: nodeID,
LogName: "test-log-name",
}

Expand Down Expand Up @@ -114,14 +116,17 @@ func TestLogs(t *testing.T) {
},
},
},
expected: generateLogs([]Log{
expected: generateLogs(map[string]string{
"node": nodeID.String(),
"log_name": "test-log-name",
}, []Log{
{
Timestamp: ts,
Attributes: map[string]any{
"api_version": "v3",
"log_type": "http",
},
Body: pcommon.NewValueStr(httpLog.String()),
Body: httpLog.String(),
},
}),
},
Expand All @@ -137,14 +142,17 @@ func TestLogs(t *testing.T) {
},
},
},
expected: generateLogs([]Log{
expected: generateLogs(map[string]string{
"node": nodeID.String(),
"log_name": "test-log-name",
}, []Log{
{
Timestamp: ts,
Attributes: map[string]any{
"api_version": "v3",
"log_type": "tcp",
},
Body: pcommon.NewValueStr(tcpLog.String()),
Body: tcpLog.String(),
},
}),
},
Expand All @@ -171,19 +179,23 @@ func TestLogs(t *testing.T) {

type Log struct {
Timestamp int64
Body pcommon.Value
Body string
Attributes map[string]any
}

func generateLogs(logs []Log) plog.Logs {
func generateLogs(resourceAttrs map[string]string, logs []Log) plog.Logs {
ld := plog.NewLogs()
logSlice := ld.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords()
rls := ld.ResourceLogs().AppendEmpty()
for k, v := range resourceAttrs {
rls.Resource().Attributes().PutStr(k, v)
}
logSlice := rls.ScopeLogs().AppendEmpty().LogRecords()

for _, log := range logs {
lr := logSlice.AppendEmpty()
_ = lr.Attributes().FromRaw(log.Attributes)
lr.SetTimestamp(pcommon.Timestamp(log.Timestamp))
lr.Body().SetStr(log.Body.AsString())
lr.Body().SetStr(log.Body)
}
return ld
}
13 changes: 12 additions & 1 deletion receiver/envoyalsreceiver/internal/als/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
const (
apiVersionAttr = "api_version"
apiVersionVal = "v3"
nodeAttr = "node"
logNameAttr = "log_name"
logTypeAttr = "log_type"
httpTypeVal = "http"
tcpTypeVal = "tcp"
Expand Down Expand Up @@ -62,8 +64,17 @@ func toLogs(data *alsv3.StreamAccessLogsMessage) plog.Logs {
logs := plog.NewLogs()

rls := logs.ResourceLogs().AppendEmpty()
logSlice := rls.ScopeLogs().AppendEmpty().LogRecords()
identifier := data.GetIdentifier()
if identifier != nil {
if identifier.Node != nil {
rls.Resource().Attributes().PutStr(nodeAttr, identifier.Node.String())
}
if identifier.LogName != "" {
rls.Resource().Attributes().PutStr(logNameAttr, identifier.LogName)
}
}

logSlice := rls.ScopeLogs().AppendEmpty().LogRecords()
httpLogs := data.GetHttpLogs()
if httpLogs != nil {
for _, httpLog := range httpLogs.LogEntry {
Expand Down
173 changes: 173 additions & 0 deletions receiver/envoyalsreceiver/internal/als/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package als // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/envoyalsreceiver/internal/als"

import (
"testing"
"time"

corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
alsdata "github.com/envoyproxy/go-control-plane/envoy/data/accesslog/v3"
alsv3 "github.com/envoyproxy/go-control-plane/envoy/service/accesslog/v3"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"google.golang.org/protobuf/types/known/timestamppb"
"google.golang.org/protobuf/types/known/wrapperspb"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest"
)

func TestToLogs(t *testing.T) {
tm, err := time.Parse(time.RFC3339Nano, "2020-07-30T01:01:01.123456789Z")
require.NoError(t, err)
ts := int64(pcommon.NewTimestampFromTime(tm))

httpLog := &alsdata.HTTPAccessLogEntry{
CommonProperties: &alsdata.AccessLogCommon{
StartTime: timestamppb.New(tm),
},
Request: &alsdata.HTTPRequestProperties{
Path: "/test",
Authority: "example.com",
},
Response: &alsdata.HTTPResponseProperties{
ResponseCode: wrapperspb.UInt32(200),
},
}

tcpLog := &alsdata.TCPAccessLogEntry{
CommonProperties: &alsdata.AccessLogCommon{
StartTime: timestamppb.New(tm),
},
ConnectionProperties: &alsdata.ConnectionProperties{
ReceivedBytes: 10,
SentBytes: 20,
},
}

nodeID := &corev3.Node{
Id: "node-id",
}

cases := []struct {
name string
input *alsv3.StreamAccessLogsMessage
want plog.Logs
}{
{
name: "tcp",
input: &alsv3.StreamAccessLogsMessage{
Identifier: &alsv3.StreamAccessLogsMessage_Identifier{
Node: &corev3.Node{
Id: "node-id",
},
LogName: "test-log-name",
},
LogEntries: &alsv3.StreamAccessLogsMessage_TcpLogs{
TcpLogs: &alsv3.StreamAccessLogsMessage_TCPAccessLogEntries{
LogEntry: []*alsdata.TCPAccessLogEntry{
{
CommonProperties: &alsdata.AccessLogCommon{
StartTime: timestamppb.New(tm),
},
ConnectionProperties: &alsdata.ConnectionProperties{
ReceivedBytes: 10,
SentBytes: 20,
},
},
},
},
},
},
want: generateLogs(map[string]string{
"node": nodeID.String(),
"log_name": "test-log-name",
}, []Log{
{
Timestamp: ts,
Attributes: map[string]any{
"api_version": "v3",
"log_type": "tcp",
},
Body: tcpLog.String(),
},
}),
},
{
name: "http",
input: &alsv3.StreamAccessLogsMessage{
Identifier: &alsv3.StreamAccessLogsMessage_Identifier{
Node: &corev3.Node{
Id: "node-id",
},
LogName: "test-log-name",
},
LogEntries: &alsv3.StreamAccessLogsMessage_HttpLogs{
HttpLogs: &alsv3.StreamAccessLogsMessage_HTTPAccessLogEntries{
LogEntry: []*alsdata.HTTPAccessLogEntry{
{
CommonProperties: &alsdata.AccessLogCommon{
StartTime: timestamppb.New(tm),
},
Request: &alsdata.HTTPRequestProperties{
Path: "/test",
Authority: "example.com",
},
Response: &alsdata.HTTPResponseProperties{
ResponseCode: wrapperspb.UInt32(200),
},
},
},
},
},
},
want: generateLogs(map[string]string{
"node": nodeID.String(),
"log_name": "test-log-name",
}, []Log{
{
Timestamp: ts,
Attributes: map[string]any{
"api_version": "v3",
"log_type": "http",
},
Body: httpLog.String(),
},
}),
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
got := toLogs(tc.input)

err := plogtest.CompareLogs(tc.want, got, plogtest.IgnoreObservedTimestamp())
require.NoError(t, err)
})
}
}

type Log struct {
Timestamp int64
Body string
Attributes map[string]any
}

func generateLogs(resourceAttrs map[string]string, logs []Log) plog.Logs {
ld := plog.NewLogs()
rls := ld.ResourceLogs().AppendEmpty()
for k, v := range resourceAttrs {
rls.Resource().Attributes().PutStr(k, v)
}
logSlice := rls.ScopeLogs().AppendEmpty().LogRecords()

for _, log := range logs {
lr := logSlice.AppendEmpty()
_ = lr.Attributes().FromRaw(log.Attributes)
lr.SetTimestamp(pcommon.Timestamp(log.Timestamp))
lr.Body().SetStr(log.Body)
}
return ld
}

0 comments on commit f644777

Please sign in to comment.