Skip to content

add option to skip log tokenization #23440

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
16 changes: 16 additions & 0 deletions .chloggen/add-udp-tokenize-flag.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add option to skip log tokenization for both tcp and udp receivers. Use the 'one_log_per_packet' setting to skip log tokenization if multiline is not used.

# One or more tracking issues related to the change
issues: [23440]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
115 changes: 74 additions & 41 deletions pkg/stanza/operator/input/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ package tcp // import "github.com/open-telemetry/opentelemetry-collector-contrib

import (
"bufio"
"bytes"
"context"
"crypto/rand"
"crypto/tls"
"fmt"
"io"
"net"
"strconv"
"sync"
Expand Down Expand Up @@ -48,8 +50,9 @@ func NewConfigWithID(operatorID string) *Config {
return &Config{
InputConfig: helper.NewInputConfig(operatorID, operatorType),
BaseConfig: BaseConfig{
Multiline: helper.NewMultilineConfig(),
Encoding: helper.NewEncodingConfig(),
OneLogPerPacket: false,
Multiline: helper.NewMultilineConfig(),
Encoding: helper.NewEncodingConfig(),
},
}
}
Expand All @@ -66,6 +69,7 @@ type BaseConfig struct {
ListenAddress string `mapstructure:"listen_address,omitempty"`
TLS *configtls.TLSServerSetting `mapstructure:"tls,omitempty"`
AddAttributes bool `mapstructure:"add_attributes,omitempty"`
OneLogPerPacket bool `mapstructure:"one_log_per_packet,omitempty"`
Encoding helper.EncodingConfig `mapstructure:",squash,omitempty"`
Multiline helper.MultilineConfig `mapstructure:"multiline,omitempty"`
PreserveLeadingWhitespaces bool `mapstructure:"preserve_leading_whitespaces,omitempty"`
Expand Down Expand Up @@ -114,12 +118,13 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
}

tcpInput := &Input{
InputOperator: inputOperator,
address: c.ListenAddress,
MaxLogSize: int(c.MaxLogSize),
addAttributes: c.AddAttributes,
encoding: encoding,
splitFunc: splitFunc,
InputOperator: inputOperator,
address: c.ListenAddress,
MaxLogSize: int(c.MaxLogSize),
addAttributes: c.AddAttributes,
OneLogPerPacket: c.OneLogPerPacket,
encoding: encoding,
splitFunc: splitFunc,
backoff: backoff.Backoff{
Max: 3 * time.Second,
},
Expand All @@ -139,9 +144,10 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
// Input is an operator that listens for log entries over tcp.
type Input struct {
helper.InputOperator
address string
MaxLogSize int
addAttributes bool
address string
MaxLogSize int
addAttributes bool
OneLogPerPacket bool

listener net.Listener
cancel context.CancelFunc
Expand Down Expand Up @@ -239,48 +245,75 @@ func (t *Input) goHandleMessages(ctx context.Context, conn net.Conn, cancel cont
defer t.wg.Done()
defer cancel()

if t.OneLogPerPacket {
var buf bytes.Buffer
io.Copy(&buf, conn)
log := truncateMaxLog(buf.Bytes(), t.MaxLogSize)
handleMessage(ctx, conn, t, log)
return
}

buf := make([]byte, 0, t.MaxLogSize)

scanner := bufio.NewScanner(conn)
scanner.Buffer(buf, t.MaxLogSize)

scanner.Split(t.splitFunc)

for scanner.Scan() {
decoded, err := t.encoding.Decode(scanner.Bytes())
if err != nil {
t.Errorw("Failed to decode data", zap.Error(err))
continue
}
handleMessage(ctx, conn, t, scanner.Bytes())
}

entry, err := t.NewEntry(string(decoded))
if err != nil {
t.Errorw("Failed to create entry", zap.Error(err))
continue
}
if err := scanner.Err(); err != nil {
t.Errorw("Scanner error", zap.Error(err))
}
}()
}

if t.addAttributes {
entry.AddAttribute("net.transport", "IP.TCP")
if addr, ok := conn.RemoteAddr().(*net.TCPAddr); ok {
ip := addr.IP.String()
entry.AddAttribute("net.peer.ip", ip)
entry.AddAttribute("net.peer.port", strconv.FormatInt(int64(addr.Port), 10))
entry.AddAttribute("net.peer.name", t.resolver.GetHostFromIP(ip))
}
func handleMessage(ctx context.Context, conn net.Conn, t *Input, log []byte) {
decoded, err := t.encoding.Decode(log)
if err != nil {
t.Errorw("Failed to decode data", zap.Error(err))
return
}

if addr, ok := conn.LocalAddr().(*net.TCPAddr); ok {
ip := addr.IP.String()
entry.AddAttribute("net.host.ip", addr.IP.String())
entry.AddAttribute("net.host.port", strconv.FormatInt(int64(addr.Port), 10))
entry.AddAttribute("net.host.name", t.resolver.GetHostFromIP(ip))
}
}
entry, err := t.NewEntry(string(decoded))
if err != nil {
t.Errorw("Failed to create entry", zap.Error(err))
return
}

t.Write(ctx, entry)
if t.addAttributes {
entry.AddAttribute("net.transport", "IP.TCP")
if addr, ok := conn.RemoteAddr().(*net.TCPAddr); ok {
ip := addr.IP.String()
entry.AddAttribute("net.peer.ip", ip)
entry.AddAttribute("net.peer.port", strconv.FormatInt(int64(addr.Port), 10))
entry.AddAttribute("net.peer.name", t.resolver.GetHostFromIP(ip))
}
if err := scanner.Err(); err != nil {
t.Errorw("Scanner error", zap.Error(err))

if addr, ok := conn.LocalAddr().(*net.TCPAddr); ok {
ip := addr.IP.String()
entry.AddAttribute("net.host.ip", addr.IP.String())
entry.AddAttribute("net.host.port", strconv.FormatInt(int64(addr.Port), 10))
entry.AddAttribute("net.host.name", t.resolver.GetHostFromIP(ip))
}
}()
}

t.Write(ctx, entry)
}

// truncateMaxLog limits data to at most maxLogSize bytes.
//
// When data holds maxLogSize bytes or more, the first maxLogSize bytes are
// returned as a sub-slice of data (no copy is made). Otherwise empty data
// yields nil and anything else is returned unchanged. The size check runs
// first, and maxLogSize is used directly as a slice bound; a negative value
// is not guarded against.
func truncateMaxLog(data []byte, maxLogSize int) (token []byte) {
	switch n := len(data); {
	case n >= maxLogSize:
		return data[:maxLogSize]
	case n == 0:
		return nil
	default:
		return data
	}
}

// Stop will stop listening for log entries over TCP.
Expand All @@ -301,4 +334,4 @@ func (t *Input) Stop() error {
t.resolver.Stop()
}
return nil
}
}
107 changes: 67 additions & 40 deletions pkg/stanza/operator/input/udp/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ func NewConfigWithID(operatorID string) *Config {
return &Config{
InputConfig: helper.NewInputConfig(operatorID, operatorType),
BaseConfig: BaseConfig{
Encoding: helper.NewEncodingConfig(),
Encoding: helper.NewEncodingConfig(),
OneLogPerPacket: false,
Multiline: helper.MultilineConfig{
LineStartPattern: "",
LineEndPattern: ".^", // Use never matching regex to not split data by default
Expand All @@ -57,6 +58,7 @@ type Config struct {
// BaseConfig is the details configuration of a udp input operator.
type BaseConfig struct {
ListenAddress string `mapstructure:"listen_address,omitempty"`
OneLogPerPacket bool `mapstructure:"one_log_per_packet,omitempty"`
AddAttributes bool `mapstructure:"add_attributes,omitempty"`
Encoding helper.EncodingConfig `mapstructure:",squash,omitempty"`
Multiline helper.MultilineConfig `mapstructure:"multiline,omitempty"`
Expand Down Expand Up @@ -97,13 +99,14 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
}

udpInput := &Input{
InputOperator: inputOperator,
address: address,
buffer: make([]byte, MaxUDPSize),
addAttributes: c.AddAttributes,
encoding: encoding,
splitFunc: splitFunc,
resolver: resolver,
InputOperator: inputOperator,
address: address,
buffer: make([]byte, MaxUDPSize),
addAttributes: c.AddAttributes,
encoding: encoding,
splitFunc: splitFunc,
resolver: resolver,
OneLogPerPacket: c.OneLogPerPacket,
}
return udpInput, nil
}
Expand All @@ -112,8 +115,9 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
type Input struct {
buffer []byte
helper.InputOperator
address *net.UDPAddr
addAttributes bool
address *net.UDPAddr
addAttributes bool
OneLogPerPacket bool

connection net.PacketConn
cancel context.CancelFunc
Expand Down Expand Up @@ -159,42 +163,19 @@ func (u *Input) goHandleMessages(ctx context.Context) {
break
}

if u.OneLogPerPacket {
log := truncateMaxLog(message)
handleMessage(ctx, u, remoteAddr, log)
continue
}

scanner := bufio.NewScanner(bytes.NewReader(message))
scanner.Buffer(buf, MaxUDPSize)

scanner.Split(u.splitFunc)

for scanner.Scan() {
decoded, err := u.encoding.Decode(scanner.Bytes())
if err != nil {
u.Errorw("Failed to decode data", zap.Error(err))
continue
}

entry, err := u.NewEntry(string(decoded))
if err != nil {
u.Errorw("Failed to create entry", zap.Error(err))
continue
}

if u.addAttributes {
entry.AddAttribute("net.transport", "IP.UDP")
if addr, ok := u.connection.LocalAddr().(*net.UDPAddr); ok {
ip := addr.IP.String()
entry.AddAttribute("net.host.ip", addr.IP.String())
entry.AddAttribute("net.host.port", strconv.FormatInt(int64(addr.Port), 10))
entry.AddAttribute("net.host.name", u.resolver.GetHostFromIP(ip))
}

if addr, ok := remoteAddr.(*net.UDPAddr); ok {
ip := addr.IP.String()
entry.AddAttribute("net.peer.ip", ip)
entry.AddAttribute("net.peer.port", strconv.FormatInt(int64(addr.Port), 10))
entry.AddAttribute("net.peer.name", u.resolver.GetHostFromIP(ip))
}
}

u.Write(ctx, entry)
handleMessage(ctx, u, remoteAddr, scanner.Bytes())
}
if err := scanner.Err(); err != nil {
u.Errorw("Scanner error", zap.Error(err))
Expand All @@ -203,6 +184,52 @@ func (u *Input) goHandleMessages(ctx context.Context) {
}()
}

// truncateMaxLog limits a payload to at most MaxUDPSize bytes.
//
// Data holding MaxUDPSize bytes or more is cut to its first MaxUDPSize bytes
// (a sub-slice of data, not a copy). Otherwise empty data yields nil and
// anything else is returned unchanged.
func truncateMaxLog(data []byte) (token []byte) {
	switch n := len(data); {
	case n >= MaxUDPSize:
		return data[:MaxUDPSize]
	case n == 0:
		return nil
	default:
		return data
	}
}

// handleMessage turns one raw payload into a log entry and writes it out.
//
// The payload is decoded with u.encoding and wrapped via u.NewEntry; if either
// step fails, the error is logged and the payload is dropped. When
// u.addAttributes is set, the entry is annotated with the transport name plus
// the local (net.host.*) and remote (net.peer.*) address details before it is
// passed to u.Write. Address attributes are only added when the respective
// address is a *net.UDPAddr.
func handleMessage(ctx context.Context, u *Input, remoteAddr net.Addr, log []byte) {
	body, decodeErr := u.encoding.Decode(log)
	if decodeErr != nil {
		u.Errorw("Failed to decode data", zap.Error(decodeErr))
		return
	}

	entry, entryErr := u.NewEntry(string(body))
	if entryErr != nil {
		u.Errorw("Failed to create entry", zap.Error(entryErr))
		return
	}

	// Without attribute enrichment the entry is emitted as-is.
	if !u.addAttributes {
		u.Write(ctx, entry)
		return
	}

	entry.AddAttribute("net.transport", "IP.UDP")

	// Local side of the socket.
	if local, ok := u.connection.LocalAddr().(*net.UDPAddr); ok {
		hostIP := local.IP.String()
		entry.AddAttribute("net.host.ip", hostIP)
		entry.AddAttribute("net.host.port", strconv.FormatInt(int64(local.Port), 10))
		entry.AddAttribute("net.host.name", u.resolver.GetHostFromIP(hostIP))
	}

	// Sender of the packet.
	if peer, ok := remoteAddr.(*net.UDPAddr); ok {
		peerIP := peer.IP.String()
		entry.AddAttribute("net.peer.ip", peerIP)
		entry.AddAttribute("net.peer.port", strconv.FormatInt(int64(peer.Port), 10))
		entry.AddAttribute("net.peer.name", u.resolver.GetHostFromIP(peerIP))
	}

	u.Write(ctx, entry)
}

// readMessage will read log messages from the connection.
func (u *Input) readMessage() ([]byte, net.Addr, error) {
n, addr, err := u.connection.ReadFrom(u.buffer)
Expand Down