add option to skip log tokenization #23440

Merged
16 changes: 16 additions & 0 deletions .chloggen/add-udp-tokenize-flag.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add an option to skip log tokenization for both the TCP and UDP receivers. Use the 'one_log_per_packet' setting to skip log tokenization if multiline is not used.

# One or more tracking issues related to the change
issues: [23440]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
27 changes: 14 additions & 13 deletions pkg/stanza/docs/operators/tcp_input.md
@@ -4,20 +4,21 @@ The `tcp_input` operator listens for logs on one or more TCP connections. The op

### Configuration Fields

| Field | Default | Description |
| --- | --- | --- |
| `id` | `tcp_input` | A unique identifier for the operator. |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. |
| `max_log_size` | `1MiB` | The maximum size of a log entry to read before failing. Protects against reading large amounts of data into memory. |
| `listen_address` | required | A listen address of the form `<ip>:<port>`. |
| `tls` | nil | An optional `TLS` configuration (see the TLS configuration section). |
| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes. |
| `resource` | {} | A map of `key: value` pairs to add to the entry's resource. |
| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention][https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/span-general.md#general-network-connection-attributes]. |
| Field | Default | Description |
| --- | --- | --- |
| `id` | `tcp_input` | A unique identifier for the operator. |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. |
| `max_log_size` | `1MiB` | The maximum size of a log entry to read before failing. Protects against reading large amounts of data into memory. |
| `listen_address` | required | A listen address of the form `<ip>:<port>`. |
| `tls` | nil | An optional `TLS` configuration (see the TLS configuration section). |
| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes. |
| `one_log_per_packet` | false | Skips log tokenization. Set to `true` when the input contains one log per record and `multiline` is not used. This can improve performance. |
| `resource` | {} | A map of `key: value` pairs to add to the entry's resource. |
| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention][https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/span-general.md#general-network-connection-attributes]. |
| `multiline` | | A `multiline` configuration block. See below for details. |
| `preserve_leading_whitespaces` | false | Whether to preserve leading whitespaces. |
| `preserve_trailing_whitespaces` | false | Whether to preserve trailing whitespaces. |
| `encoding` | `utf-8` | The encoding of the file being read. See the list of supported encodings below for available options. |
| `preserve_leading_whitespaces` | false | Whether to preserve leading whitespaces. |
| `preserve_trailing_whitespaces` | false | Whether to preserve trailing whitespaces. |
| `encoding` | `utf-8` | The encoding of the file being read. See the list of supported encodings below for available options. |
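
To illustrate what skipping tokenization means on a TCP stream, here is a minimal Go sketch. It is not the operator's code: it assumes an in-memory `net.Pipe` connection instead of a real listener, and `send`, `readTokenized`, and `readWhole` are hypothetical names. The tokenized path splits on newlines with a default `bufio.Scanner`, while the other path mirrors the `io.Copy` call in this PR's `tcp.go` change and returns everything received before the peer closes the connection as a single log.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
	"net"
)

// send writes payload to one end of an in-memory connection, closes that end,
// and returns the other end for the reader. (Hypothetical helper.)
func send(payload string) net.Conn {
	client, server := net.Pipe()
	go func() {
		defer client.Close()
		_, _ = client.Write([]byte(payload))
	}()
	return server
}

// readTokenized splits the stream into one log per line, which is what a
// default line-based scanner does.
func readTokenized(conn net.Conn) []string {
	var logs []string
	scanner := bufio.NewScanner(conn)
	for scanner.Scan() {
		logs = append(logs, scanner.Text())
	}
	return logs
}

// readWhole reads until the peer closes the connection and returns the whole
// payload as one log, with no splitting.
func readWhole(conn net.Conn) (string, error) {
	var buf bytes.Buffer
	_, err := io.Copy(&buf, conn)
	return buf.String(), err
}

func main() {
	payload := "first\nsecond\n"

	// Tokenized: two separate logs.
	fmt.Printf("%q\n", readTokenized(send(payload)))

	// Not tokenized: a single log that still contains the newlines.
	whole, err := readWhole(send(payload))
	fmt.Printf("%q %v\n", whole, err)
}
```

One consequence worth noting: because `io.Copy` returns only when the connection reaches EOF, a sender that keeps the connection open would not produce a log in the second path until it disconnects.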

#### TLS Configuration

23 changes: 12 additions & 11 deletions pkg/stanza/docs/operators/udp_input.md
@@ -4,18 +4,19 @@ The `udp_input` operator listens for logs from UDP packets.

### Configuration Fields

| Field | Default | Description |
| --- | --- | --- |
| `id` | `udp_input` | A unique identifier for the operator. |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. |
| `listen_address` | required | A listen address of the form `<ip>:<port>`. |
| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes. |
| `resource` | {} | A map of `key: value` pairs to add to the entry's resource. |
| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention][https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/span-general.md#general-network-connection-attributes]. |
| Field | Default | Description |
| --- | --- | --- |
| `id` | `udp_input` | A unique identifier for the operator. |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. |
| `listen_address` | required | A listen address of the form `<ip>:<port>`. |
| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes. |
| `one_log_per_packet` | false | Skips log tokenization. Set to `true` when the input contains one log per record and `multiline` is not used. This can improve performance. |
| `resource` | {} | A map of `key: value` pairs to add to the entry's resource. |
| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention][https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/span-general.md#general-network-connection-attributes]. |
| `multiline` | | A `multiline` configuration block. See below for details. |
| `preserve_leading_whitespaces` | false | Whether to preserve leading whitespaces. |
| `preserve_trailing_whitespaces` | false | Whether to preserve trailing whitespaces. |
| `encoding` | `utf-8` | The encoding of the file being read. See the list of supported encodings below for available options. |
| `preserve_leading_whitespaces` | false | Whether to preserve leading whitespaces. |
| `preserve_trailing_whitespaces` | false | Whether to preserve trailing whitespaces. |
| `encoding` | `utf-8` | The encoding of the file being read. See the list of supported encodings below for available options. |
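
For UDP, the effect of the flag can be pictured per datagram. The sketch below is an assumption-laden illustration, since the UDP operator's implementation is not shown in this section: `logsFromPacket` is a hypothetical helper, and the tokenized branch simply uses the default newline split of `bufio.Scanner`.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
)

// logsFromPacket is a hypothetical helper, not part of pkg/stanza.
// With oneLogPerPacket set, the datagram payload is returned untouched as a
// single log; otherwise it is split on newlines.
func logsFromPacket(packet []byte, oneLogPerPacket bool) []string {
	if oneLogPerPacket {
		if len(packet) == 0 {
			return nil
		}
		return []string{string(packet)}
	}

	var logs []string
	scanner := bufio.NewScanner(bytes.NewReader(packet))
	for scanner.Scan() {
		logs = append(logs, scanner.Text())
	}
	return logs
}

func main() {
	packet := []byte("first\nsecond")

	fmt.Printf("%q\n", logsFromPacket(packet, false)) // two logs
	fmt.Printf("%q\n", logsFromPacket(packet, true))  // one log, newline kept
}
```

If a sender packs several newline-separated logs into one datagram, enabling the flag would merge them into one entry, which is why the documentation ties it to inputs with one log per record.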

#### `multiline` configuration

115 changes: 75 additions & 40 deletions pkg/stanza/operator/input/tcp/tcp.go
@@ -5,10 +5,12 @@ package tcp // import "github.com/open-telemetry/opentelemetry-collector-contrib

import (
"bufio"
"bytes"
"context"
"crypto/rand"
"crypto/tls"
"fmt"
"io"
"net"
"strconv"
"sync"
@@ -48,8 +50,9 @@ func NewConfigWithID(operatorID string) *Config {
return &Config{
InputConfig: helper.NewInputConfig(operatorID, operatorType),
BaseConfig: BaseConfig{
Multiline: helper.NewMultilineConfig(),
Encoding: helper.NewEncodingConfig(),
OneLogPerPacket: false,
Multiline: helper.NewMultilineConfig(),
Encoding: helper.NewEncodingConfig(),
},
}
}
@@ -66,6 +69,7 @@ type BaseConfig struct {
ListenAddress string `mapstructure:"listen_address,omitempty"`
TLS *configtls.TLSServerSetting `mapstructure:"tls,omitempty"`
AddAttributes bool `mapstructure:"add_attributes,omitempty"`
OneLogPerPacket bool `mapstructure:"one_log_per_packet,omitempty"`
Encoding helper.EncodingConfig `mapstructure:",squash,omitempty"`
Multiline helper.MultilineConfig `mapstructure:"multiline,omitempty"`
PreserveLeadingWhitespaces bool `mapstructure:"preserve_leading_whitespaces,omitempty"`
@@ -114,12 +118,13 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
}

tcpInput := &Input{
InputOperator: inputOperator,
address: c.ListenAddress,
MaxLogSize: int(c.MaxLogSize),
addAttributes: c.AddAttributes,
encoding: encoding,
splitFunc: splitFunc,
InputOperator: inputOperator,
address: c.ListenAddress,
MaxLogSize: int(c.MaxLogSize),
addAttributes: c.AddAttributes,
OneLogPerPacket: c.OneLogPerPacket,
encoding: encoding,
splitFunc: splitFunc,
backoff: backoff.Backoff{
Max: 3 * time.Second,
},
@@ -139,9 +144,10 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
// Input is an operator that listens for log entries over tcp.
type Input struct {
helper.InputOperator
address string
MaxLogSize int
addAttributes bool
address string
MaxLogSize int
addAttributes bool
OneLogPerPacket bool

listener net.Listener
cancel context.CancelFunc
@@ -239,48 +245,77 @@ func (t *Input) goHandleMessages(ctx context.Context, conn net.Conn, cancel cont
defer t.wg.Done()
defer cancel()

if t.OneLogPerPacket {
var buf bytes.Buffer
_, err := io.Copy(&buf, conn)
if err != nil {
t.Errorw("IO copy net connection buffer error", zap.Error(err))
}
log := truncateMaxLog(buf.Bytes(), t.MaxLogSize)
t.handleMessage(ctx, conn, log)
return
}

buf := make([]byte, 0, t.MaxLogSize)

scanner := bufio.NewScanner(conn)
scanner.Buffer(buf, t.MaxLogSize)

scanner.Split(t.splitFunc)

for scanner.Scan() {
decoded, err := t.encoding.Decode(scanner.Bytes())
if err != nil {
t.Errorw("Failed to decode data", zap.Error(err))
continue
}
t.handleMessage(ctx, conn, scanner.Bytes())
}

entry, err := t.NewEntry(string(decoded))
if err != nil {
t.Errorw("Failed to create entry", zap.Error(err))
continue
}
if err := scanner.Err(); err != nil {
t.Errorw("Scanner error", zap.Error(err))
}
}()
}

if t.addAttributes {
entry.AddAttribute("net.transport", "IP.TCP")
if addr, ok := conn.RemoteAddr().(*net.TCPAddr); ok {
ip := addr.IP.String()
entry.AddAttribute("net.peer.ip", ip)
entry.AddAttribute("net.peer.port", strconv.FormatInt(int64(addr.Port), 10))
entry.AddAttribute("net.peer.name", t.resolver.GetHostFromIP(ip))
}
func (t *Input) handleMessage(ctx context.Context, conn net.Conn, log []byte) {
decoded, err := t.encoding.Decode(log)
if err != nil {
t.Errorw("Failed to decode data", zap.Error(err))
return
}

if addr, ok := conn.LocalAddr().(*net.TCPAddr); ok {
ip := addr.IP.String()
entry.AddAttribute("net.host.ip", addr.IP.String())
entry.AddAttribute("net.host.port", strconv.FormatInt(int64(addr.Port), 10))
entry.AddAttribute("net.host.name", t.resolver.GetHostFromIP(ip))
}
}
entry, err := t.NewEntry(string(decoded))
if err != nil {
t.Errorw("Failed to create entry", zap.Error(err))
return
}

t.Write(ctx, entry)
if t.addAttributes {
entry.AddAttribute("net.transport", "IP.TCP")
if addr, ok := conn.RemoteAddr().(*net.TCPAddr); ok {
ip := addr.IP.String()
entry.AddAttribute("net.peer.ip", ip)
entry.AddAttribute("net.peer.port", strconv.FormatInt(int64(addr.Port), 10))
entry.AddAttribute("net.peer.name", t.resolver.GetHostFromIP(ip))
}
if err := scanner.Err(); err != nil {
t.Errorw("Scanner error", zap.Error(err))

if addr, ok := conn.LocalAddr().(*net.TCPAddr); ok {
ip := addr.IP.String()
entry.AddAttribute("net.host.ip", addr.IP.String())
entry.AddAttribute("net.host.port", strconv.FormatInt(int64(addr.Port), 10))
entry.AddAttribute("net.host.name", t.resolver.GetHostFromIP(ip))
}
}()
}

t.Write(ctx, entry)
}

func truncateMaxLog(data []byte, maxLogSize int) (token []byte) {
if len(data) >= maxLogSize {
return data[:maxLogSize]
}

if len(data) == 0 {
return nil
}

return data
}
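
The behavior of `truncateMaxLog` at its boundaries can be checked with a small standalone program. The function body below is copied from the diff above (with an unnamed return value); the `main` function and its sample inputs are only illustrative.

```go
package main

import "fmt"

// truncateMaxLog caps data at maxLogSize bytes and maps an empty payload to nil.
func truncateMaxLog(data []byte, maxLogSize int) []byte {
	if len(data) >= maxLogSize {
		return data[:maxLogSize]
	}

	if len(data) == 0 {
		return nil
	}

	return data
}

func main() {
	fmt.Printf("%q\n", truncateMaxLog([]byte("abcdef"), 3)) // "abc": longer input is cut
	fmt.Printf("%q\n", truncateMaxLog([]byte("ab"), 3))     // "ab": shorter input is unchanged
	fmt.Println(truncateMaxLog(nil, 3) == nil)              // true: empty input becomes nil
}
```

The cut is made on a byte boundary, so with a multi-byte encoding such as UTF-8 the last character of a truncated log may be split.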

// Stop will stop listening for log entries over TCP.