Skip to content

add option to skip log tokenization #23440

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
107 changes: 67 additions & 40 deletions pkg/stanza/operator/input/udp/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ func NewConfigWithID(operatorID string) *Config {
return &Config{
InputConfig: helper.NewInputConfig(operatorID, operatorType),
BaseConfig: BaseConfig{
Encoding: helper.NewEncodingConfig(),
Encoding: helper.NewEncodingConfig(),
OneLogPerPacket: false,
Multiline: helper.MultilineConfig{
LineStartPattern: "",
LineEndPattern: ".^", // Use never matching regex to not split data by default
Expand All @@ -57,6 +58,7 @@ type Config struct {
// BaseConfig is the details configuration of a udp input operator.
type BaseConfig struct {
ListenAddress string `mapstructure:"listen_address,omitempty"`
OneLogPerPacket bool `mapstructure:"one_log_per_packet,omitempty"`
AddAttributes bool `mapstructure:"add_attributes,omitempty"`
Encoding helper.EncodingConfig `mapstructure:",squash,omitempty"`
Multiline helper.MultilineConfig `mapstructure:"multiline,omitempty"`
Expand Down Expand Up @@ -97,13 +99,14 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
}

udpInput := &Input{
InputOperator: inputOperator,
address: address,
buffer: make([]byte, MaxUDPSize),
addAttributes: c.AddAttributes,
encoding: encoding,
splitFunc: splitFunc,
resolver: resolver,
InputOperator: inputOperator,
address: address,
buffer: make([]byte, MaxUDPSize),
addAttributes: c.AddAttributes,
encoding: encoding,
splitFunc: splitFunc,
resolver: resolver,
OneLogPerPacket: c.OneLogPerPacket,
}
return udpInput, nil
}
Expand All @@ -112,8 +115,9 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
type Input struct {
buffer []byte
helper.InputOperator
address *net.UDPAddr
addAttributes bool
address *net.UDPAddr
addAttributes bool
OneLogPerPacket bool

connection net.PacketConn
cancel context.CancelFunc
Expand Down Expand Up @@ -159,42 +163,19 @@ func (u *Input) goHandleMessages(ctx context.Context) {
break
}

if u.OneLogPerPacket {
log := truncateMaxLog(message)
handleMessage(u, ctx, remoteAddr, log)
continue
}

scanner := bufio.NewScanner(bytes.NewReader(message))
scanner.Buffer(buf, MaxUDPSize)

scanner.Split(u.splitFunc)

for scanner.Scan() {
decoded, err := u.encoding.Decode(scanner.Bytes())
if err != nil {
u.Errorw("Failed to decode data", zap.Error(err))
continue
}

entry, err := u.NewEntry(string(decoded))
if err != nil {
u.Errorw("Failed to create entry", zap.Error(err))
continue
}

if u.addAttributes {
entry.AddAttribute("net.transport", "IP.UDP")
if addr, ok := u.connection.LocalAddr().(*net.UDPAddr); ok {
ip := addr.IP.String()
entry.AddAttribute("net.host.ip", addr.IP.String())
entry.AddAttribute("net.host.port", strconv.FormatInt(int64(addr.Port), 10))
entry.AddAttribute("net.host.name", u.resolver.GetHostFromIP(ip))
}

if addr, ok := remoteAddr.(*net.UDPAddr); ok {
ip := addr.IP.String()
entry.AddAttribute("net.peer.ip", ip)
entry.AddAttribute("net.peer.port", strconv.FormatInt(int64(addr.Port), 10))
entry.AddAttribute("net.peer.name", u.resolver.GetHostFromIP(ip))
}
}

u.Write(ctx, entry)
handleMessage(u, ctx, remoteAddr, scanner.Bytes())
}
if err := scanner.Err(); err != nil {
u.Errorw("Scanner error", zap.Error(err))
Expand All @@ -203,6 +184,52 @@ func (u *Input) goHandleMessages(ctx context.Context) {
}()
}

func truncateMaxLog(data []byte) (token []byte) {
dataLength := len(data)
if dataLength >= MaxUDPSize {
return data[:MaxUDPSize]
}

if dataLength == 0 {
return nil
}

return data
}

func handleMessage(u *Input, ctx context.Context, remoteAddr net.Addr, log []byte) {
decoded, err := u.encoding.Decode(log)
if err != nil {
u.Errorw("Failed to decode data", zap.Error(err))
return
}

entry, err := u.NewEntry(string(decoded))
if err != nil {
u.Errorw("Failed to create entry", zap.Error(err))
return
}

if u.addAttributes {
entry.AddAttribute("net.transport", "IP.UDP")
if addr, ok := u.connection.LocalAddr().(*net.UDPAddr); ok {
ip := addr.IP.String()
entry.AddAttribute("net.host.ip", addr.IP.String())
entry.AddAttribute("net.host.port", strconv.FormatInt(int64(addr.Port), 10))
entry.AddAttribute("net.host.name", u.resolver.GetHostFromIP(ip))
}

if addr, ok := remoteAddr.(*net.UDPAddr); ok {
ip := addr.IP.String()
entry.AddAttribute("net.peer.ip", ip)
entry.AddAttribute("net.peer.port", strconv.FormatInt(int64(addr.Port), 10))
entry.AddAttribute("net.peer.name", u.resolver.GetHostFromIP(ip))
}
}

u.Write(ctx, entry)
}

// readMessage will read log messages from the connection.
func (u *Input) readMessage() ([]byte, net.Addr, error) {
n, addr, err := u.connection.ReadFrom(u.buffer)
Expand Down