[exporter/doris] Add new config (#38162)

#### Description

Add new config options:

* `log_response` (default = false) Whether to log the response of the doris stream load.
* `label_prefix` (default = open_telemetry) The prefix of the label in the doris stream load. The final generated label is `{label_prefix}_{db}_{table}_{yyyyMMddHHmmss}_{uuid}`; a short sketch of the label construction follows this list.
* `headers` (default is empty map) The headers of the doris stream load. Details: [header parameters](https://doris.apache.org/docs/data-operate/import/import-way/stream-load-manual#load-configuration-parameters) and [group commit](https://doris.apache.org/docs/data-operate/import/import-way/group-commit-manual#stream-load).
* `log_progress_interval` (default = 10) The interval, in seconds, between statistical logs. If it is less than or equal to 0, the statistical log is not printed.
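For illustration, here is a minimal, self-contained sketch of how a label in this format is put together; it mirrors the `generateLabel` helper added in `exporter_common.go` below, and the database/table values are placeholders rather than anything mandated by this PR.

```go
package main

import (
	"fmt"
	"time"

	"github.com/google/uuid"
)

func main() {
	// Build a stream load label of the form
	// {label_prefix}_{db}_{table}_{yyyyMMddHHmmss}_{uuid}.
	label := fmt.Sprintf("%s_%s_%s_%s_%s",
		"open_telemetry",                    // label_prefix (default)
		"otel",                              // database
		"otel_logs",                         // table
		time.Now().Format("20060102150405"), // yyyyMMddHHmmss
		uuid.New().String(),                 // random UUID
	)
	fmt.Println(label)
	// e.g. open_telemetry_otel_otel_logs_20250225120000_0f8fad5b-...
}
```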

#### Link to tracking issue
None.

#### Testing

Unit tests.

#### Documentation

Documented in `README.md`.

joker-star-l authored Feb 25, 2025
1 parent dae31e1 commit 8d2052d
Showing 23 changed files with 426 additions and 212 deletions.
27 changes: 27 additions & 0 deletions .chloggen/doris-add-config.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: dorisexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "add new config: log_response, label_prefix, headers, log_progress_interval"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [38162]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
1 change: 1 addition & 0 deletions exporter/dorisexporter/.gitignore
@@ -0,0 +1 @@
e2e*
6 changes: 5 additions & 1 deletion exporter/dorisexporter/README.md
@@ -30,7 +30,11 @@ The following configuration options are supported:
* `history_days` (default = 0) Data older than these days will be deleted; ignored if `create_schema` is false. If set to 0, historical data will not be deleted.
* `create_history_days` (default = 0) The number of days in the history partition that was created when the table was created; ignored if `create_schema` is false. If `history_days` is not 0, `create_history_days` needs to be less than or equal to `history_days`.
* `replication_num` (default = 1) The number of replicas of the table; ignored if `create_schema` is false.
* `timezone` (default is UTC) The time zone of doris.
* `timezone` (default is the time zone of the OpenTelemetry Collector if the IANA Time Zone Database is found, otherwise UTC) The time zone of doris, e.g. Asia/Shanghai.
* `log_response` (default = false) Whether to log the response of the doris stream load.
* `label_prefix` (default = open_telemetry) The prefix of the label in the doris stream load. The final generated label is `{label_prefix}_{db}_{table}_{yyyyMMddHHmmss}_{uuid}`.
* `headers` (default is empty map) The headers of the doris stream load. Details: [header parameters](https://doris.apache.org/docs/data-operate/import/import-way/stream-load-manual#load-configuration-parameters) and [group commit](https://doris.apache.org/docs/data-operate/import/group-commit-manual#stream-load).
* `log_progress_interval` (default = 10) The interval, in seconds, between statistical logs. If it is less than or equal to 0, the statistical log is not printed.
* `sending_queue` [details here](https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/exporterhelper#configuration)
* `enabled` (default = true)
* `num_consumers` (default = 10) Number of consumers that dequeue batches; ignored if `enabled` is false.
18 changes: 12 additions & 6 deletions exporter/dorisexporter/config.go
@@ -16,6 +16,7 @@ import (
)

type Config struct {
// confighttp.ClientConfig.Headers is the headers of doris stream load.
confighttp.ClientConfig `mapstructure:",squash"`
configretry.BackOffConfig `mapstructure:"retry_on_failure"`
QueueSettings exporterhelper.QueueConfig `mapstructure:"sending_queue"`
@@ -42,6 +43,15 @@
ReplicationNum int32 `mapstructure:"replication_num"`
// Timezone is the timezone of the doris.
TimeZone string `mapstructure:"timezone"`
// LogResponse is whether to log the response of doris stream load.
LogResponse bool `mapstructure:"log_response"`
// LabelPrefix is the prefix of the label in doris stream load.
LabelPrefix string `mapstructure:"label_prefix"`
// LogProgressInterval is the interval, in seconds, of the progress reporter log; a value <= 0 disables it.
LogProgressInterval int `mapstructure:"log_progress_interval"`

// not in config file, will be set in Validate
timeLocation *time.Location `mapstructure:"-"`
}

type Table struct {
@@ -94,7 +104,8 @@ func (cfg *Config) Validate() (err error) {
err = errors.Join(err, errors.New("metrics table name must be alphanumeric and underscore"))
}

_, errT := cfg.timeZone()
var errT error
cfg.timeLocation, errT = time.LoadLocation(cfg.TimeZone)
if errT != nil {
err = errors.Join(err, errors.New("invalid timezone"))
}
@@ -113,15 +124,10 @@ func (cfg *Config) startHistoryDays() int32 {
return -cfg.HistoryDays
}

func (cfg *Config) timeZone() (*time.Location, error) {
return time.LoadLocation(cfg.TimeZone)
}

const (
properties = `
PROPERTIES (
"replication_num" = "%d",
"enable_single_replica_compaction" = "true",
"compaction_policy" = "time_series",
"dynamic_partition.enable" = "true",
"dynamic_partition.create_history_partition" = "true",
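For context on the `Validate` change above: the configured time zone is now resolved once with `time.LoadLocation` and cached on the config as `timeLocation`, instead of being re-resolved by each exporter. A small standalone sketch of that lookup (not the exporter's code; the zone names are just examples):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// A valid IANA name resolves to a *time.Location that can be cached and reused.
	if loc, err := time.LoadLocation("Asia/Shanghai"); err == nil {
		fmt.Println(time.Now().In(loc).Format("2006-01-02 15:04:05"))
	}

	// An unknown name fails; Config.Validate reports this as "invalid timezone".
	if _, err := time.LoadLocation("Not/AZone"); err != nil {
		fmt.Println("invalid timezone:", err)
	}
}
```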
76 changes: 45 additions & 31 deletions exporter/dorisexporter/config_test.go
@@ -31,10 +31,53 @@ func TestLoadConfig(t *testing.T) {
defaultCfg := createDefaultConfig()
defaultCfg.(*Config).Endpoint = "http://localhost:8030"
defaultCfg.(*Config).MySQLEndpoint = "localhost:9030"
err = defaultCfg.(*Config).Validate()
require.NoError(t, err)

httpClientConfig := confighttp.NewDefaultClientConfig()
httpClientConfig.Timeout = 5 * time.Second
httpClientConfig.Endpoint = "http://localhost:8030"
httpClientConfig.Headers = map[string]configopaque.String{
"max_filter_ratio": "0.1",
"strict_mode": "true",
"group_commit": "async_mode",
}

fullCfg := &Config{
ClientConfig: httpClientConfig,
BackOffConfig: configretry.BackOffConfig{
Enabled: true,
InitialInterval: 5 * time.Second,
MaxInterval: 30 * time.Second,
MaxElapsedTime: 300 * time.Second,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
},
QueueSettings: exporterhelper.QueueConfig{
Enabled: true,
NumConsumers: 10,
QueueSize: 1000,
},
Table: Table{
Logs: "otel_logs",
Traces: "otel_traces",
Metrics: "otel_metrics",
},
Database: "otel",
Username: "admin",
Password: configopaque.String("admin"),
CreateSchema: true,
MySQLEndpoint: "localhost:9030",
HistoryDays: 0,
CreateHistoryDays: 0,
ReplicationNum: 2,
TimeZone: "Asia/Shanghai",
LogResponse: true,
LabelPrefix: "otel",
LogProgressInterval: 5,
}
err = fullCfg.Validate()
require.NoError(t, err)

tests := []struct {
id component.ID
@@ -45,37 +88,8 @@
expected: defaultCfg,
},
{
id: component.NewIDWithName(metadata.Type, "full"),
expected: &Config{
ClientConfig: httpClientConfig,
BackOffConfig: configretry.BackOffConfig{
Enabled: true,
InitialInterval: 5 * time.Second,
MaxInterval: 30 * time.Second,
MaxElapsedTime: 300 * time.Second,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
},
QueueSettings: exporterhelper.QueueConfig{
Enabled: true,
NumConsumers: 10,
QueueSize: 1000,
},
Table: Table{
Logs: "otel_logs",
Traces: "otel_traces",
Metrics: "otel_metrics",
},
Database: "otel",
Username: "admin",
Password: configopaque.String("admin"),
CreateSchema: true,
MySQLEndpoint: "localhost:9030",
HistoryDays: 0,
CreateHistoryDays: 0,
ReplicationNum: 2,
TimeZone: "Asia/Shanghai",
},
id: component.NewIDWithName(metadata.Type, "full"),
expected: fullCfg,
},
}

33 changes: 26 additions & 7 deletions exporter/dorisexporter/exporter_common.go
@@ -13,6 +13,7 @@ import (
"time"

_ "github.com/go-sql-driver/mysql" // for register database driver
"github.com/google/uuid"
"go.opentelemetry.io/collector/component"
"go.uber.org/zap"
)
@@ -27,17 +28,16 @@ type commonExporter struct {
logger *zap.Logger
cfg *Config
timeZone *time.Location
reporter *progressReporter
}

func newExporter(logger *zap.Logger, cfg *Config, set component.TelemetrySettings) *commonExporter {
// There won't be an error because it's already been validated in the Config.Validate method.
timeZone, _ := cfg.timeZone()

func newExporter(logger *zap.Logger, cfg *Config, set component.TelemetrySettings, reporterName string) *commonExporter {
return &commonExporter{
TelemetrySettings: set,
logger: logger,
cfg: cfg,
timeZone: timeZone,
timeZone: cfg.timeLocation,
reporter: newProgressReporter(reporterName, cfg.LogProgressInterval, logger),
}
}

@@ -66,14 +66,29 @@ type streamLoadResponse struct {
}

func (r *streamLoadResponse) success() bool {
return r.Status == "Success" || r.Status == "Publish Timeout"
return r.Status == "Success" || r.Status == "Publish Timeout" || r.Status == "Label Already Exists"
}

func (r *streamLoadResponse) duplication() bool {
return r.Status == "Label Already Exists"
}

func streamLoadURL(address string, db string, table string) string {
return address + "/api/" + db + "/" + table + "/_stream_load"
}

func streamLoadRequest(ctx context.Context, cfg *Config, table string, data []byte) (*http.Request, error) {
func generateLabel(cfg *Config, table string) string {
return fmt.Sprintf(
"%s_%s_%s_%s_%s",
cfg.LabelPrefix,
cfg.Database,
table,
time.Now().In(cfg.timeLocation).Format("20060102150405"),
uuid.New().String(),
)
}

func streamLoadRequest(ctx context.Context, cfg *Config, table string, data []byte, label string) (*http.Request, error) {
url := streamLoadURL(cfg.Endpoint, cfg.Database, table)
req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, bytes.NewBuffer(data))
if err != nil {
@@ -83,6 +98,10 @@ func streamLoadRequest(ctx context.Context, cfg *Config, table string, data []by
req.Header.Set("format", "json")
req.Header.Set("Expect", "100-continue")
req.Header.Set("read_json_by_line", "true")
groupCommit := string(cfg.Headers["group_commit"])
if groupCommit == "" || groupCommit == "off_mode" {
req.Header.Set("label", label)
}
if cfg.ClientConfig.Timeout != 0 {
req.Header.Set("timeout", fmt.Sprintf("%d", cfg.ClientConfig.Timeout/time.Second))
}
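One detail worth calling out in `streamLoadRequest` above: the `label` header is attached only when the `group_commit` header is unset or `off_mode`. The sketch below isolates that decision; it is an illustration of the logic, not the exporter's code, and it assumes a plain `map[string]string` in place of the collector's opaque header map.

```go
package main

import (
	"fmt"
	"net/http"
)

// setLabelIfAllowed mirrors the decision above: a stream load label is only
// attached when group commit is disabled (unset or "off_mode").
func setLabelIfAllowed(req *http.Request, headers map[string]string, label string) {
	groupCommit := headers["group_commit"]
	if groupCommit == "" || groupCommit == "off_mode" {
		req.Header.Set("label", label)
	}
}

func main() {
	req, _ := http.NewRequest(http.MethodPut, "http://localhost:8030/api/otel/otel_logs/_stream_load", nil)

	// With group_commit=async_mode the label header is not set,
	// matching the exporter's behavior above.
	setLabelIfAllowed(req, map[string]string{"group_commit": "async_mode"}, "open_telemetry_otel_otel_logs_20250225120000_x")
	fmt.Printf("label with group commit: %q\n", req.Header.Get("label"))

	// Without group_commit (or with off_mode) the label is set.
	setLabelIfAllowed(req, map[string]string{}, "open_telemetry_otel_otel_logs_20250225120000_x")
	fmt.Printf("label without group commit: %q\n", req.Header.Get("label"))
}
```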
11 changes: 8 additions & 3 deletions exporter/dorisexporter/exporter_common_test.go
@@ -15,13 +15,18 @@ import (

func TestNewCommonExporter(t *testing.T) {
cfg := createDefaultConfig().(*Config)
exporter := newExporter(nil, cfg, componenttest.NewNopTelemetrySettings())
exporter := newExporter(nil, cfg, componenttest.NewNopTelemetrySettings(), "")
require.NotNil(t, exporter)
}

func TestCommonExporter_FormatTime(t *testing.T) {
cfg := createDefaultConfig().(*Config)
exporter := newExporter(nil, cfg, componenttest.NewNopTelemetrySettings())
cfg.Endpoint = "http://localhost:8030"
cfg.CreateSchema = false
err := cfg.Validate()
require.NoError(t, err)

exporter := newExporter(nil, cfg, componenttest.NewNopTelemetrySettings(), "")
require.NotNil(t, exporter)

now := time.Date(2024, 1, 1, 0, 0, 0, 1000, time.Local)
@@ -62,7 +67,7 @@ func findRandomPort() (int, error) {
return port, nil
}

func TestToJsonLines(t *testing.T) {
func TestToJSONLines(t *testing.T) {
logs, err := toJSONLines([]*dLog{
{}, {},
})
58 changes: 37 additions & 21 deletions exporter/dorisexporter/exporter_logs.go
@@ -42,7 +42,7 @@ type logsExporter struct {

func newLogsExporter(logger *zap.Logger, cfg *Config, set component.TelemetrySettings) *logsExporter {
return &logsExporter{
commonExporter: newExporter(logger, cfg, set),
commonExporter: newExporter(logger, cfg, set, "LOG"),
}
}

@@ -53,24 +53,27 @@ func (e *logsExporter) start(ctx context.Context, host component.Host) error {
}
e.client = client

if !e.cfg.CreateSchema {
return nil
}
if e.cfg.CreateSchema {
conn, err := createDorisMySQLClient(e.cfg)
if err != nil {
return err
}
defer conn.Close()

conn, err := createDorisMySQLClient(e.cfg)
if err != nil {
return err
}
defer conn.Close()
err = createAndUseDatabase(ctx, conn, e.cfg)
if err != nil {
return err
}

err = createAndUseDatabase(ctx, conn, e.cfg)
if err != nil {
return err
ddl := fmt.Sprintf(logsDDL, e.cfg.Table.Logs, e.cfg.propertiesStr())
_, err = conn.ExecContext(ctx, ddl)
if err != nil {
return err
}
}

ddl := fmt.Sprintf(logsDDL, e.cfg.Table.Logs, e.cfg.propertiesStr())
_, err = conn.ExecContext(ctx, ddl)
return err
go e.reporter.report()
return nil
}

func (e *logsExporter) shutdown(_ context.Context) error {
Expand All @@ -81,6 +84,7 @@ func (e *logsExporter) shutdown(_ context.Context) error {
}

func (e *logsExporter) pushLogData(ctx context.Context, ld plog.Logs) error {
label := generateLabel(e.cfg, e.cfg.Table.Logs)
logs := make([]*dLog, 0, ld.LogRecordCount())

for i := 0; i < ld.ResourceLogs().Len(); i++ {
@@ -118,16 +122,16 @@ func (e *logsExporter) pushLogData(ctx context.Context, ld plog.Logs) error {
}
}

return e.pushLogDataInternal(ctx, logs)
return e.pushLogDataInternal(ctx, logs, label)
}

func (e *logsExporter) pushLogDataInternal(ctx context.Context, logs []*dLog) error {
func (e *logsExporter) pushLogDataInternal(ctx context.Context, logs []*dLog, label string) error {
marshal, err := toJSONLines(logs)
if err != nil {
return err
}

req, err := streamLoadRequest(ctx, e.cfg, e.cfg.Table.Logs, marshal)
req, err := streamLoadRequest(ctx, e.cfg, e.cfg.Table.Logs, marshal, label)
if err != nil {
return err
}
@@ -149,9 +153,21 @@ func (e *logsExporter) pushLogDataInternal(ctx context.Context, logs []*dLog) er
return err
}

if !response.success() {
return fmt.Errorf("failed to push log data: %s", response.Message)
if response.success() {
e.reporter.incrTotalRows(int64(len(logs)))
e.reporter.incrTotalBytes(int64(len(marshal)))

if response.duplication() {
e.logger.Warn("label already exists", zap.String("label", label), zap.Int("skipped", len(logs)))
}

if e.cfg.LogResponse {
e.logger.Info("log response:\n" + string(body))
} else {
e.logger.Debug("log response:\n" + string(body))
}
return nil
}

return nil
return fmt.Errorf("failed to push log data, response:%s", string(body))
}
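The `progressReporter` used above (`newProgressReporter`, `report`, `incrTotalRows`, `incrTotalBytes`) is defined in a file that is not shown in this excerpt. The following is only a rough sketch of what such a reporter could look like — atomic counters plus a ticker driven by `log_progress_interval` — inferred from the call sites rather than taken from the actual implementation, and using the standard `log` package instead of the collector's zap logger.

```go
package main

import (
	"log"
	"sync/atomic"
	"time"
)

// progressReporter (sketch): accumulates row/byte counters and logs them
// every interval seconds; a non-positive interval disables the log.
type progressReporter struct {
	name       string
	interval   int
	totalRows  atomic.Int64
	totalBytes atomic.Int64
}

func newProgressReporter(name string, interval int) *progressReporter {
	return &progressReporter{name: name, interval: interval}
}

func (r *progressReporter) incrTotalRows(n int64)  { r.totalRows.Add(n) }
func (r *progressReporter) incrTotalBytes(n int64) { r.totalBytes.Add(n) }

func (r *progressReporter) report() {
	if r.interval <= 0 {
		return // statistical log disabled, as documented for log_progress_interval <= 0
	}
	ticker := time.NewTicker(time.Duration(r.interval) * time.Second)
	defer ticker.Stop()
	for range ticker.C {
		log.Printf("%s progress: total rows=%d, total bytes=%d",
			r.name, r.totalRows.Load(), r.totalBytes.Load())
	}
}

func main() {
	r := newProgressReporter("LOG", 1)
	go r.report()
	r.incrTotalRows(2)
	r.incrTotalBytes(128)
	time.Sleep(1500 * time.Millisecond) // let one tick fire in this demo
}
```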
